You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ve...@apache.org on 2023/04/24 07:22:53 UTC

[hive] branch master updated: HIVE-27020: Implement a separate handler to handle aborted transaction cleanup (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 45cafcaea79 HIVE-27020: Implement a separate handler to handle aborted transaction cleanup (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)
45cafcaea79 is described below

commit 45cafcaea7907c831a5e024726e65e04f9b32216
Author: Sourabh Badhya <42...@users.noreply.github.com>
AuthorDate: Mon Apr 24 12:52:41 2023 +0530

    HIVE-27020: Implement a separate handler to handle aborted transaction cleanup (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   4 +-
 .../hive/ql/txn/compactor/TestCompactor.java       | 115 +++++++-
 .../hive/ql/txn/compactor/TestCompactorBase.java   |   1 +
 ...pactorWithAbortCleanupUsingCompactionCycle.java |  31 ++
 .../hive/ql/txn/compactor/TestInitiator2.java      |   4 +-
 ...iator2WithAbortCleanupUsingCompactionCycle.java |  30 ++
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |  12 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |   5 +-
 .../ql/txn/compactor/MetaStoreCompactorThread.java |   2 +-
 .../txn/compactor/handler/AbortedTxnCleaner.java   | 158 ++++++++++
 .../txn/compactor/handler/CompactionCleaner.java   |  90 +-----
 .../hive/ql/txn/compactor/handler/TaskHandler.java |  80 +++++
 .../txn/compactor/handler/TaskHandlerFactory.java  |  12 +-
 .../metastore/txn/TestCompactionTxnHandler.java    |  10 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |  26 +-
 ...mands2WithAbortCleanupUsingCompactionCycle.java |  37 +++
 .../hive/ql/txn/compactor/CompactorTest.java       |  41 ++-
 .../TestAbortCleanupUsingCompactionCycle.java      |  30 ++
 ...pUsingCompactionCycleWithMinHistoryWriteId.java |  35 +++
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  |  19 +-
 .../hive/ql/txn/compactor/TestInitiator.java       |  69 +++--
 ...tiatorWithAbortCleanupUsingCompactionCycle.java |  28 ++
 .../compactor/handler/TestAbortedTxnCleaner.java   | 323 ++++++++++++++++++++
 .../hive/ql/txn/compactor/handler/TestHandler.java |   4 +-
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |   4 +
 .../apache/hadoop/hive/metastore/HMSHandler.java   |   2 +-
 .../hadoop/hive/metastore/txn/CompactionInfo.java  |   1 -
 .../hive/metastore/txn/CompactionTxnHandler.java   | 328 +++++++++++++--------
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |  18 +-
 .../apache/hadoop/hive/metastore/txn/TxnUtils.java |  47 ++-
 30 files changed, 1268 insertions(+), 298 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7e6903a39d6..b6835928011 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3272,11 +3272,11 @@ public class HiveConf extends Configuration {
 
     HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000,
         "Number of aborted transactions involving a given table or partition that will trigger\n" +
-        "a major compaction."),
+        "a major compaction / cleanup of aborted directories."),
 
     HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD("hive.compactor.aborted.txn.time.threshold", "12h",
         new TimeValidator(TimeUnit.HOURS),
-        "Age of table/partition's oldest aborted transaction when compaction will be triggered. " +
+        "Age of table/partition's oldest aborted transaction when compaction / cleanup of aborted directories will be triggered. " +
         "Default time unit is: hours. Set to a negative number to disable."),
 
     HIVE_COMPACTOR_ACTIVE_DELTA_DIR_THRESHOLD("hive.compactor.active.delta.dir.threshold", 200,
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index d0f3ebd19d6..dead15e1799 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -50,6 +52,11 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory;
 import org.apache.hive.streaming.HiveStreamingConnection;
 import org.apache.hive.streaming.StreamingConnection;
 import org.apache.hive.streaming.StreamingException;
@@ -75,7 +82,9 @@ import static org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner;
 import static org.apache.hadoop.hive.ql.TestTxnCommands2.runInitiator;
 import static org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.times;
@@ -1064,7 +1073,78 @@ public class TestCompactor extends TestCompactorBase {
     connection2.close();
   }
 
+  @Test
+  public void testAbortAfterMarkCleaned() throws Exception {
+    assumeTrue(MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER));
+    String dbName = "default";
+    String tableName = "cws";
+
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a STRING, b STRING) " + //currently ACID requires table to be bucketed
+            " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
+    executeStatementOnDriver("insert into table " + tableName + " values ('1', '2'), ('3', '4') ", driver);
+    executeStatementOnDriver("insert into table " + tableName + " values ('1', '2'), ('3', '4') ", driver);
+
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+            .withFieldDelimiter(',')
+            .build();
+
+    // Open two streaming connections; each one runs its own transaction against the table
+    HiveStreamingConnection connection1 = HiveStreamingConnection.newBuilder()
+            .withDatabase(dbName)
+            .withTable(tableName)
+            .withAgentInfo(agentInfo)
+            .withHiveConf(conf)
+            .withRecordWriter(writer)
+            .withStreamingOptimizations(true)
+            .withTransactionBatchSize(1)
+            .connect();
+
+    HiveStreamingConnection connection2 = HiveStreamingConnection.newBuilder()
+            .withDatabase(dbName)
+            .withTable(tableName)
+            .withAgentInfo(agentInfo)
+            .withHiveConf(conf)
+            .withRecordWriter(writer)
+            .withStreamingOptimizations(true)
+            .withTransactionBatchSize(1)
+            .connect();
+
+    // Abort a transaction which writes data.
+    connection1.beginTransaction();
+    connection1.write("1,1".getBytes());
+    connection1.write("2,1".getBytes());
+    connection1.abortTransaction();
+
+    // Begin a second txn that writes data and stays open (long running) until it is aborted during markCleaned.
+    connection2.beginTransaction();
+    connection2.write("3,1".getBytes());
+
+    Cleaner cleaner = new Cleaner();
+    TxnStore mockedTxnHandler = Mockito.spy(TxnUtils.getTxnStore(conf));
+    doAnswer(invocationOnMock -> {
+      connection2.abortTransaction();
+      return invocationOnMock.callRealMethod();
+    }).when(mockedTxnHandler).markCleaned(any(), eq(false));
+
+    MetadataCache metadataCache = new MetadataCache(false);
+    FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
+    cleaner.setConf(conf);
+    List<TaskHandler> cleanupHandlers = TaskHandlerFactory.getInstance()
+            .getHandlers(conf, mockedTxnHandler, metadataCache, false, fsRemover);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(cleanupHandlers);
+    cleaner.run();
+
+    int count = TestTxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS");
+    Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 1, count);
+  }
+
   private void assertAndCompactCleanAbort(String dbName, String tblName, boolean partialAbort, boolean singleSession) throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
     TxnStore txnHandler = TxnUtils.getTxnStore(conf);
     Table table = msClient.getTable(dbName, tblName);
@@ -1083,14 +1163,17 @@ public class TestCompactor extends TestCompactorBase {
     count = TestTxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE");
     // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
     // with rows in TXN_COMPONENTS
-    Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 1, count);
+    Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
+            useCleanerForAbortCleanup ? 0 : 1, count);
     runWorker(conf);
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompacts().size());
-    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
-    Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
-    Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+    Assert.assertEquals(useCleanerForAbortCleanup ? 0 : 1, rsp.getCompacts().size());
+    if (!useCleanerForAbortCleanup) {
+      Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+      Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+      Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+    }
 
     runCleaner(conf);
 
@@ -1108,10 +1191,12 @@ public class TestCompactor extends TestCompactorBase {
     }
 
     rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompacts().size());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
-    Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
-    Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+    Assert.assertEquals(useCleanerForAbortCleanup ? 0 : 1, rsp.getCompacts().size());
+    if (!useCleanerForAbortCleanup) {
+      Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+      Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+      Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+    }
   }
 
   @Test
@@ -1119,6 +1204,7 @@ public class TestCompactor extends TestCompactorBase {
     String dbName = "default";
     String tblName1 = "cws1";
     String tblName2 = "cws2";
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
 
     HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName1, 1);
     HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName2, 1);
@@ -1155,7 +1241,8 @@ public class TestCompactor extends TestCompactorBase {
     count = TestTxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE");
     // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
     // with rows in TXN_COMPONENTS
-    Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count);
+    Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
+            useCleanerForAbortCleanup ? 0 : 2, count);
 
     runWorker(conf);
     runWorker(conf);
@@ -1203,6 +1290,7 @@ public class TestCompactor extends TestCompactorBase {
 
   @Test
   public void testCleanDynPartAbortNoDataLoss() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     String dbName = "default";
     String tblName = "cws";
 
@@ -1226,7 +1314,8 @@ public class TestCompactor extends TestCompactorBase {
     runInitiator(conf);
 
     int count = TestTxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE");
-    Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 4, count);
+    Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
+            useCleanerForAbortCleanup ? 3 : 4, count);
 
     runWorker(conf);
     runWorker(conf);
