You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by lp...@apache.org on 2020/04/15 13:01:38 UTC

[hive] branch master updated: HIVE-23107: Remove MIN_HISTORY_LEVEL table (Laszlo Pinter, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 01035cd  HIVE-23107: Remove MIN_HISTORY_LEVEL table (Laszlo Pinter, reviewed by Peter Vary)
01035cd is described below

commit 01035cd6097bd0df1deee35e5d375a80fc6e4dc7
Author: Laszlo Pinter <lp...@cloudera.com>
AuthorDate: Wed Apr 15 15:01:21 2020 +0200

    HIVE-23107: Remove MIN_HISTORY_LEVEL table (Laszlo Pinter, reviewed by Peter Vary)
---
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |   2 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |  12 --
 .../hadoop/hive/metastore/txn/CompactionInfo.java  |   1 +
 .../hive/metastore/txn/CompactionTxnHandler.java   | 132 ++++++++++-----------
 .../hadoop/hive/metastore/txn/TxnDbUtil.java       |   3 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java      |  53 +--------
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |  18 ++-
 .../src/main/sql/derby/hive-schema-4.0.0.derby.sql |  11 +-
 .../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql     |   4 +
 .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql |  12 +-
 .../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql     |   4 +
 .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql |  11 +-
 .../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql     |   4 +
 .../main/sql/oracle/hive-schema-4.0.0.oracle.sql   |  11 +-
 .../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql   |   4 +
 .../sql/postgres/hive-schema-4.0.0.postgres.sql    |  11 +-
 .../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql   |   4 +
 17 files changed, 108 insertions(+), 189 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 54b616e..5fa3d9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -92,7 +92,7 @@ public class Cleaner extends MetaStoreCompactorThread {
       try {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
         startedAt = System.currentTimeMillis();
-        long minOpenTxnId = txnHandler.findMinOpenTxnId();
+        long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
         for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) {
           clean(compactionInfo, minOpenTxnId);
         }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index f3834cc..2c13e8d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -2045,13 +2045,10 @@ public class TestTxnCommands2 {
 
     // All inserts are committed and hence would expect in TXN_TO_WRITE_ID, 3 entries for acidTbl
     // and 2 entries for acidTblPart as each insert would have allocated a writeid.
-    // Also MIN_HISTORY_LEVEL won't have any entries as no reference for open txns.
     String acidTblWhereClause = " where t2w_database = " + quoteString("default")
             + " and t2w_table = " + quoteString(Table.ACIDTBL.name().toLowerCase());
     String acidTblPartWhereClause = " where t2w_database = " + quoteString("default")
             + " and t2w_table = " + quoteString(Table.ACIDTBLPART.name().toLowerCase());
-    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
-            0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
             3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
@@ -2091,13 +2088,10 @@ public class TestTxnCommands2 {
 
     // We would expect 4 entries in TXN_TO_WRITE_ID as each insert would have allocated a writeid
     // including aborted one.
-    // Also MIN_HISTORY_LEVEL will have 1 entry for the open txn.
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
             3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
             1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause));
-    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
-            1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
 
     // The entry relevant to aborted txns shouldn't be removed from TXN_TO_WRITE_ID as
     // aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained.
@@ -2112,7 +2106,6 @@ public class TestTxnCommands2 {
     // The cleaner will removed aborted txns data/metadata but cannot remove aborted txn2 from TXN_TO_WRITE_ID
     // as there is a open txn < aborted txn2. The aborted txn1 < open txn and will be removed.
     // Also, committed txn > open txn is retained.
-    // MIN_HISTORY_LEVEL will have 1 entry for the open txn.
     txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
     runWorker(hiveConf);
     runCleaner(hiveConf);
@@ -2120,18 +2113,13 @@ public class TestTxnCommands2 {
     txnHandler.cleanTxnToWriteIdTable();
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
             2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
-    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
-            1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
 
     // Commit the open txn, which lets the cleanup on TXN_TO_WRITE_ID.
-    // Now all txns are removed from MIN_HISTORY_LEVEL. So, all entries from TXN_TO_WRITE_ID would be cleaned.
     txnMgr.commitTxn();
     txnHandler.cleanTxnToWriteIdTable();
 
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
             0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
-    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
-            0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
   }
 
   private void verifyDirAndResult(int expectedDeltas) throws Exception {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index bf91ae7..70d63ab 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -225,6 +225,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
     cr.setWorkerId(ci.workerId);
     cr.setHighestWriteId(ci.highestWriteId);
     cr.setErrorMessage(ci.errorMessage);
+
     return cr;
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 19a95b6..2344c2d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -221,8 +221,10 @@ class CompactionTxnHandler extends TxnHandler {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " +
-          "\"CQ_WORKER_ID\" = NULL WHERE \"CQ_ID\" = " + info.id;
+        String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', "
+            + "\"CQ_WORKER_ID\" = NULL, \"CQ_NEXT_TXN_ID\" = "
+            + "(SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\")"
+            + " WHERE \"CQ_ID\" = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
         int updCnt = stmt.executeUpdate(s);
         if (updCnt != 1) {
@@ -302,57 +304,6 @@ class CompactionTxnHandler extends TxnHandler {
       return findReadyToClean();
     }
   }
-  @Override
-  public long findMinOpenTxnId() throws MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        return findMinOpenTxnGLB(stmt);
-      } catch (SQLException e) {
-        LOG.error("Unable to findMinOpenTxnId() due to:" + e.getMessage());
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "findMinOpenTxnId");
-        throw new MetaException("Unable to execute findMinOpenTxnId() " +
-            StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return findMinOpenTxnId();
-    }
-  }
-
-  /**
-   * See doc at {@link TxnStore#findMinOpenTxnId()}
-   * Note that {@link #openTxns(OpenTxnRequest)} makes update of NEXT_TXN and MIN_HISTORY_LEVEL
-   * a single atomic operation (and no one else should update these tables except the cleaner
-   * which deletes rows from MIN_HISTORY_LEVEL which can only allow minOpenTxn to move higher)
-   */
-  private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLException {
-    String s = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"";
-    LOG.debug("Going to execute query <" + s + ">");
-    ResultSet rs = stmt.executeQuery(s);
-    if (!rs.next()) {
-      throw new MetaException("Transaction tables not properly " +
-          "initialized, no record found in next_txn_id");
-    }
-    long hwm = rs.getLong(1);
-    s = "SELECT MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\"";
-    LOG.debug("Going to execute query <" + s + ">");
-    rs = stmt.executeQuery(s);
-    rs.next();
-    long minOpenTxnId = rs.getLong(1);
-    if(rs.wasNull()) {
-      return hwm;
-    }
-    //since generating new txnid uses select for update on single row in NEXT_TXN_ID
-    assert hwm >= minOpenTxnId : "(hwm, minOpenTxnId)=(" + hwm + "," + minOpenTxnId + ")";
-    return minOpenTxnId;
-  }
 
   /**
    * This will remove an entry from the queue after
@@ -523,7 +474,7 @@ class CompactionTxnHandler extends TxnHandler {
   }
   /**
    * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by
-   * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
+   * min(NEXT_TXN_ID.ntxn_next, min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
    */
   @Override
   @RetrySemantics.SafeToRetry
@@ -542,25 +493,27 @@ class CompactionTxnHandler extends TxnHandler {
         // First need to find the min_uncommitted_txnid which is currently seen by any open transactions.
         // If there are no txns which are currently open or aborted in the system, then current value of
         // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid.
-        long minUncommittedTxnId = findMinOpenTxnGLB(stmt);
-
-        // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid
-        // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL .mhl_min_open_txnid).
-        String s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED);
+        String s = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" +
+            "SELECT MIN(\"NTXN_NEXT\") AS \"ID\" FROM \"NEXT_TXN_ID\" " +
+            "UNION " +
+            "SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " +
+            "UNION " +
+            "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) +
+            " OR \"TXN_STATE\" = " + quoteChar(TXN_OPEN) +
+            ") \"RES\"";
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
-        if (rs.next()) {
-          long minAbortedTxnId = rs.getLong(1);
-          if (minAbortedTxnId > 0) {
-            minUncommittedTxnId = Math.min(minAbortedTxnId, minUncommittedTxnId);
-          }
+        if (!rs.next()) {
+          throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID");
         }
+        long minUncommitedTxnid = rs.getLong(1);
+
         // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed
         // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table.
-        s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + minUncommittedTxnId;
+        s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + minUncommitedTxnid;
         LOG.debug("Going to execute delete <" + s + ">");
         int rc = stmt.executeUpdate(s);
-        LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId);
+        LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommitedTxnid);
 
         LOG.debug("Going to commit");
         dbConn.commit();
@@ -1168,6 +1121,53 @@ class CompactionTxnHandler extends TxnHandler {
       setHadoopJobId(hadoopJobId, id);
     }
   }
+
+  @Override
+  @RetrySemantics.Idempotent
+  public long findMinOpenTxnIdForCleaner() throws MetaException{
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
+        LOG.debug("Going to execute query <" + query + ">");
+        rs = stmt.executeQuery(query);
+        if (!rs.next()) {
+          throw new MetaException("Transaction tables not properly initialized.");
+        }
+        long numOpenTxns = rs.getLong(1);
+        if (numOpenTxns > 0) {
+          query = "SELECT MIN(\"RES\".\"ID\") FROM (" +
+              "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) +
+              " UNION " +
+              "SELECT MAX(\"CQ_NEXT_TXN_ID\") AS \"ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = "
+              + quoteChar(READY_FOR_CLEANING) +
+              ") \"RES\"";
+        } else {
+          query = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"";
+        }
+        LOG.debug("Going to execute query <" + query + ">");
+        rs = stmt.executeQuery(query);
+        if (!rs.next()) {
+          throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID");
+        }
+        return rs.getLong(1);
+      } catch (SQLException e) {
+        LOG.error("Unable to getMinOpenTxnIdForCleaner", e);
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "getMinOpenTxnForCleaner");
+        throw new MetaException("Unable to execute getMinOpenTxnIfForCleaner() " +
+            StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      return findMinOpenTxnIdForCleaner();
+    }
+  }
 }
 
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 620c77e..a66e169 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -173,7 +173,8 @@ public final class TxnDbUtil {
           " CQ_HIGHEST_WRITE_ID bigint," +
           " CQ_META_INFO varchar(2048) for bit data," +
           " CQ_HADOOP_JOB_ID varchar(32)," +
-          " CQ_ERROR_MESSAGE clob)");
+          " CQ_ERROR_MESSAGE clob," +
+          " CQ_NEXT_TXN_ID bigint)");
 
       stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
       stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7d0db0c..be4d63c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -645,48 +645,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         pst.execute();
       }
 
