Posted to commits@hive.apache.org by ve...@apache.org on 2023/03/03 14:21:18 UTC

[hive] branch master updated: HIVE-27019: Split Cleaner into separate manageable modular entities (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 98b3861de9b HIVE-27019: Split Cleaner into separate manageable modular entities (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)
98b3861de9b is described below

commit 98b3861de9b7426468fe935d0f8f320fcc5122d3
Author: Sourabh Badhya <42...@users.noreply.github.com>
AuthorDate: Fri Mar 3 19:51:03 2023 +0530

    HIVE-27019: Split Cleaner into separate manageable modular entities (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)
---
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      | 467 +++------------------
 .../hive/ql/txn/compactor/CleanupRequest.java      | 115 +++++
 .../hive/ql/txn/compactor/CompactorUtil.java       |  67 +++
 .../hadoop/hive/ql/txn/compactor/FSRemover.java    | 126 ++++++
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |  19 +-
 .../ql/txn/compactor/MetaStoreCompactorThread.java |  48 +--
 .../hive/ql/txn/compactor/MetadataCache.java       |  53 +++
 .../txn/compactor/handler/CompactionCleaner.java   | 389 +++++++++++++++++
 .../hive/ql/txn/compactor/handler/TaskHandler.java |  92 ++++
 .../txn/compactor/handler/TaskHandlerFactory.java  |  49 +++
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  |  50 +--
 .../hive/ql/txn/compactor/handler/TestHandler.java | 111 +++++
 12 files changed, 1083 insertions(+), 503 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 509343041ac..2325c1527d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,76 +17,27 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
-import org.apache.hadoop.hive.metastore.api.LockRequest;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.api.TxnOpenException;
-import org.apache.hadoop.hive.metastore.api.UnlockRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
-import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
-import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
-import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.metastore.utils.FileUtils;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.io.AcidDirectory;
-import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.Ref;
 
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
 
-import static org.apache.commons.collections4.ListUtils.subtract;
 import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
-import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
-import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 
 /**
  * A class to clean directories after compactions.  This will run in a separate thread.
@@ -97,23 +48,25 @@ public class Cleaner extends MetaStoreCompactorThread {
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   private boolean metricsEnabled = false;
 
-  private ReplChangeManager replChangeManager;
   private ExecutorService cleanerExecutor;
+  private List<TaskHandler> cleanupHandlers;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
-    replChangeManager = ReplChangeManager.getInstance(conf);
     checkInterval = conf.getTimeVar(
             HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
+    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
     cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
             COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
-    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
-        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
-    boolean tableCacheOn = MetastoreConf.getBoolVar(conf,
-            MetastoreConf.ConfVars.COMPACTOR_CLEANER_TABLECACHE_ON);
-    initializeCache(tableCacheOn);
+    if (CollectionUtils.isEmpty(cleanupHandlers)) {
+      FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
+      cleanupHandlers = TaskHandlerFactory.getInstance()
+              .getHandlers(conf, txnHandler, metadataCache,
+                      metricsEnabled, fsRemover);
+    }
   }
 
   @Override
@@ -122,11 +75,8 @@ public class Cleaner extends MetaStoreCompactorThread {
     try {
       do {
         TxnStore.MutexAPI.LockHandle handle = null;
-        invalidateMetaCache();
+        metadataCache.invalidate();
         long startedAt = -1;
-        long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
-                ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
-                : 0;
 
         // Make sure nothing escapes this run method and kills the metastore at large,
         // so wrap it in a big catch Throwable statement.
@@ -140,48 +90,38 @@ public class Cleaner extends MetaStoreCompactorThread {
                     HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_DURATION_UPDATE_INTERVAL, TimeUnit.MILLISECONDS),
                     new CleanerCycleUpdater(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION, startedAt));
           }
-          long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-          checkInterrupt();
 
-          List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
-          checkInterrupt();
-
-          if (!readyToClean.isEmpty()) {
-            long minTxnIdSeenOpen = Math.min(minOpenTxnId, txnHandler.findMinTxnIdSeenOpen());
-            
-            List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
-            // For checking which compaction can be cleaned we can use the minOpenTxnId
-            // However findReadyToClean will return all records that were compacted with old version of HMS
-            // where the CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
-            // to the clean method, to avoid cleaning up deltas needed for running queries
-            // when min_history_level is finally dropped, than every HMS will commit compaction the new way
-            // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
-            for (CompactionInfo ci : readyToClean) {
-              //Check for interruption before scheduling each compactionInfo and return if necessary
+          for (TaskHandler cleanupHandler : cleanupHandlers) {
+            try {
               checkInterrupt();
-              long cleanerWaterMark = (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 : minTxnIdSeenOpen;
-              
-              CompletableFuture<Void> asyncJob =
-                  CompletableFuture.runAsync(
-                      ThrowingRunnable.unchecked(() -> {
-                        LOG.info("Cleaning based on min open txn id: " + cleanerWaterMark);
-                        clean(ci, cleanerWaterMark, metricsEnabled);
-                      }), cleanerExecutor)
-                  .exceptionally(t -> {
-                    LOG.error("Error clearing {}", ci.getFullPartitionName(), t);
-                    return null;
-                  });
-              cleanerList.add(asyncJob);
+              List<Runnable> tasks = cleanupHandler.getTasks();
+              List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
+              for (Runnable task : tasks) {
+                CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(
+                                task, cleanerExecutor)
+                        .exceptionally(t -> {
+                          LOG.error("Error clearing due to :", t);
+                          return null;
+                        });
+                asyncTasks.add(asyncTask);
+              }
+              //Use get instead of join, so we can receive InterruptedException and shutdown gracefully
+              CompletableFuture.allOf(asyncTasks.toArray(new CompletableFuture[0])).get();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              return;
+            } catch (Throwable t) {
+              LOG.error("Caught an exception while executing RequestHandler loop : {} of compactor cleaner, {}",
+                       cleanupHandler.getClass().getName(), t.getMessage());
+              throw t;
             }
-
-            //Use get instead of join, so we can receive InterruptedException and shutdown gracefully
-            CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).get();
           }
+          checkInterrupt();
         } catch (InterruptedException e) {
-          // do not ignore interruption requests
+          Thread.currentThread().interrupt();
           return;
         } catch (Throwable t) {
-          LOG.error("Caught an exception in the main loop of compactor cleaner, " +
+          LOG.error("Caught an exception in the main loop of compactor cleaner, {}",
               StringUtils.stringifyException(t));
         } finally {
           if (handle != null) {
@@ -196,7 +136,8 @@ public class Cleaner extends MetaStoreCompactorThread {
         doPostLoopActions(System.currentTimeMillis() - startedAt);
       } while (!stop.get());
     } catch (InterruptedException ie) {
-      LOG.error("Compactor cleaner thread interrupted, exiting " +
+      Thread.currentThread().interrupt();
+      LOG.error("Compactor cleaner thread interrupted, exiting {}",
         StringUtils.stringifyException(ie));
     } finally {
       if (Thread.currentThread().isInterrupted()) {
@@ -208,326 +149,15 @@ public class Cleaner extends MetaStoreCompactorThread {
     }
   }
 
-  private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) throws MetaException {
-    LOG.info("Starting cleaning for " + ci);
-    PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
-    String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
-        (ci.type != null ? ci.type.toString().toLowerCase() : null);
-    try {
-      if (metricsEnabled) {
-        perfLogger.perfLogBegin(CLASS_NAME, cleanerMetric);
-      }
-      final String location = ci.getProperty("location");
-
-      Callable<Boolean> cleanUpTask;
-      Table t = null;
-      Partition p = null;
-
-      if (location == null) {
-        t = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
-        if (t == null) {
-          // The table was dropped before we got around to cleaning it.
-          LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." +
-            idWatermark(ci));
-          txnHandler.markCleaned(ci);
-          return;
-        }
-        if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
-          // The table was marked no clean up true.
-          LOG.info("Skipping table " + ci.getFullTableName() + " clean up, as NO_CLEANUP set to true");
-          txnHandler.markCleaned(ci);
-          return;
-        }
-        if (ci.partName != null) {
-          p = resolvePartition(ci);
-          if (p == null) {
-            // The partition was dropped before we got around to cleaning it.
-            LOG.info("Unable to find partition " + ci.getFullPartitionName() +
-              ", assuming it was dropped." + idWatermark(ci));
-            txnHandler.markCleaned(ci);
-            return;
-          }
-          if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
-            // The partition was marked no clean up true.
-            LOG.info("Skipping partition " + ci.getFullPartitionName() + " clean up, as NO_CLEANUP set to true");
-            txnHandler.markCleaned(ci);
-            return;
-          }
-        }
-      }
-      txnHandler.markCleanerStart(ci);
-
-      if (t != null || ci.partName != null) {
-        String path = location == null
-            ? resolveStorageDescriptor(t, p).getLocation()
-            : location;
-        boolean dropPartition = ci.partName != null && p == null;
-        cleanUpTask = () -> removeFiles(path, minOpenTxn, ci, dropPartition);
-      } else {
-        cleanUpTask = () -> removeFiles(location, ci);
-      }
-
-      Ref<Boolean> removedFiles = Ref.from(false);
-      if (runJobAsSelf(ci.runAs)) {
-        removedFiles.value = cleanUpTask.call();
-      } else {
-        LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
-        UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
-            UserGroupInformation.getLoginUser());
-        try {
-          ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
-            removedFiles.value = cleanUpTask.call();
-            return null;
-          });
-        } finally {
-          try {
-            FileSystem.closeAllForUGI(ugi);
-          } catch (IOException exception) {
-            LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
-                ci.getFullPartitionName() + idWatermark(ci), exception);
-          }
-        }
-      }
-      if (removedFiles.value || isDynPartAbort(t, ci)) {
-        txnHandler.markCleaned(ci);
-      } else {
-        txnHandler.clearCleanerStart(ci);
-        LOG.warn("No files were removed. Leaving queue entry " + ci + " in ready for cleaning state.");
-      }
-    } catch (Exception e) {
-      LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
-          StringUtils.stringifyException(e));
-      ci.errorMessage = e.getMessage();
-      if (metricsEnabled) {
-        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
-      }
-      handleCleanerAttemptFailure(ci);
-    }  finally {
-      if (metricsEnabled) {
-        perfLogger.perfLogEnd(CLASS_NAME, cleanerMetric);
-      }
-    }
-  }
-
-  private void handleCleanerAttemptFailure(CompactionInfo ci) throws MetaException {
-    long defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
-    int cleanAttempts = 0;
-    if (ci.retryRetention > 0) {
-      cleanAttempts = (int)(Math.log(ci.retryRetention / defaultRetention) / Math.log(2)) + 1;
-    }
-    if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
-      //Mark it as failed if the max attempt threshold is reached.
-      txnHandler.markFailed(ci);
-    } else {
-      //Calculate retry retention time and update record.
-      ci.retryRetention = (long)Math.pow(2, cleanAttempts) * defaultRetention;
-      txnHandler.setCleanerRetryRetentionTimeOnError(ci);
-    }
-  }
-
-  private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
-      throws NoSuchTxnException, MetaException {
-    List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(ci.dbname, ci.tableName));
-    GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
-    request.setValidTxnList(validTxnList.writeToString());
-    GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
-    // we could have no write IDs for a table if it was never written to but
-    // since we are in the Cleaner phase of compactions, there must have
-    // been some delta/base dirs
-    assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
-    ValidReaderWriteIdList validWriteIdList =
-        TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
-    /*
-     * We need to filter the obsoletes dir list, to only remove directories that were made obsolete by this compaction
-     * If we have a higher retentionTime it is possible for a second compaction to run on the same partition. Cleaning up the first compaction
-     * should not touch the newer obsolete directories to not to violate the retentionTime for those.
-     */
-    if (ci.highestWriteId < validWriteIdList.getHighWatermark()) {
-      validWriteIdList = validWriteIdList.updateHighWatermark(ci.highestWriteId);
-    }
-    return validWriteIdList;
-  }
-
-  private static boolean isDynPartAbort(Table t, CompactionInfo ci) {
-    return Optional.ofNullable(t).map(Table::getPartitionKeys).filter(pk -> pk.size() > 0).isPresent()
-        && ci.partName == null;
-  }
-
-  private static String idWatermark(CompactionInfo ci) {
-    return " id=" + ci.id;
-  }
-
-  private boolean removeFiles(String location, long minOpenTxn, CompactionInfo ci, boolean dropPartition)
-      throws Exception {
-
-    if (dropPartition) {
-      LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
-      LockResponse res = null;
-
-      try {
-        res = txnHandler.lock(lockRequest);
-        if (res.getState() == LockState.ACQUIRED) {
-          //check if partition wasn't re-created
-          if (resolvePartition(ci) == null) {
-            return removeFiles(location, ci);
-          }
-        }
-      } catch (NoSuchTxnException | TxnAbortedException e) {
-        LOG.error(e.getMessage());
-      } finally {
-        if (res != null) {
-          try {
-            txnHandler.unlock(new UnlockRequest(res.getLockid()));
-          } catch (NoSuchLockException | TxnOpenException e) {
-            LOG.error(e.getMessage());
-          }
-        }
-      }
-    }
-
-    ValidTxnList validTxnList =
-      TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxn);
-    //save it so that getAcidState() sees it
-    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
-    /**
-     * {@code validTxnList} is capped by minOpenTxn so if
-     * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
-     * produced by a compactor, that means every reader that could be active right now see it
-     * as well.  That means if this base/delta shadows some earlier base/delta, the it will be
-     * used in favor of any files that it shadows.  Thus the shadowed files are safe to delete.
-     *
-     *
-     * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
-     * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
-     * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
-     * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
-     * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
-     * writeId:17.  Compactor will produce base_17_c160.
-     * Suppose txnid:149 writes delta_18_18
-     * to P1 and aborts.  Compactor can only remove TXN_COMPONENTS entries
-     * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
-     * not visible based on 'validTxnList' capped at minOpenTxn so it will not not be cleaned by
-     * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
-     * metadata that says that 18 is aborted.
-     * In a slightly different case, whatever txn created delta_18 (and all other txn) may have
-     * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove
-     * it (since it has nothing but aborted data).  But we can't tell which actually happened
-     * in markCleaned() so make sure it doesn't delete meta above CG_CQ_HIGHEST_WRITE_ID.
-     *
-     * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up
-     * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed
-     * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID.  This could be
-     * useful if there is all of a sudden a flood of aborted txns.  (For another day).
-     */
-
-    // Creating 'reader' list since we are interested in the set of 'obsolete' files
-    ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
-    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
-
-    return removeFiles(location, validWriteIdList, ci);
-  }
-  /**
-   * @return true if any files were removed
-   */
-  private boolean removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
-      throws Exception {
-    Path path = new Path(location);
-    FileSystem fs = path.getFileSystem(conf);
-    
-    // Collect all of the files/dirs
-    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
-    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false, 
-        dirSnapshots);
-    Table table = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
-    boolean isDynPartAbort = isDynPartAbort(table, ci);
-    
-    List<Path> obsoleteDirs = getObsoleteDirs(dir, isDynPartAbort);
-    if (isDynPartAbort || dir.hasUncompactedAborts()) {
-      ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
-    }
-    List<Path> deleted = remove(location, ci, obsoleteDirs, true, fs);
-    if (dir.getObsolete().size() > 0) {
-      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
-          txnHandler);
-    }
-    // Make sure there are no leftovers below the compacted watermark
-    if (ci.minOpenWriteId < 0) {
-      conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
-    }
-    dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
-        ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
-      Ref.from(false), false, dirSnapshots);
-    
-    List<Path> remained = subtract(getObsoleteDirs(dir, isDynPartAbort), deleted);
-    if (!remained.isEmpty()) {
-      LOG.warn(idWatermark(ci) + " Remained " + remained.size() +
-        " obsolete directories from " + location + ". " + getDebugInfo(remained));
-      return false;
-    }
-    LOG.debug(idWatermark(ci) + " All cleared below the watermark: " + ci.highestWriteId + " from " + location);
-    return true;
-  }
-  
-  private List<Path> getObsoleteDirs(AcidDirectory dir, boolean isDynPartAbort) {
-    List<Path> obsoleteDirs = dir.getObsolete();
-    /**
-     * add anything in 'dir'  that only has data from aborted transactions - no one should be
-     * trying to read anything in that dir (except getAcidState() that only reads the name of
-     * this dir itself)
-     * So this may run ahead of {@link CompactionInfo#highestWriteId} but it's ok (suppose there
-     * are no active txns when cleaner runs).  The key is to not delete metadata about aborted
-     * txns with write IDs > {@link CompactionInfo#highestWriteId}.
-     * See {@link TxnStore#markCleaned(CompactionInfo)}
-     */
-    obsoleteDirs.addAll(dir.getAbortedDirectories());
-    if (isDynPartAbort) {
-      // In the event of an aborted DP operation, we should only consider the aborted directories for cleanup.
-      // Including obsolete directories for partitioned tables can result in data loss.
-      obsoleteDirs = dir.getAbortedDirectories();
-    }
-    return obsoleteDirs;
-  }
-
-  private boolean removeFiles(String location, CompactionInfo ci) 
-      throws IOException, MetaException {
-    String strIfPurge = ci.getProperty("ifPurge");
-    boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
-    
-    Path path = new Path(location);
-    return !remove(location, ci, Collections.singletonList(path), ifPurge,
-      path.getFileSystem(conf)).isEmpty();
+  @Override
+  public boolean isCacheEnabled() {
+    return MetastoreConf.getBoolVar(conf,
+            MetastoreConf.ConfVars.COMPACTOR_CLEANER_TABLECACHE_ON);
   }
 
-  private List<Path> remove(String location, CompactionInfo ci, List<Path> paths, boolean ifPurge, FileSystem fs)
-      throws MetaException, IOException {
-    List<Path> deleted = new ArrayList<>();
-    if (paths.size() < 1) {
-      return deleted;
-    }
-    LOG.info(idWatermark(ci) + " About to remove " + paths.size() +
-      " obsolete directories from " + location + ". " + getDebugInfo(paths));
-    boolean needCmRecycle;
-    try {
-      Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
-      needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
-    } catch (NoSuchObjectException ex) {
-      // can not drop a database which is a source of replication
-      needCmRecycle = false;
-    }
-    for (Path dead : paths) {
-      LOG.debug("Going to delete path " + dead.toString());
-      if (needCmRecycle) {
-        replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, ifPurge);
-      }
-      if (FileUtils.moveToTrash(fs, dead, conf, ifPurge)) {
-        deleted.add(dead);
-      }
-    }
-    return deleted;
-  }
-  
-  private String getDebugInfo(List<Path> paths) {
-    return "[" + paths.stream().map(Path::getName).collect(Collectors.joining(",")) + ']';
+  @VisibleForTesting
+  public void setCleanupHandlers(List<TaskHandler> cleanupHandlers) {
+    this.cleanupHandlers = cleanupHandlers;
   }
 
   private static class CleanerCycleUpdater implements Runnable {
@@ -544,5 +174,4 @@ public class Cleaner extends MetaStoreCompactorThread {
       updateCycleDurationMetric(metric, startedAt);
     }
   }
-
 }
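With this refactor the Cleaner no longer inspects the compaction queue itself; it only schedules whatever TaskHandler instances it holds, and the @VisibleForTesting setter above lets tests swap those handlers out. The sketch below illustrates that seam and is not part of the commit: it assumes a Mockito mock can stand in for a real handler, and the example class and method names are invented.

    import static org.mockito.Mockito.mock;

    import java.util.Collections;
    import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
    import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;

    public class CleanerHandlerInjectionExample {
      public static Cleaner cleanerWithMockHandler() {
        // Stand-in for a real handler such as CompactionCleaner.
        TaskHandler handler = mock(TaskHandler.class);
        Cleaner cleaner = new Cleaner();
        // init() only builds the default handlers when this list is empty,
        // so handlers injected here survive initialization.
        cleaner.setCleanupHandlers(Collections.singletonList(handler));
        return cleaner;
      }
    }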
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CleanupRequest.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CleanupRequest.java
new file mode 100644
index 00000000000..badfaeaf283
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CleanupRequest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * A class which specifies the required information for cleanup.
+ * Objects of this class are created by request handlers.
+ * Objects from this class are passed to FSRemover for cleanup.
+ */
+public class CleanupRequest {
+  private final String location;
+  private final List<Path> obsoleteDirs;
+  private final boolean purge;
+  private final String runAs;
+  private final String dbName;
+  private final String fullPartitionName;
+
+  public CleanupRequest(CleanupRequestBuilder builder) {
+    this.location = builder.location;
+    this.obsoleteDirs = builder.obsoleteDirs;
+    this.purge = builder.purge;
+    this.runAs = builder.runAs;
+    this.dbName = builder.dbName;
+    this.fullPartitionName = builder.fullPartitionName;
+  }
+
+  public String getLocation() {
+    return location;
+  }
+
+  public List<Path> getObsoleteDirs() {
+    return obsoleteDirs;
+  }
+
+  public boolean isPurge() {
+    return purge;
+  }
+
+  public String runAs() {
+    return runAs;
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+
+  public String getFullPartitionName() {
+    return fullPartitionName;
+  }
+
+  /**
+   * A builder for generating objects of CleaningRequest.
+   */
+  public static class CleanupRequestBuilder {
+    private String location;
+    private List<Path> obsoleteDirs;
+    private boolean purge;
+    private String runAs;
+    private String dbName;
+    private String fullPartitionName;
+
+    public CleanupRequestBuilder setLocation(String location) {
+      this.location = location;
+      return this;
+    }
+
+    public CleanupRequestBuilder setObsoleteDirs(List<Path> obsoleteDirs) {
+      this.obsoleteDirs = obsoleteDirs;
+      return this;
+    }
+
+    public CleanupRequestBuilder setPurge(boolean purge) {
+      this.purge = purge;
+      return this;
+    }
+
+    public CleanupRequestBuilder setDbName(String dbName) {
+      this.dbName = dbName;
+      return this;
+    }
+
+    public CleanupRequestBuilder setRunAs(String runAs) {
+      this.runAs = runAs;
+      return this;
+    }
+
+    public CleanupRequestBuilder setFullPartitionName(String fullPartitionName) {
+      this.fullPartitionName = fullPartitionName;
+      return this;
+    }
+
+    public CleanupRequest build() {
+      return new CleanupRequest(this);
+    }
+  }
+}
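For reference, a hedged sketch of the builder above in use; every value below (paths, database, partition, and the example class name) is a placeholder chosen for illustration, not taken from this commit.

    import java.util.Arrays;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;

    public class CleanupRequestExample {
      public static CleanupRequest buildExample() {
        return new CleanupRequest.CleanupRequestBuilder()
            .setLocation("/warehouse/demo_db.db/demo_tbl/p=1")
            .setObsoleteDirs(Arrays.asList(
                new Path("/warehouse/demo_db.db/demo_tbl/p=1/delta_0000001_0000001"),
                new Path("/warehouse/demo_db.db/demo_tbl/p=1/base_0000002")))
            .setPurge(true)                               // skip the trash and delete outright
            .setRunAs("hive")                             // user the removal should impersonate
            .setDbName("demo_db")
            .setFullPartitionName("demo_db.demo_tbl.p=1")
            .build();
      }
    }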
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
index 7111b942563..ddc6a0312aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
@@ -17,22 +17,36 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.utils.StringableMap;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static java.lang.String.format;
+import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 
 public class CompactorUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactorUtil.class);
   public static final String COMPACTOR = "compactor";
   /**
    * List of accepted properties for defining the compactor's job queue.
@@ -100,4 +114,57 @@ public class CompactorUtil {
     return conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE);
   }
 
+  public static StorageDescriptor resolveStorageDescriptor(Table t, Partition p) {
+    return (p == null) ? t.getSd() : p.getSd();
+  }
+
+  public static boolean isDynPartAbort(Table t, String partName) {
+    return Optional.ofNullable(t).map(Table::getPartitionKeys).filter(pk -> !pk.isEmpty()).isPresent()
+            && partName == null;
+  }
+
+  public static List<Partition> getPartitionsByNames(HiveConf conf, String dbName, String tableName, String partName) throws MetaException {
+    try {
+      return getMSForConf(conf).getPartitionsByNames(getDefaultCatalog(conf), dbName, tableName,
+              Collections.singletonList(partName));
+    } catch (Exception e) {
+      LOG.error("Unable to get partitions by name = {}.{}.{}", dbName, tableName, partName);
+      throw new MetaException(e.toString());
+    }
+  }
+
+  public static String getDebugInfo(List<Path> paths) {
+    return "[" + paths.stream().map(Path::getName).collect(Collectors.joining(",")) + ']';
+  }
+
+  /**
+   * Determine whether to run this job as the current user or whether we need a doAs to switch
+   * users.
+   * @param owner of the directory we will be working in, as determined by
+   * {@link org.apache.hadoop.hive.metastore.txn.TxnUtils#findUserToRunAs(String, Table, Configuration)}
+   * @return true if the job should run as the current user, false if a doAs is needed.
+   */
+  public static boolean runJobAsSelf(String owner) {
+    return (owner.equals(System.getProperty("user.name")));
+  }
+
+  public static List<Path> getObsoleteDirs(AcidDirectory dir, boolean isDynPartAbort) {
+    List<Path> obsoleteDirs = dir.getObsolete();
+    /*
+     * add anything in 'dir'  that only has data from aborted transactions - no one should be
+     * trying to read anything in that dir (except getAcidState() that only reads the name of
+     * this dir itself)
+     * So this may run ahead of {@link CompactionInfo#highestWriteId} but it's ok (suppose there
+     * are no active txns when cleaner runs).  The key is to not delete metadata about aborted
+     * txns with write IDs > {@link CompactionInfo#highestWriteId}.
+     * See {@link TxnStore#markCleaned(CompactionInfo)}
+     */
+    obsoleteDirs.addAll(dir.getAbortedDirectories());
+    if (isDynPartAbort) {
+      // In the event of an aborted DP operation, we should only consider the aborted directories for cleanup.
+      // Including obsolete directories for partitioned tables can result in data loss.
+      obsoleteDirs = dir.getAbortedDirectories();
+    }
+    return obsoleteDirs;
+  }
 }
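Most of the helpers relocated here out of Cleaner are static and easy to exercise in isolation. A small illustrative check of three of them follows; the paths and the example class name are made up.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;

    public class CompactorUtilExample {
      public static void main(String[] args) {
        List<Path> dirs = Arrays.asList(
            new Path("/tmp/delta_0000001_0000001"),
            new Path("/tmp/base_0000002"));
        // Prints "[delta_0000001_0000001,base_0000002]" - only the directory names.
        System.out.println(CompactorUtil.getDebugInfo(dirs));
        // True only when the given owner matches the JVM's user.name property.
        System.out.println(CompactorUtil.runJobAsSelf("hive"));
        // A null table can never be a dynamic-partition abort.
        System.out.println(CompactorUtil.isDynPartAbort(null, null));
      }
    }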
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/FSRemover.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/FSRemover.java
new file mode 100644
index 00000000000..e59a5490206
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/FSRemover.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+/**
+ * A class which takes in a cleanup request and deletes the files
+ * according to the cleanup request.
+ */
+public class FSRemover {
+  private static final Logger LOG = LoggerFactory.getLogger(FSRemover.class);
+  private final HiveConf conf;
+  private final ReplChangeManager replChangeManager;
+  private final MetadataCache metadataCache;
+
+  public FSRemover(HiveConf conf, ReplChangeManager replChangeManager, MetadataCache metadataCache) {
+    this.conf = conf;
+    this.replChangeManager = replChangeManager;
+    this.metadataCache = metadataCache;
+  }
+
+  public List<Path> clean(CleanupRequest cr) throws MetaException {
+    Ref<List<Path>> removedFiles = Ref.from(new ArrayList<>());
+    try {
+      Callable<List<Path>> cleanUpTask;
+      cleanUpTask = () -> removeFiles(cr);
+
+      if (CompactorUtil.runJobAsSelf(cr.runAs())) {
+        removedFiles.value = cleanUpTask.call();
+      } else {
+        LOG.info("Cleaning as user {} for {}", cr.runAs(), cr.getFullPartitionName());
+        UserGroupInformation ugi = UserGroupInformation.createProxyUser(cr.runAs(),
+                UserGroupInformation.getLoginUser());
+        try {
+          ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+            removedFiles.value = cleanUpTask.call();
+            return null;
+          });
+        } finally {
+          try {
+            FileSystem.closeAllForUGI(ugi);
+          } catch (IOException exception) {
+            LOG.error("Could not clean up file-system handles for UGI: {} for {}",
+                    ugi, cr.getFullPartitionName(), exception);
+          }
+        }
+      }
+    } catch (Exception ex) {
+      LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", cr,
+              StringUtils.stringifyException(ex));
+    }
+    return removedFiles.value;
+  }
+
+  /**
+   * @param cr Cleaning request
+   * @return List of deleted files if any files were removed
+   */
+  private List<Path> removeFiles(CleanupRequest cr)
+          throws MetaException, IOException {
+    List<Path> deleted = new ArrayList<>();
+    if (cr.getObsoleteDirs().isEmpty()) {
+      return deleted;
+    }
+    LOG.info("About to remove {} obsolete directories from {}. {}", cr.getObsoleteDirs().size(),
+            cr.getLocation(), CompactorUtil.getDebugInfo(cr.getObsoleteDirs()));
+    boolean needCmRecycle;
+    try {
+      Database db = metadataCache.computeIfAbsent(cr.getDbName(), () -> getMSForConf(conf).getDatabase(getDefaultCatalog(conf), cr.getDbName()));
+      needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
+    } catch (NoSuchObjectException ex) {
+      // can not drop a database which is a source of replication
+      needCmRecycle = false;
+    } catch (Exception ex) {
+      throw new MetaException(ex.getMessage());
+    }
+    FileSystem fs = new Path(cr.getLocation()).getFileSystem(conf);
+    for (Path dead : cr.getObsoleteDirs()) {
+      LOG.debug("Going to delete path: {}", dead);
+      if (needCmRecycle) {
+        replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, cr.isPurge());
+      }
+      if (FileUtils.moveToTrash(fs, dead, conf, cr.isPurge())) {
+        deleted.add(dead);
+      }
+    }
+    return deleted;
+  }
+}
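A minimal sketch of how the new pieces are wired together, mirroring what Cleaner.init() does above: an FSRemover is built from the configuration, the ReplChangeManager singleton and a MetadataCache, then handed a CleanupRequest produced by a handler (the example class and method names are invented for illustration).

    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.ReplChangeManager;
    import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
    import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
    import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;

    public class FSRemoverExample {
      public static List<Path> cleanOnce(HiveConf conf, CleanupRequest request) throws Exception {
        MetadataCache metadataCache = new MetadataCache(true);   // cache database/table lookups
        FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
        // Returns only the paths that were actually removed; failures are logged rather than rethrown.
        return fsRemover.clean(request);
      }
    }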
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index ee7727b69ba..284a6ffca15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -149,7 +149,7 @@ public class Initiator extends MetaStoreCompactorThread {
 
           // Currently we invalidate all entries after each cycle, because the bootstrap replication is marked via
           // table property hive.repl.first.inc.pending which would be cached.
-          invalidateMetaCache();
+          metadataCache.invalidate();
           Set<String> skipDBs = Sets.newConcurrentHashSet();
           Set<String> skipTables = Sets.newConcurrentHashSet();
 
@@ -179,7 +179,7 @@ public class Initiator extends MetaStoreCompactorThread {
                 return;
               }
 
-              Table t = computeIfAbsent(ci.getFullTableName(),() -> resolveTable(ci));
+              Table t = metadataCache.computeIfAbsent(ci.getFullTableName(),() -> resolveTable(ci));
               String poolName = getPoolName(ci, t);
               Partition p = resolvePartition(ci);
               if (p == null && ci.partName != null) {
@@ -246,6 +246,12 @@ public class Initiator extends MetaStoreCompactorThread {
     }
   }
 
+  @Override
+  public boolean isCacheEnabled() {
+    return MetastoreConf.getBoolVar(conf,
+            MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
+  }
+
   private void scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String poolName,
                                             String runAs, boolean metricsEnabled)
       throws MetaException {
@@ -280,7 +286,7 @@ public class Initiator extends MetaStoreCompactorThread {
     Map<String, String> params = t.getParameters();
     String poolName = params == null ? null : params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
     if (StringUtils.isBlank(poolName)) {
-      params = computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
+      params = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
       poolName = params == null ? null : params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
     }
     return poolName;
@@ -329,9 +335,6 @@ public class Initiator extends MetaStoreCompactorThread {
     compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
             COMPACTOR_INTIATOR_THREAD_NAME_FORMAT);
-    boolean tableCacheOn = MetastoreConf.getBoolVar(conf,
-        MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
-    initializeCache(tableCacheOn);
     metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
         MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
   }
@@ -593,7 +596,7 @@ public class Initiator extends MetaStoreCompactorThread {
         return false;
       }
 
-      Table t = computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
+      Table t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
       if (t == null) {
         LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
             "table or has been dropped and moving on.");
@@ -605,7 +608,7 @@ public class Initiator extends MetaStoreCompactorThread {
         return false;
       }
 
-      Map<String, String> dbParams = computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
+      Map<String, String> dbParams = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
       if (MetaStoreUtils.isNoAutoCompactSet(dbParams, t.getParameters())) {
         if (Boolean.parseBoolean(MetaStoreUtils.getNoAutoCompact(dbParams))) {
           skipDBs.add(ci.dbname);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 1db2e539dcb..d7871b47054 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hive.metastore.MetaStoreThread;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -32,17 +30,12 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 
-import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
@@ -53,12 +46,11 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa
  * Compactor threads that runs in the metastore. It uses a {@link TxnStore}
  * to access the internal database.
  */
-public class MetaStoreCompactorThread extends CompactorThread implements MetaStoreThread {
+public abstract class MetaStoreCompactorThread extends CompactorThread implements MetaStoreThread {
 
   protected TxnStore txnHandler;
   protected ScheduledExecutorService cycleUpdaterExecutorService;
-
-  private Optional<Cache<String, TBase>> metaCache = Optional.empty();
+  protected MetadataCache metadataCache;
 
   @Override
   public void init(AtomicBoolean stop) throws Exception {
@@ -66,6 +58,7 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
 
     // Get our own instance of the transaction handler
     txnHandler = TxnUtils.getTxnStore(conf);
+    metadataCache = new MetadataCache(isCacheEnabled());
     // Initialize the RawStore, with the flag marked as true. Since its stored as a ThreadLocal variable in the
     // HMSHandlerContext, it will use the compactor related pool.
     MetastoreConf.setBoolVar(conf, COMPACTOR_USE_CUSTOM_POOL, true);
@@ -97,18 +90,11 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
   }
 
   @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
-    try {
-      return getMSForConf(conf).getPartitionsByNames(getDefaultCatalog(conf), ci.dbname, ci.tableName,
-          Collections.singletonList(ci.partName));
-    } catch (MetaException e) {
-      LOG.error("Unable to get partitions by name for CompactionInfo=" + ci);
-      throw e;
-    } catch (NoSuchObjectException e) {
-      LOG.error("Unable to get partitions by name for CompactionInfo=" + ci);
-      throw new MetaException(e.toString());
-    }
+    return CompactorUtil.getPartitionsByNames(conf, ci.dbname, ci.tableName, ci.partName);
   }
 
+  public abstract boolean isCacheEnabled();
+
   protected void startCycleUpdater(long updateInterval, Runnable taskToRun) {
     if (cycleUpdaterExecutorService == null) {
       if (updateInterval > 0) {
@@ -142,26 +128,4 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
     return 0;
   }
 
-  <T extends TBase<T,?>> T computeIfAbsent(String key, Callable<T> callable) throws Exception {
-    if (metaCache.isPresent()) {
-      try {
-        return (T) metaCache.get().get(key, callable);
-      } catch (ExecutionException e) {
-        throw (Exception) e.getCause();
-      }
-    }
-    return callable.call();
-  }
-
-  Optional<Cache<String, TBase>> initializeCache(boolean tableCacheOn) {
-    if (tableCacheOn) {
-      metaCache = Optional.of(CacheBuilder.newBuilder().softValues().build());
-    }
-    return metaCache;
-  }
-
-  void invalidateMetaCache(){
-    metaCache.ifPresent(Cache::invalidateAll);
-  }
-
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
new file mode 100644
index 00000000000..a09491b4e2f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.thrift.TBase;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+public class MetadataCache {
+
+  private Cache<String, TBase> metaCache;
+
+  public MetadataCache(boolean tableCacheOn) {
+    if (tableCacheOn) {
+      metaCache = CacheBuilder.newBuilder().softValues().build();
+    }
+  }
+
+  public <T extends TBase<T,?>> T computeIfAbsent(String key, Callable<T> callable) throws Exception {
+    if (metaCache != null) {
+      try {
+        return (T) metaCache.get(key, callable);
+      } catch (ExecutionException e) {
+        throw (Exception) e.getCause();
+      }
+    }
+    return callable.call();
+  }
+
+  public void invalidate() {
+    if (metaCache != null) {
+      metaCache.invalidateAll();
+    }
+  }
+}
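An illustrative use of the cache contract: when the cache was enabled at construction, computeIfAbsent() memoizes the loader's result until invalidate() is called (which Cleaner and Initiator now do once per cycle); when disabled, it simply invokes the loader every time. The loader and class names below are placeholders for a real metastore lookup such as resolveTable().

    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;

    public class MetadataCacheExample {
      public static Table cachedLookup(MetadataCache cache, String fullTableName) throws Exception {
        // First call runs the loader; repeated calls in the same cycle return the cached Table.
        return cache.computeIfAbsent(fullTableName, () -> loadFromMetastore(fullTableName));
      }

      private static Table loadFromMetastore(String fullTableName) {
        // Placeholder: a real caller would resolve the table through the metastore client.
        return new Table();
      }
    }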
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
new file mode 100644
index 00000000000..a352dca4dc5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest.CleanupRequestBuilder;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.collections4.ListUtils.subtract;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+import static java.util.Objects.isNull;
+
+/**
+ * A compaction-based implementation of {@link TaskHandler}.
+ * Provides the implementation for creating compaction cleanup tasks.
+ */
+class CompactionCleaner extends TaskHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionCleaner.class.getName());
+
+  public CompactionCleaner(HiveConf conf, TxnStore txnHandler,
+                                MetadataCache metadataCache, boolean metricsEnabled,
+                                FSRemover fsRemover) {
+    super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
+  }
+
+  @Override
+  public List<Runnable> getTasks() throws MetaException {
+    long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+    long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+            ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+            : 0;
+    List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+    if (!readyToClean.isEmpty()) {
+      long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
+      // For checking which compaction can be cleaned we can use the minOpenTxnId.
+      // However, findReadyToClean will return all records that were compacted with an old version of HMS
+      // where CQ_NEXT_TXN_ID is not set. For these compactions we need to provide minTxnIdSeenOpen
+      // to the clean method, to avoid cleaning up deltas needed for running queries.
+      // When min_history_level is finally dropped, every HMS will commit compactions the new way,
+      // so minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
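+      // Illustrative: a record whose minOpenWriteId > 0 (set by a newer HMS together with CQ_NEXT_TXN_ID)
+      // uses ci.nextTxnId + 1 as its watermark; an older record falls back to the global minTxnIdSeenOpen.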
+      return readyToClean.stream().map(ci -> {
+        long cleanerWaterMark = (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 : minTxnIdSeenOpen;
+        LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
+        return ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark, metricsEnabled));
+      }).collect(Collectors.toList());
+    }
+    return Collections.emptyList();
+  }
+
+  private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
+    LOG.info("Starting cleaning for {}", ci);
+    PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
+    String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
+            (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
+    try {
+      if (metricsEnabled) {
+        perfLogger.perfLogBegin(CompactionCleaner.class.getName(), cleanerMetric);
+      }
+      final String location = ci.getProperty("location");
+
+      Table t = null;
+      Partition p = null;
+
+      if (isNull(location)) {
+        t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+        if (isNull(t)) {
+          // The table was dropped before we got around to cleaning it.
+          LOG.info("Unable to find table {}, assuming it was dropped. {}", ci.getFullTableName(),
+                  idWatermark(ci));
+          txnHandler.markCleaned(ci);
+          return;
+        }
+        if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+          // The table was marked with NO_CLEANUP=true.
+          LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", ci.getFullTableName());
+          txnHandler.markCleaned(ci);
+          return;
+        }
+        if (!isNull(ci.partName)) {
+          p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
+          if (isNull(p)) {
+            // The partition was dropped before we got around to cleaning it.
+            LOG.info("Unable to find partition {}, assuming it was dropped. {}",
+                    ci.getFullPartitionName(), idWatermark(ci));
+            txnHandler.markCleaned(ci);
+            return;
+          }
+          if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+            // The partition was marked with NO_CLEANUP=true.
+            LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to true", ci.getFullPartitionName());
+            txnHandler.markCleaned(ci);
+            return;
+          }
+        }
+      }
+      txnHandler.markCleanerStart(ci);
+
+      if (!isNull(t) || !isNull(ci.partName)) {
+        String path = isNull(location)
+                ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
+                : location;
+        boolean dropPartition = !isNull(ci.partName) && isNull(p);
+
+        // Check that the partition wasn't re-created
+        if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) {
+          cleanUsingLocation(ci, path, true);
+        } else {
+          cleanUsingAcidDir(ci, path, minOpenTxnGLB);
+        }
+      } else {
+        cleanUsingLocation(ci, location, false);
+      }
+    } catch (Exception e) {
+      LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", ci,
+              e.getMessage());
+      ci.errorMessage = e.getMessage();
+      if (metricsEnabled) {
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
+      }
+      handleCleanerAttemptFailure(ci);
+    }  finally {
+      if (metricsEnabled) {
+        perfLogger.perfLogEnd(CompactionCleaner.class.getName(), cleanerMetric);
+      }
+    }
+  }
+
+  private void cleanUsingLocation(CompactionInfo ci, String path, boolean requiresLock) throws MetaException {
+    List<Path> deleted;
+    if (requiresLock) {
+      LockRequest lockRequest = createLockRequest(ci);
+      LockResponse res = null;
+      try {
+        res = txnHandler.lock(lockRequest);
+        deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+      } catch (NoSuchTxnException | TxnAbortedException e) {
+        LOG.error("Error while trying to acquire exclusive write lock: {}", e.getMessage());
+        throw new MetaException(e.getMessage());
+      } finally {
+        if (res != null) {
+          try {
+            txnHandler.unlock(new UnlockRequest(res.getLockid()));
+          } catch (NoSuchLockException | TxnOpenException e) {
+            LOG.error("Error while trying to release exclusive write lock: {}", e.getMessage());
+          }
+        }
+      }
+    } else {
+      deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
+    }
+    if (!deleted.isEmpty()) {
+      txnHandler.markCleaned(ci);
+    } else {
+      txnHandler.clearCleanerStart(ci);
+    }
+  }
+
+  private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxnGLB) throws Exception {
+    ValidTxnList validTxnList =
+            TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
+    //save it so that getAcidState() sees it
+    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    /*
+     * {@code validTxnList} is capped by minOpenTxnGLB so if
+     * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
+     * produced by a compactor, that means every reader that could be active right now sees it
+     * as well.  That means if this base/delta shadows some earlier base/delta, it will be
+     * used in favor of any files that it shadows. Thus, the shadowed files are safe to delete.
+     *
+     *
+     * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
+     * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+     * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+     * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
+     * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
+     * writeId:17.  Compactor will produce base_17_c160.
+     * Suppose txnid:149 writes delta_18_18
+     * to P1 and aborts.  Compactor can only remove TXN_COMPONENTS entries
+     * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
+     * not visible based on 'validTxnList' capped at minOpenTxn, so it will not be cleaned by
+     * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
+     * metadata that says that 18 is aborted.
+     * In a slightly different case, whatever txn created delta_18 (and all other txn) may have
+     * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove
+     * it (since it has nothing but aborted data).  But we can't tell which actually happened
+     * in markCleaned(), so make sure it doesn't delete metadata above CQ_HIGHEST_WRITE_ID.
+     *
+     * We could perhaps decouple the cleaning of aborted data from the cleaning of obsolete files
+     * and remove all aborted files up to the current Min Open Write Id; this way the aborted
+     * TXN_COMPONENTS metadata could be removed as well up to that point, which may be higher than
+     * CQ_HIGHEST_WRITE_ID.  This could be useful if there is suddenly a flood of aborted txns.
+     * (For another day.)
+     */
+
+    // Creating 'reader' list since we are interested in the set of 'obsolete' files
+    ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
+    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+    Path path = new Path(location);
+    FileSystem fs = path.getFileSystem(conf);
+
+    // Collect all the files/dirs
+    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
+    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
+            dirSnapshots);
+    Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+    boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, ci.partName);
+
+    List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
+    if (isDynPartAbort || dir.hasUncompactedAborts()) {
+      ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+    }
+
+    List<Path> deleted = fsRemover.clean(new CleanupRequestBuilder().setLocation(location)
+            .setDbName(ci.dbname).setFullPartitionName(ci.getFullPartitionName())
+            .setRunAs(ci.runAs).setObsoleteDirs(obsoleteDirs).setPurge(true)
+            .build());
+
+    if (!deleted.isEmpty()) {
+      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
+              txnHandler);
+    }
+
+    // Make sure there are no leftovers below the compacted watermark
+    boolean success = false;
+    conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+    dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
+                    ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
+            Ref.from(false), false, dirSnapshots);
+
+    List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
+    if (!remained.isEmpty()) {
+      LOG.warn("{} Remained {} obsolete directories from {}. {}",
+              idWatermark(ci), remained.size(), location, CompactorUtil.getDebugInfo(remained));
+    } else {
+      LOG.debug("{} All cleared below the watermark: {} from {}", idWatermark(ci), ci.highestWriteId, location);
+      success = true;
+    }
+    if (success || CompactorUtil.isDynPartAbort(table, ci.partName)) {
+      txnHandler.markCleaned(ci);
+    } else {
+      txnHandler.clearCleanerStart(ci);
+      LOG.warn("No files were removed. Leaving queue entry {} in ready for cleaning state.", ci);
+    }
+  }
+
+  protected LockRequest createLockRequest(CompactionInfo ci) {
+    String agentInfo = Thread.currentThread().getName();
+    LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
+    requestBuilder.setUser(ci.runAs);
+    requestBuilder.setTransactionId(0);
+
+    LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+            .setLock(LockType.EXCL_WRITE)
+            .setOperationType(DataOperationType.DELETE)
+            .setDbName(ci.dbname)
+            .setTableName(ci.tableName)
+            .setIsTransactional(true);
+
+    if (ci.partName != null) {
+      lockCompBuilder.setPartitionName(ci.partName);
+    }
+    requestBuilder.addLockComponent(lockCompBuilder.build());
+
+    requestBuilder.setZeroWaitReadEnabled(!conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) ||
+            !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
+    return requestBuilder.build();
+  }
+
+  private static String idWatermark(CompactionInfo ci) {
+    return " id=" + ci.id;
+  }
+
+  private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
+          throws NoSuchTxnException, MetaException {
+    List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(ci.dbname, ci.tableName));
+    GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
+    request.setValidTxnList(validTxnList.writeToString());
+    GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
+    // we could have no write IDs for a table if it was never written to but
+    // since we are in the Cleaner phase of compactions, there must have
+    // been some delta/base dirs
+    assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
+    ValidReaderWriteIdList validWriteIdList =
+            TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
+    /*
+     * We need to filter the obsolete dirs list to only remove directories that were made obsolete by this compaction.
+     * With a higher retentionTime it is possible for a second compaction to run on the same partition before the
+     * first one is cleaned up. Cleaning up the first compaction should not touch the newer obsolete directories,
+     * so as not to violate the retentionTime for those.
+     */
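+    // Illustrative example (hypothetical ids): if ci.highestWriteId is 17 but the high watermark returned
+    // by getValidWriteIds is already 20 (e.g. because a later compaction has run), cap the list at 17 so
+    // that directories made obsolete only by the later compaction are not removed by this cleanup.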
+    if (ci.highestWriteId < validWriteIdList.getHighWatermark()) {
+      validWriteIdList = validWriteIdList.updateHighWatermark(ci.highestWriteId);
+    }
+    return validWriteIdList;
+  }
+
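+  // Each failed cleanup doubles the retry retention time, starting from
+  // HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME (d, 2d, 4d, ... with d the configured default); the attempt
+  // count is recovered from the stored retention via log2, and once HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS
+  // is reached the compaction is marked failed.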
+  private void handleCleanerAttemptFailure(CompactionInfo ci) throws MetaException {
+    long defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
+    int cleanAttempts = 0;
+    if (ci.retryRetention > 0) {
+      cleanAttempts = (int)(Math.log(ci.retryRetention / defaultRetention) / Math.log(2)) + 1;
+    }
+    if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
+      //Mark it as failed if the max attempt threshold is reached.
+      txnHandler.markFailed(ci);
+    } else {
+      //Calculate retry retention time and update record.
+      ci.retryRetention = (long)Math.pow(2, cleanAttempts) * defaultRetention;
+      txnHandler.setCleanerRetryRetentionTimeOnError(ci);
+    }
+  }
+
+  private CleanupRequest getCleaningRequestBasedOnLocation(CompactionInfo ci, String location) {
+    String strIfPurge = ci.getProperty("ifPurge");
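+    // Note: the mere presence of the 'ifPurge' property enables purge here, regardless of its value.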
+    boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
+
+    Path obsoletePath = new Path(location);
+    return new CleanupRequestBuilder()
+            .setLocation(location).setDbName(ci.dbname).setFullPartitionName(ci.getFullPartitionName())
+            .setRunAs(ci.runAs).setPurge(ifPurge).setObsoleteDirs(Collections.singletonList(obsoletePath))
+            .build();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
new file mode 100644
index 00000000000..d804a318e54
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+/**
+ * An abstract class which defines the common utility methods for performing cleanup activities.
+ */
+public abstract class TaskHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskHandler.class.getName());
+  protected final TxnStore txnHandler;
+  protected final HiveConf conf;
+  protected final boolean metricsEnabled;
+  protected final MetadataCache metadataCache;
+  protected final FSRemover fsRemover;
+
+  TaskHandler(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
+                         boolean metricsEnabled, FSRemover fsRemover) {
+    this.conf = conf;
+    this.txnHandler = txnHandler;
+    this.metadataCache = metadataCache;
+    this.metricsEnabled = metricsEnabled;
+    this.fsRemover = fsRemover;
+  }
+
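+  /**
+   * Returns the cleanup tasks that are currently ready to run. Each Runnable performs the cleanup for a
+   * single entry (for example one compaction queue record) and is intended to be executed by the Cleaner.
+   */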
+  public abstract List<Runnable> getTasks() throws MetaException;
+
+  protected Table resolveTable(String dbName, String tableName) throws MetaException {
+    try {
+      return getMSForConf(conf).getTable(getDefaultCatalog(conf), dbName, tableName);
+    } catch (MetaException e) {
+      LOG.error("Unable to find table {}.{}, {}", dbName, tableName, e.getMessage());
+      throw e;
+    }
+  }
+
+  protected Partition resolvePartition(String dbName, String tableName, String partName) throws MetaException {
+    if (partName != null) {
+      List<Partition> parts;
+      try {
+        parts = CompactorUtil.getPartitionsByNames(conf, dbName, tableName, partName);
+        if (parts == null || parts.isEmpty()) {
+          // The partition got dropped before we went looking for it.
+          return null;
+        }
+      } catch (Exception e) {
+        LOG.error("Unable to find partition: {}.{}.{}", dbName, tableName, partName, e);
+        throw e;
+      }
+      if (parts.size() != 1) {
+        LOG.error("{}.{}.{} does not refer to a single partition. {}", dbName, tableName, partName,
+                Arrays.toString(parts.toArray()));
+        throw new MetaException("Too many partitions for: " + String.join(".", dbName, tableName, partName));
+      }
+      return parts.get(0);
+    } else {
+      return null;
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
new file mode 100644
index 00000000000..79293a22c26
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A factory class to fetch handlers.
+ */
+public class TaskHandlerFactory {
+  private static final TaskHandlerFactory INSTANCE = new TaskHandlerFactory();
+
+  public static TaskHandlerFactory getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Factory class, no need to expose constructor.
+   */
+  private TaskHandlerFactory() {
+  }
+
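+  /**
+   * Returns the task handlers that the Cleaner should run. Currently only {@link CompactionCleaner} is
+   * provided; further handler types can be added here as new cleanup responsibilities are split out.
+   */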
+  public List<TaskHandler> getHandlers(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
+                                                  boolean metricsEnabled, FSRemover fsRemover) {
+    return Arrays.asList(new CompactionCleaner(conf, txnHandler, metadataCache,
+            metricsEnabled, fsRemover));
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 8812c0ca44f..82f052fe362 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
@@ -41,6 +42,8 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -58,7 +61,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.mockito.Mockito;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
@@ -69,7 +71,6 @@ import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 
 /**
  * Tests for the compactor Cleaner thread
@@ -102,6 +103,10 @@ public class TestCleaner extends CompactorTest {
 
     //Prevent cleaner from marking the compaction as cleaned
     TxnStore mockedHandler = spy(txnHandler);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
+    List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
+            .getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
     doThrow(new RuntimeException(errorMessage)).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
 
     Table t = newTable("default", "retry_test", false);
@@ -125,6 +130,7 @@ public class TestCleaner extends CompactorTest {
     for (int i = 1; i < 4; i++) {
       Cleaner cleaner = new Cleaner();
       cleaner.setConf(conf);
+      cleaner.setCleanupHandlers(taskHandlers);
       cleaner.init(new AtomicBoolean(true));
       FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
 
@@ -151,6 +157,7 @@ public class TestCleaner extends CompactorTest {
     //Do a final run to reach the maximum retry attempts, so the state finally should be set to failed
     Cleaner cleaner = new Cleaner();
     cleaner.setConf(conf);
+    cleaner.setCleanupHandlers(taskHandlers);
     cleaner.init(new AtomicBoolean(true));
     FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
 
@@ -184,11 +191,16 @@ public class TestCleaner extends CompactorTest {
 
     //Prevent cleaner from marking the compaction as cleaned
     TxnStore mockedHandler = spy(txnHandler);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
+    List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
+            .getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
     doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
 
     //Do a run to fail the clean and set the retention time
     Cleaner cleaner = new Cleaner();
     cleaner.setConf(conf);
+    cleaner.setCleanupHandlers(taskHandlers);
     cleaner.init(new AtomicBoolean(true));
     FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
 
@@ -204,6 +216,7 @@ public class TestCleaner extends CompactorTest {
     //Do a final run and check if the compaction is not picked up again
     cleaner = new Cleaner();
     cleaner.setConf(conf);
+    cleaner.setCleanupHandlers(taskHandlers);
     cleaner.init(new AtomicBoolean(true));
     FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
 
@@ -744,7 +757,7 @@ public class TestCleaner extends CompactorTest {
     rqst.setPartitionname(partName);
     long compactTxn = compactInTxn(rqst);
     addDeltaFile(t, p, 21, 22, 2);
-    
+
     txnHandler.addWriteIdsToMinHistory(1, Collections.singletonMap("default.trfcp", 23L));
     startCleaner();
 
@@ -1104,37 +1117,6 @@ public class TestCleaner extends CompactorTest {
     Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
   }
 
-  @Test
-  public void testMetaCache() throws Exception {
-    conf.setBoolVar(HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED, false);
-
-    Table t = newTable("default", "retry_test", false);
-
-    addBaseFile(t, null, 20L, 20);
-    addDeltaFile(t, null, 21L, 22L, 2);
-    addDeltaFile(t, null, 23L, 24L, 2);
-    burnThroughTransactions("default", "retry_test", 25);
-
-    CompactionRequest rqst = new CompactionRequest("default", "retry_test", CompactionType.MAJOR);
-    long compactTxn = compactInTxn(rqst);
-    addBaseFile(t, null, 25L, 25, compactTxn);
-
-    //Prevent cleaner from marking the compaction as cleaned
-    TxnStore mockedHandler = spy(txnHandler);
-    doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
-    Cleaner cleaner = Mockito.spy(new Cleaner());
-    cleaner.setConf(conf);
-    cleaner.init(new AtomicBoolean(true));
-    cleaner.run();
-    cleaner.run();
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Mockito.verify(cleaner, times(2)).computeIfAbsent(Mockito.any(),Mockito.any());
-    Mockito.verify(cleaner, times(1)).resolveTable(Mockito.any());
-  }
-
   private void allocateTableWriteId(String dbName, String tblName, long txnId) throws Exception {
     AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(dbName, tblName);
     awiRqst.setTxnIds(Collections.singletonList(txnId));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
new file mode 100644
index 00000000000..c14da6950c4
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
+import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.apache.hadoop.hive.ql.txn.compactor.TestCleaner;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+
+public class TestHandler extends TestCleaner {
+
+  @Test
+  public void testCompactionHandlerAndFsRemover() throws Exception {
+    Table t = newTable("default", "handler_test", true);
+    Partition p = newPartition(t, "today");
+    addBaseFile(t, p, 20L, 20);
+    addDeltaFile(t, p, 21L, 22L, 2);
+    addDeltaFile(t, p, 23L, 24L, 2);
+    addBaseFile(t, p, 25L, 25);
+
+    burnThroughTransactions(t.getDbName(), t.getTableName(), 25);
+
+    CompactionRequest rqst = new CompactionRequest(t.getDbName(), t.getTableName(), CompactionType.MAJOR);
+    rqst.setPartitionname("ds=today");
+    compactInTxn(rqst);
+    MetadataCache metadataCache = new MetadataCache(true);
+    FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+    TaskHandler mockedTaskHandler = Mockito.spy(new CompactionCleaner(conf, txnHandler, metadataCache,
+            false, mockedFSRemover));
+    AtomicBoolean stop = new AtomicBoolean(true);
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.init(stop);
+    cleaner.run();
+
+    Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+  }
+
+  @Test
+  public void testMetaCache() throws Exception {
+    conf.setBoolVar(HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED, false);
+
+    Table t = newTable("default", "retry_test", false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    addDeltaFile(t, null, 23L, 24L, 2);
+    burnThroughTransactions("default", "retry_test", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "retry_test", CompactionType.MAJOR);
+    long compactTxn = compactInTxn(rqst);
+    addBaseFile(t, null, 25L, 25, compactTxn);
+
+    //Prevent cleaner from marking the compaction as cleaned
+    MetadataCache mockedMetadataCache = Mockito.spy(new MetadataCache(true));
+    TxnStore mockedTxnHandler = spy(txnHandler);
+    FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), mockedMetadataCache);
+    TaskHandler mockedTaskHandler = Mockito.spy(new CompactionCleaner(conf, mockedTxnHandler, mockedMetadataCache,
+            false, fsRemover));
+    Cleaner cleaner = new Cleaner();
+    cleaner.setConf(conf);
+    cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+    cleaner.init(new AtomicBoolean(true));
+    cleaner.run();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals(1, compacts.size());
+    Mockito.verify(mockedMetadataCache, times(3)).computeIfAbsent(any(), any());
+    Mockito.verify(mockedTaskHandler, times(1)).resolveTable(any(), any());
+  }
+}
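
A minimal sketch of how the new pieces fit together, mirroring the tests above (assumes an already configured HiveConf 'conf' and TxnStore 'txnHandler'; all names come from this patch):

    MetadataCache metadataCache = new MetadataCache(true);
    FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
    List<TaskHandler> handlers = TaskHandlerFactory.getInstance()
        .getHandlers(conf, txnHandler, metadataCache, false, fsRemover);

    Cleaner cleaner = new Cleaner();
    cleaner.setConf(conf);
    cleaner.setCleanupHandlers(handlers);
    cleaner.init(new AtomicBoolean(true));
    cleaner.run();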