@@ -1258,6 +1347,7 @@ public class TestCompactor extends TestCompactorBase {
   public void testCleanAbortAndMinorCompact() throws Exception {
     String dbName = "default";
     String tblName = "cws";
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
 
     HiveStreamingConnection connection = prepareTableAndConnection(dbName, tblName, 1);
 
@@ -1273,7 +1363,8 @@ public class TestCompactor extends TestCompactorBase {
     runInitiator(conf);
 
     int count = TestTxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE");
-    Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count);
+    Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
+            useCleanerForAbortCleanup ? 1 : 2, count);
 
     runWorker(conf);
     runWorker(conf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
index 8b6e57c8d0f..a57a817e161 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
@@ -89,6 +89,7 @@ class TestCompactorBase {
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, false);
     MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
 
     TestTxnDbUtil.setConfValues(hiveConf);
     TestTxnDbUtil.cleanDb(hiveConf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorWithAbortCleanupUsingCompactionCycle.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorWithAbortCleanupUsingCompactionCycle.java
new file mode 100644
index 00000000000..beb80d18e6c
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorWithAbortCleanupUsingCompactionCycle.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.txn.compactor.TestCompactor;
+import org.junit.Before;
+
+public class TestCompactorWithAbortCleanupUsingCompactionCycle extends TestCompactor {
+  @Override
+  @Before
+  public void setup() throws Exception {
+    super.setup();
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2.java
index 18fd05b9fde..84fc4cfdaf6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -62,6 +63,7 @@ public class TestInitiator2 extends CompactorTest {
 
   @Test
   public void dbNoAutoCompactSetFalseUpperCase() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     String dbName = "test1";
     Map<String, String> params = new HashMap<String, String>(1);
     params.put("NO_AUTO_COMPACTION", "false");
@@ -89,7 +91,7 @@ public class TestInitiator2 extends CompactorTest {
     startInitiator();
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(useCleanerForAbortCleanup ? 0 : 1, rsp.getCompactsSize());
   }
 
   @Override
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2WithAbortCleanupUsingCompactionCycle.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2WithAbortCleanupUsingCompactionCycle.java
new file mode 100644
index 00000000000..9a47ba59464
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2WithAbortCleanupUsingCompactionCycle.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.txn.compactor.TestInitiator2;
+
+public class TestInitiator2WithAbortCleanupUsingCompactionCycle extends TestInitiator2 {
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 2325c1527d7..ce822effe7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
@@ -61,12 +60,9 @@ public class Cleaner extends MetaStoreCompactorThread {
     cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
             COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
-    if (CollectionUtils.isEmpty(cleanupHandlers)) {
-      FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
-      cleanupHandlers = TaskHandlerFactory.getInstance()
-              .getHandlers(conf, txnHandler, metadataCache,
-                      metricsEnabled, fsRemover);
-    }
+    FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
+    cleanupHandlers = TaskHandlerFactory.getInstance()
+            .getHandlers(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
   }
 
   @Override
@@ -150,7 +146,7 @@ public class Cleaner extends MetaStoreCompactorThread {
   }
 
   @Override
-  public boolean isCacheEnabled() {
+  protected boolean isCacheEnabled() {
     return MetastoreConf.getBoolVar(conf,
             MetastoreConf.ConfVars.COMPACTOR_CLEANER_TABLECACHE_ON);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 9398258a64b..a86c18baad9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -157,8 +157,7 @@ public class Initiator extends MetaStoreCompactorThread {
               .parallelStream()
               .filter(ci -> isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables))
               .collect(Collectors.toSet())).get();
-          LOG.debug("Found " + potentials.size() + " potential compactions, " +
-              "checking to see if we should compact any of them");
+          LOG.debug("Found {} potential compactions, checking to see if we should compact any of them", potentials.size());
 
           checkInterrupt();
 
@@ -246,7 +245,7 @@ public class Initiator extends MetaStoreCompactorThread {
   }
 
   @Override
-  public boolean isCacheEnabled() {
+  protected boolean isCacheEnabled() {
     return MetastoreConf.getBoolVar(conf,
             MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 7cda085ff6b..39d0e10f589 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -88,7 +88,7 @@ public abstract class MetaStoreCompactorThread extends CompactorThread implement
     return CompactorUtil.getPartitionsByNames(conf, ci.dbname, ci.tableName, ci.partName);
   }
 
-  public abstract boolean isCacheEnabled();
+  protected abstract boolean isCacheEnabled();
 
   protected void startCycleUpdater(long updateInterval, Runnable taskToRun) {
     if (cycleUpdaterExecutorService == null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
new file mode 100644
index 00000000000..06dc02942d9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.isNull;
+
+/**
+ * TaskHandler implementation dedicated to the cleanup of aborted transactions.
+ * Creates the tasks which clean up the directories and metadata left behind by aborted txns.
+ */
+class AbortedTxnCleaner extends TaskHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbortedTxnCleaner.class.getName());
+
+  public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,
+                           MetadataCache metadataCache, boolean metricsEnabled,
+                           FSRemover fsRemover) {
+    super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
+  }
+
+  /**
+   The following cleanup is based on the following idea - <br>
+   1. Aborted cleanup is independent of compaction. This is because directories which are written by
+      aborted txns are not visible by any open txns. It is only visible while determining the AcidState (which
+      only sees the aborted deltas and does not read the file).<br><br>
+
+   The following algorithm is used to clean the set of aborted directories - <br>
+      a. Find the list of entries which are suitable for cleanup (This is done in {@link TxnStore#findReadyToCleanAborts(long, int)}).<br>
+      b. If the table/partition does not exist, then remove the associated aborted entry in TXN_COMPONENTS table. <br>
+      c. Get the AcidState of the table by using the min open txnID, database name, tableName, partition name, highest write ID <br>
+      d. Fetch the aborted directories and delete the directories. <br>
+      e. Fetch the aborted write IDs from the AcidState and use it to delete the associated metadata in the TXN_COMPONENTS table.
+   **/
+  @Override
+  public List<Runnable> getTasks() throws MetaException {
+    int abortedThreshold = HiveConf.getIntVar(conf,
+              HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+    long abortedTimeThreshold = HiveConf
+              .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+                      TimeUnit.MILLISECONDS);
+    List<CompactionInfo> readyToCleanAborts = txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
+
+    if (!readyToCleanAborts.isEmpty()) {
+      return readyToCleanAborts.stream().map(ci -> ThrowingRunnable.unchecked(() ->
+                      clean(ci, ci.txnId > 0 ? ci.txnId : Long.MAX_VALUE, metricsEnabled)))
+              .collect(Collectors.toList());
+    }
+    return Collections.emptyList();
+  }
+
+  private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEnabled) throws MetaException, InterruptedException {
+    LOG.info("Starting cleaning for {}", info);
+    PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
+    String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_";
+    try {
+      if (metricsEnabled) {
+        perfLogger.perfLogBegin(AbortedTxnCleaner.class.getName(), cleanerMetric);
+      }
+      Partition p = null;
+      Table t = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+      if (isNull(t)) {
+        // The table was dropped before we got around to cleaning it.
+        LOG.info("Unable to find table {}, assuming it was dropped.", info.getFullTableName());
+        txnHandler.markCleaned(info, true);
+        return;
+      }
+      if (!isNull(info.partName)) {
+        p = resolvePartition(info.dbname, info.tableName, info.partName);
+        if (isNull(p)) {
+          // The partition was dropped before we got around to cleaning it.
+          LOG.info("Unable to find partition {}, assuming it was dropped.",
+                  info.getFullPartitionName());
+          txnHandler.markCleaned(info, true);
+          return;
+        }
+      }
+
+      String location = CompactorUtil.resolveStorageDescriptor(t,p).getLocation();
+      info.runAs = TxnUtils.findUserToRunAs(location, t, conf);
+      abortCleanUsingAcidDir(info, location, minOpenWriteTxn);
+
+    } catch (InterruptedException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", info,
+              e.getMessage());
+      throw new MetaException(e.getMessage());
+    } finally {
+      if (metricsEnabled) {
+        perfLogger.perfLogEnd(AbortedTxnCleaner.class.getName(), cleanerMetric);
+      }
+    }
+  }
+
+  private void abortCleanUsingAcidDir(CompactionInfo info, String location, long minOpenWriteTxn) throws Exception {
+    ValidTxnList validTxnList =
+            TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenWriteTxn, true);
+    //save it so that getAcidState() sees it
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+
+    ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(info, validTxnList);
+
+    // Set the highestWriteId of the cleanup to min(minOpenWriteId - 1, highWatermark).
+    // This is necessary for looking at the complete state of the table up to the min open write ID
+    // (if there is an open txn on the table) or the high watermark.
+    // This is used later on while deleting the records from the TXN_COMPONENTS table.
+    info.highestWriteId = Math.min(isNull(validWriteIdList.getMinOpenWriteId()) ?
+            Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1, validWriteIdList.getHighWatermark());
+    Table table = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+    boolean success = cleanAndVerifyObsoleteDirectories(info, location, validWriteIdList, table);
+    if (success || CompactorUtil.isDynPartAbort(table, info.partName)) {
+      txnHandler.markCleaned(info, false);
+    } else {
+      LOG.warn("Leaving aborted entry {} in TXN_COMPONENTS table.", info);
+    }
+
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
index a352dca4dc5..453a1616c33 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
@@ -17,17 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor.handler;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
 import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockType;
@@ -39,35 +35,27 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnOpenException;
 import org.apache.hadoop.hive.metastore.api.UnlockRequest;
-import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.io.AcidDirectory;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
 import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest.CleanupRequestBuilder;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
 import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
 import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
-import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.BitSet;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.apache.commons.collections4.ListUtils.subtract;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
@@ -77,7 +65,7 @@ import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
 import static java.util.Objects.isNull;
 
 /**
- * A compaction based implementation of RequestHandler.
+ * A compaction based implementation of TaskHandler.
  * Provides implementation of creation of compaction clean tasks.
  */
 class CompactionCleaner extends TaskHandler {
@@ -114,7 +102,7 @@ class CompactionCleaner extends TaskHandler {
     return Collections.emptyList();
   }
 
-  private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
+  private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) throws MetaException {
     LOG.info("Starting cleaning for {}", ci);
     PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
     String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
@@ -134,13 +122,13 @@ class CompactionCleaner extends TaskHandler {
           // The table was dropped before we got around to cleaning it.
           LOG.info("Unable to find table {}, assuming it was dropped. {}", ci.getFullTableName(),
                   idWatermark(ci));
-          txnHandler.markCleaned(ci);
+          txnHandler.markCleaned(ci, false);
           return;
         }
         if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
           // The table was marked no clean up true.
           LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", ci.getFullTableName());
-          txnHandler.markCleaned(ci);
+          txnHandler.markRefused(ci);
           return;
         }
         if (!isNull(ci.partName)) {
@@ -149,13 +137,13 @@ class CompactionCleaner extends TaskHandler {
             // The partition was dropped before we got around to cleaning it.
             LOG.info("Unable to find partition {}, assuming it was dropped. {}",
                     ci.getFullPartitionName(), idWatermark(ci));
-            txnHandler.markCleaned(ci);
+            txnHandler.markCleaned(ci, false);
             return;
           }
           if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
             // The partition was marked no clean up true.
             LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to true", ci.getFullPartitionName());
-            txnHandler.markCleaned(ci);
+            txnHandler.markRefused(ci);
             return;
           }
         }
@@ -172,7 +160,7 @@ class CompactionCleaner extends TaskHandler {
         if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) {
           cleanUsingLocation(ci, path, true);
         } else {
-          cleanUsingAcidDir(ci, path, minOpenTxnGLB);
+          cleanUsingAcidDir(ci, path, minOpenTxn);
         }
       } else {
         cleanUsingLocation(ci, location, false);
@@ -216,15 +204,15 @@ class CompactionCleaner extends TaskHandler {
       deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
     }
     if (!deleted.isEmpty()) {
-      txnHandler.markCleaned(ci);
+      txnHandler.markCleaned(ci, false);
     } else {
       txnHandler.clearCleanerStart(ci);
     }
   }
 
-  private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxnGLB) throws Exception {
+  private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxn) throws Exception {
     ValidTxnList validTxnList =
-            TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
+            TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxn, false);
     //save it so that getAcidState() sees it
     conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
     /*
@@ -260,50 +248,12 @@ class CompactionCleaner extends TaskHandler {
 
     // Creating 'reader' list since we are interested in the set of 'obsolete' files
     ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
-    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
-
-    Path path = new Path(location);
-    FileSystem fs = path.getFileSystem(conf);
-
-    // Collect all the files/dirs
-    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
-    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
-            dirSnapshots);
     Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
-    boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, ci.partName);
-
-    List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
-    if (isDynPartAbort || dir.hasUncompactedAborts()) {
-      ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
-    }
-
-    List<Path> deleted = fsRemover.clean(new CleanupRequestBuilder().setLocation(location)
-            .setDbName(ci.dbname).setFullPartitionName(ci.getFullPartitionName())
-            .setRunAs(ci.runAs).setObsoleteDirs(obsoleteDirs).setPurge(true)
-            .build());
-
-    if (!deleted.isEmpty()) {
-      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
-              txnHandler);
-    }
-
-    // Make sure there are no leftovers below the compacted watermark
-    boolean success = false;
-    conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
-    dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
-                    ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
-            Ref.from(false), false, dirSnapshots);
+    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
 
-    List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
-    if (!remained.isEmpty()) {
-      LOG.warn("{} Remained {} obsolete directories from {}. {}",
-              idWatermark(ci), remained.size(), location, CompactorUtil.getDebugInfo(remained));
-    } else {
-      LOG.debug("{} All cleared below the watermark: {} from {}", idWatermark(ci), ci.highestWriteId, location);
-      success = true;
-    }
+    boolean success = cleanAndVerifyObsoleteDirectories(ci, location, validWriteIdList, table);
     if (success || CompactorUtil.isDynPartAbort(table, ci.partName)) {
-      txnHandler.markCleaned(ci);
+      txnHandler.markCleaned(ci, false);
     } else {
       txnHandler.clearCleanerStart(ci);
       LOG.warn("No files were removed. Leaving queue entry {} in ready for cleaning state.", ci);
@@ -337,18 +287,10 @@ class CompactionCleaner extends TaskHandler {
     return " id=" + ci.id;
   }
 
-  private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
+  @Override
+  protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
           throws NoSuchTxnException, MetaException {
-    List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(ci.dbname, ci.tableName));
-    GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
-    request.setValidTxnList(validTxnList.writeToString());
-    GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
-    // we could have no write IDs for a table if it was never written to but
-    // since we are in the Cleaner phase of compactions, there must have
-    // been some delta/base dirs
-    assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
-    ValidReaderWriteIdList validWriteIdList =
-            TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
+    ValidReaderWriteIdList validWriteIdList = super.getValidCleanerWriteIdList(ci, validTxnList);
     /*
      * We need to filter the obsoletes dir list, to only remove directories that were made obsolete by this compaction
      * If we have a higher retentionTime it is possible for a second compaction to run on the same partition. Cleaning up the first compaction
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
index 327b9238486..ef95a100c1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
@@ -17,19 +17,40 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor.handler;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
 import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
 import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.commons.collections.ListUtils.subtract;
 
 /**
  * An abstract class which defines the list of utility methods for performing cleanup activities.
@@ -81,4 +102,63 @@ public abstract class TaskHandler {
       return null;
     }
   }
+
+  protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo info, ValidTxnList validTxnList)
+          throws NoSuchTxnException, MetaException {
+    List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(info.dbname, info.tableName)); // exactly one table is requested
+    GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
+    request.setValidTxnList(validTxnList.writeToString());
+    GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
+    // We could have no write IDs for a table if it was never written to, but since this is only called
+    // while cleaning up (after a compaction or for aborted txns), there must have been some delta/base dirs,
+    // so a single entry is expected for the single requested table (checked only when assertions are enabled).
+    assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
+
+    return TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
+  }
+
+  protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info, String location,
+                                                      ValidReaderWriteIdList validWriteIdList, Table table) throws MetaException, IOException {
+    Path path = new Path(location);
+    FileSystem fs = path.getFileSystem(conf);
+
+    // Collect all the files/dirs once; both getAcidState() calls below are evaluated against this same snapshot
+    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
+    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
+            dirSnapshots);
+    boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, info.partName);
+
+    List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
+    if (isDynPartAbort || dir.hasUncompactedAborts()) { // hand the aborted write ids reported by the AcidDirectory over to the CompactionInfo
+      info.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+    }
+
+    List<Path> deleted = fsRemover.clean(new CleanupRequest.CleanupRequestBuilder().setLocation(location)
+            .setDbName(info.dbname).setFullPartitionName(info.getFullPartitionName())
+            .setRunAs(info.runAs).setObsoleteDirs(obsoleteDirs).setPurge(true)
+            .build());
+
+    if (!deleted.isEmpty()) { // metrics are updated only when something was actually removed
+      AcidMetricService.updateMetricsFromCleaner(info.dbname, info.tableName, info.partName, dir.getObsolete(), conf,
+              txnHandler);
+    }
+
+    // Make sure there are no leftovers below the watermark (info.highestWriteId): re-evaluate the snapshot taken above
+    boolean success = false;
+    conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString()); // replaces the caller's txn list in conf with a default-constructed one
+    dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
+                    info.getFullTableName(), new long[0], new BitSet(), info.highestWriteId, Long.MAX_VALUE),
+            Ref.from(false), false, dirSnapshots);
+
+    List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted); // obsolete dirs that the remover did not report as deleted
+    if (!remained.isEmpty()) {
+      LOG.warn("Remained {} obsolete directories from {}. {}",
+              remained.size(), location, CompactorUtil.getDebugInfo(remained));
+    } else {
+      LOG.debug("All cleared below the watermark: {} from {}", info.highestWriteId, location);
+      success = true;
+    }
+
+    return success; // true only when no obsolete directory remained
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
index 79293a22c26..57a4dc625c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
@@ -18,11 +18,12 @@
 package org.apache.hadoop.hive.ql.txn.compactor.handler;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
 import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
 
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -43,7 +44,14 @@ public class TaskHandlerFactory {
 
   public List<TaskHandler> getHandlers(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
                                                   boolean metricsEnabled, FSRemover fsRemover) {
-    return Arrays.asList(new CompactionCleaner(conf, txnHandler, metadataCache,
+    boolean useAbortHandler = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
+    List<TaskHandler> taskHandlers = new ArrayList<>();
+    if (useAbortHandler) {
+      taskHandlers.add(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+              metricsEnabled, fsRemover));
+    }
+    taskHandlers.add(new CompactionCleaner(conf, txnHandler, metadataCache,
             metricsEnabled, fsRemover));
+    return taskHandlers;
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 5b334838ac8..03a7326710e 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -238,7 +238,7 @@ public class TestCompactionTxnHandler {
     List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
     assertEquals(1, toClean.size());
     assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
-    txnHandler.markCleaned(ci);
+    txnHandler.markCleaned(ci, false);
     assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
     assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
 
@@ -529,7 +529,7 @@ public class TestCompactionTxnHandler {
     txnHandler.compact(rqst);
     ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
     assertNotNull(ci);
-    txnHandler.markCleaned(ci);
+    txnHandler.markCleaned(ci, false);
   }
 
   private void addWaitingForCleaningCompaction(String dbName, String tableName, CompactionType type,
@@ -866,7 +866,7 @@ public class TestCompactionTxnHandler {
     Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
     List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
     assertEquals(1, toClean.size());
-    txnHandler.markCleaned(ci);
+    txnHandler.markCleaned(ci, false);
 
     // Check that we are cleaning up the empty aborted transactions
     GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
@@ -892,7 +892,7 @@ public class TestCompactionTxnHandler {
 
     toClean = txnHandler.findReadyToClean(0, 0);
     assertEquals(1, toClean.size());
-    txnHandler.markCleaned(ci);
+    txnHandler.markCleaned(ci, false);
 
     txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
     // The open txn will became the low water mark
@@ -999,7 +999,7 @@ public class TestCompactionTxnHandler {
     txnHandler.markCompacted(ci);
     checkEnqueueTime(enqueueTime);
 
-    txnHandler.markCleaned(ci);
+    txnHandler.markCleaned(ci, false);
     checkEnqueueTime(enqueueTime);
   }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 2a9ae73ca26..566b1251427 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -156,6 +156,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     //TestTxnCommands2WithSplitUpdateAndVectorization has the vectorized version
     //of these tests.
     HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+    //TestTxnCommands2WithAbortCleanupUsingCompactionCycle has the tests with abort cleanup in compaction cycle
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
     HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, false);
     HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, false);
   }
@@ -2620,6 +2622,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
 
   @Test
   public void testDynPartInsertWithAborts() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     int[][] resultData = new int[][]{{1, 1}, {2, 2}};
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1')");
     verifyDeltaDirAndResult(1, Table.ACIDTBLPART.toString(), "p=p1", resultData);
@@ -2641,7 +2644,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
     // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
     // with rows in TXN_COMPONENTS
-    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+            useCleanerForAbortCleanup ? 0 : 1, count);
 
     runWorker(hiveConf);
     runCleaner(hiveConf);
@@ -2651,6 +2655,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
 
   @Test
   public void testDynPartInsertWithMultiPartitionAborts() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     int [][] resultData = new int[][] {{1,1}, {2,2}};
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1')");
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p2'),(2,2,'p2')");
@@ -2680,7 +2685,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
     // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
     // with rows in TXN_COMPONENTS
-    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+            useCleanerForAbortCleanup ? 0 : 1, count);
 
     r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
     Assert.assertEquals("4", r1.get(0));
@@ -2696,6 +2702,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
 
   @Test
   public void testDynPartIOWWithAborts() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     int [][] resultData = new int[][] {{1,1}, {2,2}};
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1')");
     verifyDeltaDirAndResult(1, Table.ACIDTBLPART.toString(), "p=p1", resultData);
@@ -2718,7 +2725,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
     // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
     // with rows in TXN_COMPONENTS
-    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+            useCleanerForAbortCleanup ? 0 : 1, count);
 
     runWorker(hiveConf);
     runCleaner(hiveConf);
@@ -2729,6 +2737,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
 
   @Test
   public void testDynPartIOWWithMultiPartitionAborts() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     int [][] resultData = new int[][] {{1,1}, {2,2}};
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1')");
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p2'),(2,2,'p2')");
@@ -2760,7 +2769,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
     // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
     // with rows in TXN_COMPONENTS
-    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+            useCleanerForAbortCleanup ? 0 : 1, count);
 
     r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
     Assert.assertEquals("4", r1.get(0));
@@ -2778,6 +2788,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
 
   @Test
   public void testDynPartUpdateWithAborts() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     int[][] resultData1 = new int[][]{{1, 2}, {3, 4}};
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values (1,2,'p1')");
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values (3,4,'p1')");
@@ -2801,7 +2812,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
     // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
     // with rows in TXN_COMPONENTS
-    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+            useCleanerForAbortCleanup ? 0 : 1, count);
 
     runWorker(hiveConf);
     runCleaner(hiveConf);
@@ -2812,6 +2824,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
 
   @Test
   public void testDynPartMergeWithAborts() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     int [][] resultData = new int[][] {{1,1}, {2,2}};
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1')");
     verifyDeltaDirAndResult(1, Table.ACIDTBLPART.toString(), "p=p1", resultData);
@@ -2845,7 +2858,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
     count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
     // Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
     // with rows in TXN_COMPONENTS
-    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+    Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+            useCleanerForAbortCleanup ? 0 : 1, count);
 
     r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
     Assert.assertEquals("2", r1.get(0));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithAbortCleanupUsingCompactionCycle.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithAbortCleanupUsingCompactionCycle.java
new file mode 100644
index 00000000000..628c0f979aa
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithAbortCleanupUsingCompactionCycle.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+
+/**
+ * Same as TestTxnCommands2 (ACID tables having 'transactional_properties' set to 'default'), but
+ * with COMPACTOR_CLEAN_ABORTS_USING_CLEANER set to false in initHiveConf(). This specifically
+ * tests the abort cleanup done exclusively using the compaction cycle for ACID tables.
+ */
+public class TestTxnCommands2WithAbortCleanupUsingCompactionCycle extends TestTxnCommands2 {
+  public TestTxnCommands2WithAbortCleanupUsingCompactionCycle() {
+    super();
+  }
+
+  @Override
+  void initHiveConf() {
+    super.initHiveConf(); // apply the parent's settings first, then override the flag below
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index a556c7fdf3b..113173acb37 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -32,15 +32,20 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
@@ -135,6 +140,8 @@ public abstract class CompactorTest {
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, useMinHistoryWriteId());
+    // Set this config to true in the base class, there are extended test classes which set this config to false.
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
     TestTxnDbUtil.setConfValues(conf);
     TestTxnDbUtil.cleanDb(conf);
     TestTxnDbUtil.prepDb(conf);
@@ -260,6 +267,36 @@ public abstract class CompactorTest {
     return awiResp.getTxnToWriteIds().get(0).getWriteId();
   }
 
+  protected void addDeltaFileWithTxnComponents(Table t, Partition p, int numRecords, boolean abort)
+      throws Exception { // Opens a txn, allocates a write id, locks, writes one delta, then aborts or commits the txn.
+    long txnId = openTxn();
+    long writeId = ms.allocateTableWriteId(txnId, t.getDbName(), t.getTableName());
+    acquireLock(t, p, txnId); // NOTE(review): presumably this lock is what records the txn components - confirm
+    addDeltaFile(t, p, writeId, writeId, numRecords); // the same write id is passed as both range arguments
+    if (abort) { // abort == true ends the txn as aborted, otherwise it is committed
+      txnHandler.abortTxns(new AbortTxnsRequest(Collections.singletonList(txnId)));
+    } else {
+      txnHandler.commitTxn(new CommitTxnRequest(txnId));
+    }
+  }
+
+  protected void acquireLock(Table t, Partition p, long txnId) throws Exception { // Requests a lock on t (and p) for txnId.
+    LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+            .setLock(LockType.SHARED_WRITE)
+            .setOperationType(DataOperationType.INSERT)
+            .setDbName(t.getDbName())
+            .setTableName(t.getTableName())
+            .setIsTransactional(true);
+    if (p != null) { // no partition name is set on the component when p is null
+      lockCompBuilder.setPartitionName(t.getPartitionKeys().get(0).getName() + "=" + p.getValues().get(0)); // first key/value only
+    }
+    LockRequestBuilder requestBuilder = new LockRequestBuilder().setUser(null)
+            .setTransactionId(txnId).addLockComponent(lockCompBuilder.build());
+    requestBuilder.setZeroWaitReadEnabled(!conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) ||
+            !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK)); // i.e. enabled unless both X-lock flags are true
+    ms.lock(requestBuilder.build()); // any result of ms.lock is ignored here
+  }
+
   protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords)
       throws Exception {
     addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
@@ -347,7 +384,7 @@ public abstract class CompactorTest {
       } else if (open == null || !open.contains(tid)) {
         txnHandler.commitTxn(new CommitTxnRequest(tid));
       } else if (open.contains(tid) && useMinHistoryWriteId()){
-        txnHandler.addWriteIdsToMinHistory(tid, 
+        txnHandler.addWriteIdsToMinHistory(tid,
           Collections.singletonMap(dbName + "." + tblName, minOpenWriteId));
       }
     }
@@ -664,7 +701,7 @@ public abstract class CompactorTest {
    * are used since new (1.3) code has to be able to read old files.
    */
   abstract boolean useHive130DeltaDirName();
-  
+
   protected boolean useMinHistoryWriteId() {
     return false;
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java
new file mode 100644
index 00000000000..b57599113e6
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.junit.Before;
+
+public class TestAbortCleanupUsingCompactionCycle extends TestCleaner { // adds no tests; only changes one config flag
+  @Override
+  @Before
+  public void setup() throws Exception {
+    super.setup(); // runs first, so the value set on the next line is the one in effect afterwards
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java
new file mode 100644
index 00000000000..bbc72661abf
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.junit.Before;
+
+public class TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId extends TestCleaner {
+  @Override
+  @Before
+  public void setup() throws Exception {
+    super.setup(); // runs first, so the value set on the next line is the one in effect afterwards
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+  }
+
+  @Override
+  protected boolean useMinHistoryWriteId() { // CompactorTest copies this value into TXN_USE_MIN_HISTORY_WRITE_ID
+    return true; // the CompactorTest default is false
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 82f052fe362..df4a786a184 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -67,6 +67,7 @@ import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
 import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
@@ -107,7 +108,7 @@ public class TestCleaner extends CompactorTest {
     FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
     List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
             .getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
-    doThrow(new RuntimeException(errorMessage)).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
+    doThrow(new RuntimeException(errorMessage)).when(mockedHandler).markCleaned(nullable(CompactionInfo.class), eq(false));
 
     Table t = newTable("default", "retry_test", false);
 
@@ -130,8 +131,8 @@ public class TestCleaner extends CompactorTest {
     for (int i = 1; i < 4; i++) {
       Cleaner cleaner = new Cleaner();
       cleaner.setConf(conf);
-      cleaner.setCleanupHandlers(taskHandlers);
       cleaner.init(new AtomicBoolean(true));
+      cleaner.setCleanupHandlers(taskHandlers);
       FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
 
       cleaner.run();
@@ -157,8 +158,8 @@ public class TestCleaner extends CompactorTest {
     //Do a final run to reach the maximum retry attempts, so the state finally should be set to failed
     Cleaner cleaner = new Cleaner();
     cleaner.setConf(conf);
-    cleaner.setCleanupHandlers(taskHandlers);
     cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(taskHandlers);
     FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
 
     cleaner.run();
@@ -195,13 +196,13 @@ public class TestCleaner extends CompactorTest {
     FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
     List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
             .getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
-    doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
+    doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class), eq(false));
 
     //Do a run to fail the clean and set the retention time
     Cleaner cleaner = new Cleaner();
     cleaner.setConf(conf);
-    cleaner.setCleanupHandlers(taskHandlers);
     cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(taskHandlers);
     FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
 
     cleaner.run();
@@ -216,8 +217,8 @@ public class TestCleaner extends CompactorTest {
     //Do a final run and check if the compaction is not picked up again
     cleaner = new Cleaner();
     cleaner.setConf(conf);
-    cleaner.setCleanupHandlers(taskHandlers);
     cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(taskHandlers);
     FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
 
     cleaner.run();
@@ -815,7 +816,7 @@ public class TestCleaner extends CompactorTest {
   }
 
   @Test
-  public void NoCleanupAfterMajorCompaction() throws Exception {
+  public void noCleanupAfterMajorCompaction() throws Exception {
     Map<String, String> parameters = new HashMap<>();
 
     //With no cleanup true
@@ -836,7 +837,7 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, rsp.getCompacts().get(0).getState());
 
     // Check that the files are not removed
     List<Path> paths = getDirectories(conf, t, null);
@@ -884,7 +885,7 @@ public class TestCleaner extends CompactorTest {
     // Check there are no compactions requests left.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals(1, rsp.getCompactsSize());
-    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, rsp.getCompacts().get(0).getState());
 
     // Check that the files are not removed
     List<Path> paths = getDirectories(conf, t, p);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 6a9a37dfe3f..c98b310ca25 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -123,6 +123,7 @@ public class TestInitiator extends CompactorTest {
 
   @Test
   public void majorCompactOnTableTooManyAborts() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     Table t = newTable("default", "mcottma", false);
 
     HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
@@ -144,14 +145,19 @@ public class TestInitiator extends CompactorTest {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("initiated", compacts.get(0).getState());
-    Assert.assertEquals("mcottma", compacts.get(0).getTablename());
-    Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+    if (useCleanerForAbortCleanup) {
+      Assert.assertEquals(0, compacts.size());
+    } else {
+      Assert.assertEquals(1, compacts.size());
+      Assert.assertEquals("initiated", compacts.get(0).getState());
+      Assert.assertEquals("mcottma", compacts.get(0).getTablename());
+      Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+    }
   }
 
   @Test
   public void majorCompactOnPartitionTooManyAborts() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     Table t = newTable("default", "mcoptma", true);
     Partition p = newPartition(t, "today");
 
@@ -175,11 +181,15 @@ public class TestInitiator extends CompactorTest {
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("initiated", compacts.get(0).getState());
-    Assert.assertEquals("mcoptma", compacts.get(0).getTablename());
-    Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
-    Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+    if (useCleanerForAbortCleanup) {
+      Assert.assertEquals(0, compacts.size());
+    } else {
+      Assert.assertEquals(1, compacts.size());
+      Assert.assertEquals("initiated", compacts.get(0).getState());
+      Assert.assertEquals("mcoptma", compacts.get(0).getTablename());
+      Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+      Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+    }
   }
 
   @Test
@@ -218,6 +228,7 @@ public class TestInitiator extends CompactorTest {
    */
   @Test
   public void compactExpiredAbortedTxns() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     Table t = newTable("default", "expiredAbortedTxns", false);
     // abort a txn
     long txnid = openTxn();
@@ -243,8 +254,12 @@ public class TestInitiator extends CompactorTest {
     // set to 1 ms, wait 1 ms, and check that minor compaction is queued
     conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, 1, TimeUnit.MILLISECONDS);
     Thread.sleep(1L);
-    ShowCompactResponse rsp = initiateAndVerifyCompactionQueueLength(1);
-    Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+    if (useCleanerForAbortCleanup) {
+      initiateAndVerifyCompactionQueueLength(0);
+    } else {
+      ShowCompactResponse rsp = initiateAndVerifyCompactionQueueLength(1);
+      Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+    }
   }
 
   private ShowCompactResponse initiateAndVerifyCompactionQueueLength(int expectedLength)
@@ -961,6 +976,7 @@ public class TestInitiator extends CompactorTest {
   }
 
   @Test public void testInitiatorFailure() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     String tableName = "my_table";
     Table t = newTable("default", tableName, false);
 
@@ -991,9 +1007,13 @@ public class TestInitiator extends CompactorTest {
     // verify status of table compaction
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("did not initiate", compacts.get(0).getState());
-    Assert.assertEquals(tableName, compacts.get(0).getTablename());
+    if (useCleanerForAbortCleanup) {
+      Assert.assertEquals(0, compacts.size());
+    } else {
+      Assert.assertEquals(1, compacts.size());
+      Assert.assertEquals("did not initiate", compacts.get(0).getState());
+      Assert.assertEquals(tableName, compacts.get(0).getTablename());
+    }
   }
 
   @Test
@@ -1029,6 +1049,7 @@ public class TestInitiator extends CompactorTest {
 
   @Test
   public void testInitiatorHostAndVersion() throws Exception {
+    boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
     String tableName = "my_table";
     Table t = newTable("default", tableName, false);
 
@@ -1059,14 +1080,18 @@ public class TestInitiator extends CompactorTest {
     // verify status of table compaction
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("initiated", compacts.get(0).getState());
-    Assert.assertEquals(tableName, compacts.get(0).getTablename());
-    Assert.assertEquals(runtimeVersion, compacts.get(0).getInitiatorVersion());
-    // split the threadid
-    String[] parts = compacts.get(0).getInitiatorId().split("-");
-    Assert.assertTrue(parts.length > 1);
-    Assert.assertEquals(ServerUtils.hostname(), String.join("-", Arrays.copyOfRange(parts, 0, parts.length - 1)));
+    if (useCleanerForAbortCleanup) {
+      Assert.assertEquals(0, compacts.size());
+    } else {
+      Assert.assertEquals(1, compacts.size());
+      Assert.assertEquals("initiated", compacts.get(0).getState());
+      Assert.assertEquals(tableName, compacts.get(0).getTablename());
+      Assert.assertEquals(runtimeVersion, compacts.get(0).getInitiatorVersion());
+      // split the threadid
+      String[] parts = compacts.get(0).getInitiatorId().split("-");
+      Assert.assertTrue(parts.length > 1);
+      Assert.assertEquals(ServerUtils.hostname(), String.join("-", Arrays.copyOfRange(parts, 0, parts.length - 1)));
+    }
   }
 
   @Test
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiatorWithAbortCleanupUsingCompactionCycle.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiatorWithAbortCleanupUsingCompactionCycle.java
new file mode 100644
index 00000000000..5fdf9fc72a9
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiatorWithAbortCleanupUsingCompactionCycle.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+
+public class TestInitiatorWithAbortCleanupUsingCompactionCycle extends TestInitiator { // adds no tests; only changes one flag
+  @Override
+  public void setup() throws Exception { // NOTE(review): sibling subclasses re-declare @Before on this override - confirm it still runs
+    super.setup(); // runs first, so the value set on the next line is the one in effect afterwards
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
new file mode 100644
index 00000000000..3b40e2f7594
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
+import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.ArgumentMatchers.any;
+
+public class TestAbortedTxnCleaner extends TestHandler {
+
+  @Test
+  public void testCleaningOfAbortedDirectoriesForUnpartitionedTables() throws Exception {
+    String dbName = "default", tableName = "handler_unpart_test";
+    Table t = newTable(dbName, tableName, false);
+
+    // Three aborted deltas and one committed delta, each written by its own transaction
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+    addDeltaFileWithTxnComponents(t, null, 2, false);
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); // NOTE(review): presumably 0 makes any abort eligible - confirm
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache)); // a spy, not a pure mock
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+            false, mockedFSRemover)); // also a spy wrapping a real AbortedTxnCleaner
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); // set after init(); only the aborted-txn handler is used
+    cleaner.run();
+
+    Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); // one cleanup request for the table
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(); // tasks requested once in this run
+
+    List<Path> directories = getDirectories(conf, t, null);
+    // All aborted directories removed, hence 1 committed delta directory must be present
+    Assert.assertEquals(1, directories.size());
+  }
+
+  @Test
+  public void testCleaningOfAbortedDirectoriesForSinglePartition() throws Exception {
+    String dbName = "default", tableName = "handler_part_single_test", partName = "today";
+    Table t = newTable(dbName, tableName, true);
+    Partition p = newPartition(t, partName);
+
+    // Three aborted deltas and one committed delta in partition p, each written by its own transaction
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+    addDeltaFileWithTxnComponents(t, p, 2, false);
+    addDeltaFileWithTxnComponents(t, p, 2, true);
+
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); // NOTE(review): presumably 0 makes any abort eligible - confirm
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache)); // a spy, not a pure mock
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+            false, mockedFSRemover)); // also a spy wrapping a real AbortedTxnCleaner
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); // set after init(); only the aborted-txn handler is used
+    cleaner.run();
+
+    Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); // one cleanup request for the partition
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(); // tasks requested once in this run
+
+    List<Path> directories = getDirectories(conf, t, p);
+    // All aborted directories removed, hence 1 committed delta directory must be present
+    Assert.assertEquals(1, directories.size());
+  }
+
+  @Test
+  public void testCleaningOfAbortedDirectoriesForMultiplePartitions() throws Exception {
+    String dbName = "default", tableName = "handler_part_multiple_test", partName1 = "today1", partName2 = "today2";
+    Table t = newTable(dbName, tableName, true);
+    Partition p1 = newPartition(t, partName1);
+    Partition p2 = newPartition(t, partName2);
+
+    // Three aborted deltas and one committed delta for partition-1
+    addDeltaFileWithTxnComponents(t, p1, 2, true);
+    addDeltaFileWithTxnComponents(t, p1, 2, true);
+    addDeltaFileWithTxnComponents(t, p1, 2, false);
+    addDeltaFileWithTxnComponents(t, p1, 2, true);
+
+    // Three aborted deltas and one committed delta for partition-2
+    addDeltaFileWithTxnComponents(t, p2, 2, true);
+    addDeltaFileWithTxnComponents(t, p2, 2, true);
+    addDeltaFileWithTxnComponents(t, p2, 2, false);
+    addDeltaFileWithTxnComponents(t, p2, 2, true);
+
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); // NOTE(review): presumably 0 makes any abort eligible - confirm
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache)); // a spy, not a pure mock
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+            false, mockedFSRemover)); // also a spy wrapping a real AbortedTxnCleaner
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); // set after init(); only the aborted-txn handler is used
+    cleaner.run();
+
+    Mockito.verify(mockedFSRemover, Mockito.times(2)).clean(any(CleanupRequest.class)); // one cleanup request per partition
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(); // tasks requested once in this run
+
+    List<Path> directories = getDirectories(conf, t, p1);
+    // All aborted directories removed, hence 1 committed delta directory must be present
+    Assert.assertEquals(1, directories.size());
+
+    directories = getDirectories(conf, t, p2);
+    // All aborted directories removed, hence 1 committed delta directory must be present
+    Assert.assertEquals(1, directories.size());
+  }
+
+  @Test
+  public void testCleaningOfAbortedDirectoriesWithLongRunningOpenWriteTxn() throws Exception {
+    String dbName = "default", tableName = "handler_unpart_open_test";
+    Table t = newTable(dbName, tableName, false);
+
+    // Three aborted deltas and one committed delta, all written before the long-running txn is opened
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+    addDeltaFileWithTxnComponents(t, null, 2, false);
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+
+    // Open a txn, allocate a write id, lock and write its delta, but leave it open until the end of the test
+    long openTxnId = openTxn();
+    long writeId = ms.allocateTableWriteId(openTxnId, t.getDbName(), t.getTableName());
+    acquireLock(t, null, openTxnId);
+    addDeltaFile(t, null, writeId, writeId, 2);
+
+    // Add an aborted write after the open txn; the final assertion expects its delta to survive the cleaner run
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); // NOTE(review): presumably 0 makes any abort eligible - confirm
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache)); // a spy, not a pure mock
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+            false, mockedFSRemover)); // also a spy wrapping a real AbortedTxnCleaner
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); // set after init(); only the aborted-txn handler is used
+    cleaner.run();
+
+    Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); // one cleanup request for the table
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(); // tasks requested once in this run
+
+    List<Path> directories = getDirectories(conf, t, null);
+    // All aborted directories below min open write ID are removed,
+    // hence 1 open, 1 committed, 1 aborted delta directory must be present
+    Assert.assertEquals(3, directories.size());
+
+    // Commit the long-running open txn; this line is not reached if an assertion above fails
+    txnHandler.commitTxn(new CommitTxnRequest(openTxnId));
+  }
+
+  @Test
+  public void testCleaningOfAbortedDirectoriesOnTopOfBase() throws Exception {
+    String dbName = "default", tableName = "handler_unpart_top_test";
+    Table t = newTable(dbName, tableName, false);
+
+    // Add 4 committed deltas
+    addDeltaFileWithTxnComponents(t, null, 2, false);
+    addDeltaFileWithTxnComponents(t, null, 2, false);
+    addDeltaFileWithTxnComponents(t, null, 2, false);
+    addDeltaFileWithTxnComponents(t, null, 2, false);
+
+    CompactionRequest cr = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+    txnHandler.compact(cr);
+
+    // Run the worker for the MAJOR compaction request queued above
+    startWorker();
+
+    // Check that exactly one base directory was produced
+    List<Path> directories = getDirectories(conf, t, null);
+    // The 4 deltas and the new base are all present since the cleaner has not run yet.
+    Assert.assertEquals(5, directories.size());
+    Assert.assertEquals(1, directories.stream().filter(dir -> dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count());
+
+    // 3 aborted deltas, written after the worker run above
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); // NOTE(review): presumably 0 makes any abort eligible - confirm
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache)); // a spy, not a pure mock
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+            false, mockedFSRemover)); // also a spy wrapping a real AbortedTxnCleaner
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); // set after init(); only the aborted-txn handler is used
+    cleaner.run();
+
+    Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); // one cleanup request for the table
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(); // tasks requested once in this run
+
+    directories = getDirectories(conf, t, null);
+    Assert.assertEquals(1, directories.size()); // only one directory is expected to remain ...
+    Assert.assertTrue(directories.get(0).getName().startsWith(AcidUtils.BASE_PREFIX)); // ... and it must be the base
+  }
+
+  @Test
+  public void testCleaningOfAbortedDirectoriesBelowBase() throws Exception {
+    String dbName = "default", tableName = "handler_unpart_below_test";
+    Table t = newTable(dbName, tableName, false);
+
+    // Add 2 committed deltas and 2 aborted deltas (order: committed, aborted, aborted, committed)
+    addDeltaFileWithTxnComponents(t, null, 2, false);
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+    addDeltaFileWithTxnComponents(t, null, 2, false);
+
+    CompactionRequest cr = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+    txnHandler.compact(cr);
+
+    // Run the worker for the MAJOR compaction request queued above
+    startWorker();
+
+    // Check that exactly one base directory was produced
+    List<Path> directories = getDirectories(conf, t, null);
+    // The 4 deltas and the new base are all present since the cleaner has not run yet.
+    Assert.assertEquals(5, directories.size());
+    Assert.assertEquals(1, directories.stream().filter(dir -> dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count());
+
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0); // NOTE(review): presumably 0 makes any abort eligible - confirm
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache)); // a spy, not a pure mock
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+            false, mockedFSRemover)); // also a spy wrapping a real AbortedTxnCleaner
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler)); // set after init(); only the aborted-txn handler is used
+    cleaner.run();
+
+    Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class)); // one cleanup request for the table
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(); // tasks requested once in this run
+
+    directories = getDirectories(conf, t, null);
+    // The table is already compacted, so only one directory (expected to be the base) must remain
+    Assert.assertEquals(1, directories.size());
+  }
+
+  @Test
+  public void testAbortedCleaningWithThreeTxnsWithDiffWriteIds() throws Exception {
+    String dbName = "default", tableName = "handler_unpart_writeid_test";
+    Table t = newTable(dbName, tableName, false);
+
+    // Add 2 committed deltas and 2 aborted deltas
+    addDeltaFileWithTxnComponents(t, null, 2, false);
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+    addDeltaFileWithTxnComponents(t, null, 2, true);
+    addDeltaFileWithTxnComponents(t, null, 2, false);
+
+    long openTxnId1 = openTxn();
+    long openTxnId2 = openTxn();
+    long openTxnId3 = openTxn();
+    long writeId2 = ms.allocateTableWriteId(openTxnId2, t.getDbName(), t.getTableName());
+    long writeId3 = ms.allocateTableWriteId(openTxnId3, t.getDbName(), t.getTableName());
+    long writeId1 = ms.allocateTableWriteId(openTxnId1, t.getDbName(), t.getTableName());
+    assert writeId2 < writeId1 && writeId2 < writeId3;
+    acquireLock(t, null, openTxnId3);
+    acquireLock(t, null, openTxnId2);
+    acquireLock(t, null, openTxnId1);
+    addDeltaFile(t, null, writeId3, writeId3, 2);
+    addDeltaFile(t, null, writeId1, writeId1, 2);
+    addDeltaFile(t, null, writeId2, writeId2, 2);
+
+    ms.abortTxns(Collections.singletonList(openTxnId2));
+    ms.commitTxn(openTxnId3);
+
+    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+    TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+            false, mockedFSRemover));
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.run();
+
+    List<Path> directories = getDirectories(conf, t, null);
+    Assert.assertEquals(5, directories.size());
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
index c14da6950c4..b1d60b8a851 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
@@ -67,8 +67,8 @@ public class TestHandler extends TestCleaner {
     AtomicBoolean stop = new AtomicBoolean(true);
     Cleaner cleaner = new Cleaner();
     cleaner.setConf(conf);
-    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
     cleaner.init(stop);
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
@@ -98,8 +98,8 @@ public class TestHandler extends TestCleaner {
             false, fsRemover));
     Cleaner cleaner = new Cleaner();
     cleaner.setConf(conf);