-      // Need to register minimum open txnid for current transactions into MIN_HISTORY table.
-      // For a single txn we can do it in a single insert. With multiple txns calculating the
-      // minOpenTxnId for every insert is not cost effective, so caching the value
-      if (txnIds.size() == 1) {
-        s = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " +
-                "SELECT ?, MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
-        LOG.debug("Going to execute query <" + s + ">");
-        try (PreparedStatement pstmt = dbConn.prepareStatement(s)) {
-          pstmt.setLong(1, txnIds.get(0));
-          pstmt.execute();
-        }
-        LOG.info("Added entries to MIN_HISTORY_LEVEL with a single query for current txn: " + txnIds);
-      } else {
-        s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
-        LOG.debug("Going to execute query <" + s + ">");
-        long minOpenTxnId = -1L;
-        try(ResultSet minOpenTxnIdRs = stmt.executeQuery(s)) {
-          if (!minOpenTxnIdRs.next()) {
-            throw new IllegalStateException("Scalar query returned no rows?!?!!");
-          }
-          // TXNS table should have at least one entry because we just inserted the newly opened txns.
-          // So, min(txn_id) would be a non-zero txnid.
-          minOpenTxnId = minOpenTxnIdRs.getLong(1);
-        }
-
-        assert (minOpenTxnId > 0);
-        rows.clear();
-        for (long txnId = first; txnId < first + numTxns; txnId++) {
-          rows.add(txnId + ", " + minOpenTxnId);
-        }
-
-        // Insert transaction entries into MIN_HISTORY_LEVEL.
-        List<String> inserts = sqlGenerator.createInsertValuesStmt(
-            "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows);
-        for (String insert : inserts) {
-          LOG.debug("Going to execute insert <" + insert + ">");
-          stmt.execute(insert);
-        }
-        LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds
-                     + ") with min_open_txn: " + minOpenTxnId);
-      }
-
       if (rqst.isSetReplPolicy()) {
         List<String> rowsRepl = new ArrayList<>();
         for (PreparedStatement pst : insertPreparedStmts) {
@@ -1330,7 +1288,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
         }
         cleanUpTxnRelatedMetadata(txnid, stmt);
-
         // update the key/value associated with the transaction if it has been
         // set
         if (rqst.isSetKeyValue()) {
@@ -1391,10 +1348,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid,
         "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid,
         "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid,
-        "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid,
         "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
     executeQueriesInBatch(stmt, queries, conf);
-    LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
   }
 
   /**
@@ -4280,20 +4235,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false);
       int numUpdateQueries = queries.size();
 
-      // add delete min history queries to query list
-      prefix.setLength(0);
-      suffix.setLength(0);
-      prefix.append("DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE ");
-      TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"MHL_TXNID\"", false, false);
-
       // add delete hive locks queries to query list
       prefix.setLength(0);
+      suffix.setLength(0);
       prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE ");
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
 
       // execute all queries in the list in one batch
       List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, conf);
-      LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL");
       return getUpdateCount(numUpdateQueries, affectedRowsByQuery);
     } finally {
       closeStmt(stmt);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 41d2e79..87130a5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -360,16 +360,6 @@ public interface TxnStore extends Configurable {
   List<CompactionInfo> findReadyToClean() throws MetaException;
 
   /**
-   * Returns the smallest txnid that could be seen in open state across all active transactions in
-   * the system or {@code NEXT_TXN_ID.NTXN_NEXT} if there are no active transactions, i.e. the
-   * smallest txnid that can be seen as unresolved in the whole system.  Even if a transaction
-   * is opened concurrently with this call it cannot have an id less than what this method returns.
-   * @return transaction ID
-   */
-  @RetrySemantics.ReadOnly
-  long findMinOpenTxnId() throws MetaException;
-
-  /**
    * This will remove an entry from the queue after
    * it has been compacted.
    * 
@@ -517,4 +507,12 @@ public interface TxnStore extends Configurable {
    */
   @RetrySemantics.Idempotent
   void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) throws MetaException;
+
+  /**
+   * Return the currently seen minimum open transaction ID.
+   * @return minimum transaction ID
+   * @throws MetaException
+   */
+  @RetrySemantics.Idempotent
+  long findMinOpenTxnIdForCleaner() throws MetaException;
 }
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 05adbe9..1ace9d3 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -603,7 +603,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_HIGHEST_WRITE_ID bigint,
   CQ_META_INFO varchar(2048) for bit data,
   CQ_HADOOP_JOB_ID varchar(32),
-  CQ_ERROR_MESSAGE clob
+  CQ_ERROR_MESSAGE clob,
+  CQ_NEXT_TXN_ID bigint
 );
 
 CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
@@ -667,14 +668,6 @@ CREATE TABLE NEXT_WRITE_ID (
 
 CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
 
-CREATE TABLE MIN_HISTORY_LEVEL (
-  MHL_TXNID bigint NOT NULL,
-  MHL_MIN_OPEN_TXNID bigint NOT NULL,
-  PRIMARY KEY(MHL_TXNID)
-);
-
-CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
-
 CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (
   MRL_TXN_ID BIGINT NOT NULL,
   MRL_DB_NAME VARCHAR(128) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index 35a2e64..8a3cd56 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -64,5 +64,9 @@ ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint;
 -- HIVE-22995
 ALTER TABLE "APP"."DBS" ADD COLUMN "DB_MANAGED_LOCATION_URI" VARCHAR(4000);
 
+-- HIVE-23107
+ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint;
+DROP TABLE MIN_HISTORY_LEVEL;
+
 -- This needs to be the last thing done.  Insert any changes above this line.
 UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index f3c74bf..2e01777 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1016,6 +1016,7 @@ CREATE TABLE COMPACTION_QUEUE(
     CQ_META_INFO varbinary(2048) NULL,
 	CQ_HADOOP_JOB_ID nvarchar(128) NULL,
 	CQ_ERROR_MESSAGE varchar(max) NULL,
+    CQ_NEXT_TXN_ID bigint NOT NULL,
 PRIMARY KEY CLUSTERED
 (
 	CQ_ID ASC
@@ -1200,17 +1201,6 @@ CREATE TABLE NEXT_WRITE_ID (
 
 CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
 
-CREATE TABLE MIN_HISTORY_LEVEL (
-  MHL_TXNID bigint NOT NULL,
-  MHL_MIN_OPEN_TXNID bigint NOT NULL,
-PRIMARY KEY CLUSTERED
-(
-    MHL_TXNID ASC
-)
-);
-
-CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
-
 CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (
   MRL_TXN_ID bigint NOT NULL,
   MRL_DB_NAME nvarchar(128) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index 228bb7c..9f39515 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -67,6 +67,10 @@ INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 WHERE NOT E
 -- HIVE-22995
 ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI nvarchar(4000);
 
+-- HIVE-23107
+ALTER TABLE COMPACTION_QUEUE bigint CQ_NEXT_TXN_ID NOT NULL;
+DROP TABLE MIN_HISTORY_LEVEL;
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 626d888..0512a45 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1072,7 +1072,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_HIGHEST_WRITE_ID bigint,
   CQ_META_INFO varbinary(2048),
   CQ_HADOOP_JOB_ID varchar(32),
-  CQ_ERROR_MESSAGE mediumtext
+  CQ_ERROR_MESSAGE mediumtext,
+  CQ_NEXT_TXN_ID bigint
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
 CREATE TABLE COMPLETED_COMPACTIONS (
@@ -1134,14 +1135,6 @@ CREATE TABLE NEXT_WRITE_ID (
 
 CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
 
-CREATE TABLE MIN_HISTORY_LEVEL (
-  MHL_TXNID bigint NOT NULL,
-  MHL_MIN_OPEN_TXNID bigint NOT NULL,
-  PRIMARY KEY(MHL_TXNID)
-) ENGINE=InnoDB DEFAULT CHARSET=latin1;
-
-CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
-
 CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (
   MRL_TXN_ID bigint NOT NULL,
   MRL_DB_NAME VARCHAR(128) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 35da7b5..4b82e36 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -68,6 +68,10 @@ ALTER TABLE SCHEDULED_QUERIES ADD COLUMN ACTIVE_EXECUTION_ID INTEGER ;
 -- HIVE-22995
 ALTER TABLE DBS ADD COLUMN DB_MANAGED_LOCATION_URI VARCHAR(4000) CHARACTER SET latin1 COLLATE latin1_bin;
 
+-- HIVE-23107
+ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint;
+DROP TABLE MIN_HISTORY_LEVEL;
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index a25f4e4..db398e5 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1053,7 +1053,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_HIGHEST_WRITE_ID NUMBER(19),
   CQ_META_INFO BLOB,
   CQ_HADOOP_JOB_ID varchar2(32),
-  CQ_ERROR_MESSAGE CLOB
+  CQ_ERROR_MESSAGE CLOB,
+  CQ_NEXT_TXN_ID NUMBER(19)
 ) ROWDEPENDENCIES;
 
 CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
@@ -1115,14 +1116,6 @@ CREATE TABLE NEXT_WRITE_ID (
 
 CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
 
-CREATE TABLE MIN_HISTORY_LEVEL (
-  MHL_TXNID NUMBER(19) NOT NULL,
-  MHL_MIN_OPEN_TXNID NUMBER(19) NOT NULL,
-  PRIMARY KEY(MHL_TXNID)
-);
-
-CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
-
 CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (
   MRL_TXN_ID NUMBER NOT NULL,
   MRL_DB_NAME VARCHAR(128) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index d462b4a..1be83fc 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -68,6 +68,10 @@ ALTER TABLE SCHEDULED_QUERIES ADD ACTIVE_EXECUTION_ID number(19);
 -- HIVE-22995
 ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI VARCHAR2(4000) NULL;
 
+-- HIVE-23107
+ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID NUMBER(19);
+DROP TABLE MIN_HISTORY_LEVEL;
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual;
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index 2066340..e6e3016 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1739,7 +1739,8 @@ CREATE TABLE "COMPACTION_QUEUE" (
   "CQ_HIGHEST_WRITE_ID" bigint,
   "CQ_META_INFO" bytea,
   "CQ_HADOOP_JOB_ID" varchar(32),
-  "CQ_ERROR_MESSAGE" text
+  "CQ_ERROR_MESSAGE" text,
+  "CQ_NEXT_TXN_ID" bigint
 );
 
 CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" (
@@ -1801,14 +1802,6 @@ CREATE TABLE "NEXT_WRITE_ID" (
 
 CREATE UNIQUE INDEX "NEXT_WRITE_ID_IDX" ON "NEXT_WRITE_ID" ("NWI_DATABASE", "NWI_TABLE");
 
-CREATE TABLE "MIN_HISTORY_LEVEL" (
-  "MHL_TXNID" bigint NOT NULL,
-  "MHL_MIN_OPEN_TXNID" bigint NOT NULL,
-  PRIMARY KEY("MHL_TXNID")
-);
-
-CREATE INDEX "MIN_HISTORY_LEVEL_IDX" ON "MIN_HISTORY_LEVEL" ("MHL_MIN_OPEN_TXNID");
-
 CREATE TABLE "MATERIALIZATION_REBUILD_LOCKS" (
   "MRL_TXN_ID" bigint NOT NULL,
   "MRL_DB_NAME" varchar(128) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index a50a071..b90cecb 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -199,6 +199,10 @@ ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint;
 -- HIVE-22995
 ALTER TABLE "DBS" ADD "DB_MANAGED_LOCATION_URI" character varying(4000);
 
+-- HIVE-23107
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_NEXT_TXN_ID" bigint;
+DROP TABLE "MIN_HISTORY_LEVEL";
+
 -- These lines need to be last. Insert any changes above.
 UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';