-    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
     cleaner.init(new AtomicBoolean(true));
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
     cleaner.run();
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 570f4f94dbe..8d17c0a23cf 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -649,6 +649,10 @@ public class MetastoreConf {
     COMPACTOR_CLEANER_TABLECACHE_ON("metastore.compactor.cleaner.tablecache.on",
             "hive.compactor.cleaner.tablecache.on", true,
             "Enable table caching in the cleaner. Currently the cache is cleaned after each cycle."),
+    COMPACTOR_CLEAN_ABORTS_USING_CLEANER("metastore.compactor.clean.aborts.using.cleaner", "hive.compactor.clean.aborts.using.cleaner", false,
+            "Whether to use cleaner for cleaning aborted directories or not.\n" +
+            "Set to true when cleaner is expected to clean delta/delete-delta directories from aborted transactions.\n" +
+            "Otherwise the cleanup of such directories will take place within the compaction cycle."),
     HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS("metastore.compactor.connectionPool.maxPoolSize",
             "hive.compactor.connectionPool.maxPoolSize", 5,
             "Specify the maximum number of connections in the connection pool used by the compactor."),
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index 5c091836d81..5357e394506 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -8815,7 +8815,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
 
   @Override
   public void mark_cleaned(CompactionInfoStruct cr) throws MetaException {
-    getTxnHandler().markCleaned(CompactionInfo.compactionStructToInfo(cr));
+    getTxnHandler().markCleaned(CompactionInfo.compactionStructToInfo(cr), false);
   }
 
   @Override
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 5e5db301d63..dda975f17b4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 12851547693..5e3e31c1bc5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -162,31 +162,33 @@ class CompactionTxnHandler extends TxnHandler {
         }
         rs.close();
 
-        // Check for aborted txns: number of aborted txns past threshold and age of aborted txns
-        // past time threshold
-        boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0;
-        String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " +
-          "MIN(\"TXN_STARTED\"), COUNT(*) FROM \"TXNS\", \"TXN_COMPONENTS\" " +
-          "   WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED + " " +
-          "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
-              (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold);
-
-        LOG.debug("Going to execute query <{}>", sCheckAborted);
-        rs = stmt.executeQuery(sCheckAborted);
-        long systemTime = System.currentTimeMillis();
-        while (rs.next()) {
-          boolean pastTimeThreshold =
-              checkAbortedTimeThreshold && rs.getLong(4) + abortedTimeThreshold < systemTime;
-          int numAbortedTxns = rs.getInt(5);
-          if (numAbortedTxns > abortedThreshold || pastTimeThreshold) {
-            CompactionInfo info = new CompactionInfo();
-            info.dbname = rs.getString(1);
-            info.tableName = rs.getString(2);
-            info.partName = rs.getString(3);
-            info.tooManyAborts = numAbortedTxns > abortedThreshold;
-            info.hasOldAbort = pastTimeThreshold;
-            LOG.debug("Found potential compaction: {}", info);
-            response.add(info);
+        if (!MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER)) {
+          // Check for aborted txns: number of aborted txns past threshold and age of aborted txns
+          // past time threshold
+          boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0;
+          String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " +
+                  "MIN(\"TXN_STARTED\"), COUNT(*) FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+                  "   WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED + " " +
+                  "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
+                  (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold);
+
+          LOG.debug("Going to execute query <{}>", sCheckAborted);
+          rs = stmt.executeQuery(sCheckAborted);
+          long systemTime = System.currentTimeMillis();
+          while (rs.next()) {
+            boolean pastTimeThreshold =
+                    checkAbortedTimeThreshold && rs.getLong(4) + abortedTimeThreshold < systemTime;
+            int numAbortedTxns = rs.getInt(5);
+            if (numAbortedTxns > abortedThreshold || pastTimeThreshold) {
+              CompactionInfo info = new CompactionInfo();
+              info.dbname = rs.getString(1);
+              info.tableName = rs.getString(2);
+              info.partName = rs.getString(3);
+              info.tooManyAborts = numAbortedTxns > abortedThreshold;
+              info.hasOldAbort = pastTimeThreshold;
+              LOG.debug("Found potential compaction: {}", info);
+              response.add(info);
+            }
           }
         }
       } catch (SQLException e) {
@@ -464,6 +466,55 @@ class CompactionTxnHandler extends TxnHandler {
     }
   }
 
+  @Override
+  @RetrySemantics.ReadOnly
+  public List<CompactionInfo> findReadyToCleanAborts(long abortedTimeThreshold, int abortedThreshold) throws MetaException {
+    try {
+      List<CompactionInfo> readyToCleanAborts = new ArrayList<>();
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
+           Statement stmt = dbConn.createStatement()) {
+        boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0; // a negative value disables the age check
+        // Per table/partition aborted-txn stats, LEFT JOINed with the min open txn id on the same table/partition.
+        String sCheckAborted = "SELECT \"tc\".\"TC_DATABASE\", \"tc\".\"TC_TABLE\", \"tc\".\"TC_PARTITION\", " +
+            " \"tc\".\"MIN_TXN_START_TIME\", \"tc\".\"ABORTED_TXN_COUNT\", \"minOpenWriteTxnId\".\"MIN_OPEN_WRITE_TXNID\" FROM " +
+            " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " +
+            " MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+            " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED +
+            " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
+            (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold) + " ) \"tc\" " +
+            " LEFT JOIN ( SELECT MIN(\"TC_TXNID\") AS \"MIN_OPEN_WRITE_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+            " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\"=" + TxnStatus.OPEN + " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" ) \"minOpenWriteTxnId\" " +
+            " ON \"tc\".\"TC_DATABASE\" = \"minOpenWriteTxnId\".\"TC_DATABASE\" AND \"tc\".\"TC_TABLE\" = \"minOpenWriteTxnId\".\"TC_TABLE\"" +
+            " AND (\"tc\".\"TC_PARTITION\" = \"minOpenWriteTxnId\".\"TC_PARTITION\" OR (\"tc\".\"TC_PARTITION\" IS NULL AND \"minOpenWriteTxnId\".\"TC_PARTITION\" IS NULL))";
+
+        LOG.debug("Going to execute query <{}>", sCheckAborted);
+        try (ResultSet rs = stmt.executeQuery(sCheckAborted)) {
+          long systemTime = System.currentTimeMillis();
+          while (rs.next()) {
+            // Column 4 is MIN(TXN_STARTED) of the aborted txns; column 5 is their count.
+            boolean pastTimeThreshold = checkAbortedTimeThreshold && rs.getLong(4) + abortedTimeThreshold < systemTime;
+            int numAbortedTxns = rs.getInt(5);
+            if (numAbortedTxns > abortedThreshold || pastTimeThreshold) {
+              CompactionInfo info = new CompactionInfo();
+              info.dbname = rs.getString(1);
+              info.tableName = rs.getString(2);
+              info.partName = rs.getString(3);
+              // txnId is reused to carry the min open write txn id; getLong yields 0 when the LEFT JOIN found none (SQL NULL).
+              info.txnId = rs.getLong(6);
+              readyToCleanAborts.add(info);
+            }
+          }
+        }
+        return readyToCleanAborts;
+      } catch (SQLException e) {
+        LOG.error("Unable to select next element for cleaning, {}", e.getMessage());
+        checkRetryable(e, "findReadyToCleanAborts");
+        throw new MetaException("Unable to connect to transaction database " + e.getMessage());
+      }
+    } catch (RetryException e) {
+      return findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
+    }
+  }
 
   /**
    * Mark the cleaning start time for a particular compaction
@@ -565,7 +616,7 @@ class CompactionTxnHandler extends TxnHandler {
    */
   @Override
   @RetrySemantics.CannotRetry
-  public void markCleaned(CompactionInfo info) throws MetaException {
+  public void markCleaned(CompactionInfo info, boolean isAbortOnly) throws MetaException {
     LOG.debug("Running markCleaned with CompactionInfo: {}", info);
     try {
       Connection dbConn = null;
@@ -573,132 +624,151 @@ class CompactionTxnHandler extends TxnHandler {
       ResultSet rs = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
-        String s = "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", "
-            + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
-            + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
-            + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", \"CC_ENQUEUE_TIME\", "
-            + "\"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", "
-            + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\","
-            + "\"CC_ORDER_BY\") "
-          + "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
-            + quoteChar(SUCCEEDED_STATE) + ", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", "
-            + getEpochFn(dbProduct) + ", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", "
-            + "\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", \"CQ_ENQUEUE_TIME\", "
-            + "\"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
-            + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", "
-            + "\"CQ_ORDER_BY\" "
-            + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
-        pStmt = dbConn.prepareStatement(s);
-        pStmt.setLong(1, info.id);
-        LOG.debug("Going to execute update <{}> for CQ_ID={}", s, info.id);
-        pStmt.executeUpdate();
-
-        s = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
-        pStmt = dbConn.prepareStatement(s);
-        pStmt.setLong(1, info.id);
-        LOG.debug("Going to execute update <{}>", s);
-        int updCount = pStmt.executeUpdate();
-        if (updCount != 1) {
-          LOG.error("Unable to delete compaction record: {}.  Update count={}", info, updCount);
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-        }
-        // Remove entries from completed_txn_components as well, so we don't start looking there
-        // again but only up to the highest write ID include in this compaction job.
-        //highestWriteId will be NULL in upgrade scenarios
-        s = "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\" = ? AND \"CTC_TABLE\" = ?";
-        if (info.partName != null) {
-          s += " AND \"CTC_PARTITION\" = ?";
-        }
-        if(info.highestWriteId != 0) {
-          s += " AND \"CTC_WRITEID\" <= ?";
-        }
-        pStmt = dbConn.prepareStatement(s);
-        int paramCount = 1;
-        pStmt.setString(paramCount++, info.dbname);
-        pStmt.setString(paramCount++, info.tableName);
-        if (info.partName != null) {
-          pStmt.setString(paramCount++, info.partName);
-        }
-        if(info.highestWriteId != 0) {
-          pStmt.setLong(paramCount, info.highestWriteId);
-        }
-        LOG.debug("Going to execute update <{}>", s);
-        if ((updCount = pStmt.executeUpdate()) < 1) {
-          LOG.warn("Expected to remove at least one row from completed_txn_components when " +
-            "marking compaction entry as clean!");
-        }
-        LOG.debug("Removed {} records from completed_txn_components", updCount);
-        /*
-         * compaction may remove data from aborted txns above tc_writeid bit it only guarantees to
-         * remove it up to (inclusive) tc_writeid, so it's critical to not remove metadata about
-         * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns).
-         * See {@link ql.txn.compactor.Cleaner.removeFiles()}
-         */
-        s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN ( "
-              + "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED + ") "
-            + "AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ? "
-            + "AND \"TC_PARTITION\" "+ (info.partName != null ? "= ?" : "IS NULL");
-
-        List<String> queries = new ArrayList<>();
-        Iterator<Long> writeIdsIter = null;
-        List<Integer> counts = null;
-
-        if (info.writeIds != null && !info.writeIds.isEmpty()) {
-          StringBuilder prefix = new StringBuilder(s).append(" AND ");
-          List<String> questions = Collections.nCopies(info.writeIds.size(), "?");
+        if (!isAbortOnly) {
+          String s = "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", "
+              + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
+              + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
+              + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", \"CC_ENQUEUE_TIME\", "
+              + "\"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", "
+              + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\","
+              + "\"CC_ORDER_BY\") "
+              + "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
+              + quoteChar(SUCCEEDED_STATE) + ", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", "
+              + getEpochFn(dbProduct) + ", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", "
+              + "\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", \"CQ_ENQUEUE_TIME\", "
+              + "\"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
+              + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", "
+              + "\"CQ_ORDER_BY\" "
+              + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
+          pStmt = dbConn.prepareStatement(s);
+          pStmt.setLong(1, info.id);
+          LOG.debug("Going to execute update <{}> for CQ_ID={}", s, info.id);
+          pStmt.executeUpdate();
 
-          counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix,
-            new StringBuilder(), questions, "\"TC_WRITEID\"", false, false);
-          writeIdsIter = info.writeIds.iterator();
-        } else if (!info.hasUncompactedAborts){
+          s = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
+          pStmt = dbConn.prepareStatement(s);
+          pStmt.setLong(1, info.id);
+          LOG.debug("Going to execute update <{}>", s);
+          int updCount = pStmt.executeUpdate();
+          if (updCount != 1) {
+            LOG.error("Unable to delete compaction record: {}.  Update count={}", info, updCount);
+            LOG.debug("Going to rollback");
+            dbConn.rollback();
+          }
+          // Remove entries from completed_txn_components as well, so we don't start looking there
+          // again but only up to the highest write ID include in this compaction job.
+          //highestWriteId will be NULL in upgrade scenarios
+          s = "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\" = ? AND \"CTC_TABLE\" = ?";
+          if (info.partName != null) {
+            s += " AND \"CTC_PARTITION\" = ?";
+          }
           if (info.highestWriteId != 0) {
-            s += " AND \"TC_WRITEID\" <= ?";
+            s += " AND \"CTC_WRITEID\" <= ?";
           }
-          queries.add(s);
-        }
-
-        for (int i = 0; i < queries.size(); i++) {
-          String query = queries.get(i);
-          int writeIdCount = (counts != null) ? counts.get(i) : 0;
-
-          LOG.debug("Going to execute update <{}>", query);
-          pStmt = dbConn.prepareStatement(query);
-          paramCount = 1;
-
+          pStmt = dbConn.prepareStatement(s);
+          int paramCount = 1;
           pStmt.setString(paramCount++, info.dbname);
           pStmt.setString(paramCount++, info.tableName);
           if (info.partName != null) {
             pStmt.setString(paramCount++, info.partName);
           }
-          if (info.highestWriteId != 0 && writeIdCount == 0) {
+          if (info.highestWriteId != 0) {
             pStmt.setLong(paramCount, info.highestWriteId);
           }
-          for (int j = 0; j < writeIdCount; j++) {
-            if (writeIdsIter.hasNext()) {
-              pStmt.setLong(paramCount + j, writeIdsIter.next());
-            }
+          LOG.debug("Going to execute update <{}>", s);
+          if ((updCount = pStmt.executeUpdate()) < 1) {
+            LOG.warn("Expected to remove at least one row from completed_txn_components when " +
+                    "marking compaction entry as clean!");
           }
-          int rc = pStmt.executeUpdate();
-          LOG.debug("Removed {} records from txn_components", rc);
-          // Don't bother cleaning from the txns table.  A separate call will do that.  We don't
-          // know here which txns still have components from other tables or partitions in the
-          // table, so we don't know which ones we can and cannot clean.
+          LOG.debug("Removed {} records from completed_txn_components", updCount);
         }
+
+        // Do cleanup of metadata in TXN_COMPONENTS table.
+        removeTxnComponents(dbConn, info);
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
         LOG.error("Unable to delete from compaction queue " + e.getMessage());
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
-        checkRetryable(e, "markCleaned(" + info + ")");
+        checkRetryable(e, "markCleaned(" + info + "," + isAbortOnly + ")");
         throw new MetaException("Unable to connect to transaction database " +
           e.getMessage());
       } finally {
         close(rs, pStmt, dbConn);
       }
     } catch (RetryException e) {
-      markCleaned(info);
+      markCleaned(info, isAbortOnly);
+    }
+  }
+
+  // Deletes TXN_COMPONENTS rows of aborted txns for info's table/partition, using the caller's connection.
+  // Never commits; on SQLException it rolls back and throws MetaException (a RetryException may propagate).
+  private void removeTxnComponents(Connection dbConn, CompactionInfo info) throws MetaException, RetryException {
+    PreparedStatement pStmt = null;
+    try {
+      /*
+       * compaction may remove data from aborted txns above tc_writeid but it only guarantees to
+       * remove it up to (inclusive) tc_writeid, so it's critical to not remove metadata about
+       * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns).
+       * See {@link ql.txn.compactor.Cleaner.removeFiles()}
+       */
+      String s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN ( "
+              + "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED + ") "
+              + "AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ? "
+              + "AND \"TC_PARTITION\" " + (info.partName != null ? "= ?" : "IS NULL");
+
+      List<String> queries = new ArrayList<>();
+      Iterator<Long> writeIdsIter = null;
+      List<Integer> counts = null;
+
+      if (info.writeIds != null && !info.writeIds.isEmpty()) {
+        StringBuilder prefix = new StringBuilder(s).append(" AND ");
+        List<String> questions = Collections.nCopies(info.writeIds.size(), "?");
+
+        counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix,
+                new StringBuilder(), questions, "\"TC_WRITEID\"", false, false);
+        writeIdsIter = info.writeIds.iterator();
+      } else if (!info.hasUncompactedAborts) {
+        if (info.highestWriteId != 0) {
+          s += " AND \"TC_WRITEID\" <= ?";
+        }
+        queries.add(s);
+      }
+
+      for (int i = 0; i < queries.size(); i++) {
+        String query = queries.get(i);
+        int writeIdCount = (counts != null) ? counts.get(i) : 0;
+
+        LOG.debug("Going to execute update <{}>", query);
+        closeStmt(pStmt); // release the previous iteration's statement so only the last one is left for finally
+        pStmt = dbConn.prepareStatement(query);
+        int paramCount = 1;
+
+        pStmt.setString(paramCount++, info.dbname);
+        pStmt.setString(paramCount++, info.tableName);
+        if (info.partName != null) {
+          pStmt.setString(paramCount++, info.partName);
+        }
+        if (info.highestWriteId != 0 && writeIdCount == 0) {
+          pStmt.setLong(paramCount, info.highestWriteId);
+        }
+        for (int j = 0; j < writeIdCount; j++) {
+          if (writeIdsIter.hasNext()) {
+            pStmt.setLong(paramCount + j, writeIdsIter.next());
+          }
+        }
+        int rc = pStmt.executeUpdate();
+        LOG.debug("Removed {} records from txn_components", rc);
+      }
+    } catch (SQLException e) {
+      LOG.error("Unable to delete from txn components due to {}", e.getMessage());
+      LOG.debug("Going to rollback");
+      rollbackDBConn(dbConn);
+      checkRetryable(e, "removeTxnComponents(" + info + ")");
+      throw new MetaException("Unable to connect to transaction database " + e.getMessage());
+    } finally {
+      closeStmt(pStmt);
+    }
+  }
 
@@ -866,7 +936,7 @@ class CompactionTxnHandler extends TxnHandler {
    * The committed txns are left there for TXN_OPENTXN_TIMEOUT window period intentionally.
    * The reason such aborted txns exist can be that now work was done in this txn
    * (e.g. Streaming opened TransactionBatch and abandoned it w/o doing any work)
-   * or due to {@link #markCleaned(CompactionInfo)} being called.
+   * or due to {@link #markCleaned(CompactionInfo, boolean)} being called.
    */
   @Override
   @RetrySemantics.SafeToRetry
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 525747716c3..cc8f9d94a26 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -516,6 +516,19 @@ public interface TxnStore extends Configurable {
   @RetrySemantics.ReadOnly
   List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark, long retentionTime) throws MetaException;
 
+  /**
+   * Find the aborted entries in TXN_COMPONENTS which can be used to
+   * clean directories belonging to transactions in aborted state.
+   * @param abortedTimeThreshold Age in milliseconds of the oldest aborted transaction involving a given table
+   *                            or partition that will trigger cleanup; a negative value disables the age check.
+   * @param abortedThreshold Number of aborted transactions involving a given table or partition
+   *                         above which cleanup is triggered.
+   * @return Information on the tables/partitions whose aborted transactions need to be cleaned.
+   * @throws MetaException if the transaction database cannot be queried
+   */
+  @RetrySemantics.ReadOnly
+  List<CompactionInfo> findReadyToCleanAborts(long abortedTimeThreshold, int abortedThreshold) throws MetaException;
+
   /**
    * Sets the cleaning start time for a particular compaction
    *
@@ -537,9 +550,10 @@ public interface TxnStore extends Configurable {
    * it has been compacted.
    *
    * @param info info on the compaction entry to remove
+   * @param isAbortOnly whether to clean up only the metadata of aborted transactions (TXN_COMPONENTS entries)
    */
   @RetrySemantics.CannotRetry
-  void markCleaned(CompactionInfo info) throws MetaException;
+  void markCleaned(CompactionInfo info, boolean isAbortOnly) throws MetaException;
 
   /**
    * Mark a compaction entry as failed.  This will move it to the compaction history queue with a
@@ -584,7 +598,7 @@ public interface TxnStore extends Configurable {
   /**
    * Clean up aborted or committed transactions from txns that have no components in txn_components.  The reason such
    * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and
-   * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called,
+   * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo, boolean)} being called,
    * or the delete from the txns was delayed because of TXN_OPENTXN_TIMEOUT window.
    */
   @RetrySemantics.SafeToRetry
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 31e5b712d21..704956a6f46 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -60,30 +60,45 @@ import static org.apache.hadoop.hive.metastore.TransactionalValidationListener.D
 public class TxnUtils {
   private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
 
-  public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns, long minOpenTxnGLB) {
-    long highWaterMark = minOpenTxnGLB - 1;
-    long[] abortedTxns = new long[txns.getOpen_txnsSize()];
+  /**
+   * Returns a valid txn list for cleaner.
+   * @param txns Response containing open txns list.
+   * @param minOpenTxn Minimum open txn; in the case of abort cleanup, this is the min open write txn on the table.
+   * @param isAbortCleanup Whether the request is for abort cleanup.
+   * @return a valid txn list
+   */
+  public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns, long minOpenTxn, boolean isAbortCleanup) {
+    long highWatermark = minOpenTxn - 1;
+    long[] exceptions = new long[txns.getOpen_txnsSize()];
     BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
     int i = 0;
-    for(long txnId : txns.getOpen_txns()) {
-      if(txnId > highWaterMark) {
+    for (long txnId : txns.getOpen_txns()) {
+      if (txnId > highWatermark) {
         break;
       }
-      if(abortedBits.get(i)) {
-        abortedTxns[i] = txnId;
-      }
-      else {
-        assert false : JavaUtils.txnIdToString(txnId) + " is open and <= hwm:" + highWaterMark;
+      if (abortedBits.get(i)) {
+        exceptions[i] = txnId;
+      } else {
+        if (isAbortCleanup) {
+          exceptions[i] = txnId;
+        } else {
+          assert false : JavaUtils.txnIdToString(txnId) + " is open and <= hwm:" + highWatermark;
+        }
       }
       ++i;
     }
-    abortedTxns = Arrays.copyOf(abortedTxns, i);
-    BitSet bitSet = new BitSet(abortedTxns.length);
-    bitSet.set(0, abortedTxns.length);
-    //add ValidCleanerTxnList? - could be problematic for all the places that read it from
-    // string as they'd have to know which object to instantiate
-    return new ValidReadTxnList(abortedTxns, bitSet, highWaterMark, Long.MAX_VALUE);
+    exceptions = Arrays.copyOf(exceptions, i);
+    if (!isAbortCleanup) {
+      BitSet bitSet = new BitSet(exceptions.length);
+      bitSet.set(0, exceptions.length);
+      //add ValidCleanerTxnList? - could be problematic for all the places that read it from
+      // string as they'd have to know which object to instantiate
+      return new ValidReadTxnList(exceptions, bitSet, highWatermark, Long.MAX_VALUE);
+    } else {
+      return new ValidReadTxnList(exceptions, abortedBits, highWatermark, Long.MAX_VALUE);
+    }
   }
+
   /**
    * Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
    * {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}.  This assumes that the caller intends to