Posted to commits@hive.apache.org by kl...@apache.org on 2022/02/01 15:07:30 UTC

[hive] branch master updated: HIVE-25746: Fix of Compaction Failure Counter counted incorrectly (Viktor Csomor, reviewed by Denys Kuzmenko and Karen Coppage)

This is an automated email from the ASF dual-hosted git repository.

klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a8714c1  HIVE-25746: Fix of Compaction Failure Counter counted incorrectly (Viktor Csomor, reviewed by Denys Kuzmenko and Karen Coppage)
a8714c1 is described below

commit a8714c15295d08868ff2deaab77634c742a6f5a5
Author: Viktor Csomor <cs...@gmail.com>
AuthorDate: Tue Feb 1 16:07:14 2022 +0100

    HIVE-25746: Fix of Compaction Failure Counter counted incorrectly (Viktor Csomor, reviewed by Denys Kuzmenko and Karen Coppage)
    
    Fixes the compaction_initiator_failure_counter/compaction_cleaner_failure_counter logic in the Initiator and Cleaner.
    - The `exceptionally` closure made the `.allOf(..).join()` call always complete without surfacing any errors
    - The counting logic for the compaction_initiator_failure_counter was moved into the `scheduleCompactionIfRequired` method
    - The counting logic for the compaction_cleaner_failure_counter was moved into the `clean` method
    
    Both the `scheduleCompactionIfRequired` and `clean` methods catch all possible exceptions except the `MetaException` thrown by the `txnHandler.markFailed()` call, incrementing the counters as part of the error handling. Hence, an exception can only escape these methods if there are serious issues with the markFailed call itself.
    
    Closes #2974.
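    
    A minimal standalone sketch (hypothetical class name, not part of this
    commit) of the behaviour described above: once a failing future is wrapped
    in an `exceptionally` stage that returns null, `allOf(..).join()` completes
    normally, so a catch block around the join (where the old counters lived)
    is never entered.
    
        import java.util.concurrent.CompletableFuture;
    
        public class ExceptionallySwallowsFailures {
          public static void main(String[] args) {
            // Simplified stand-in for a per-partition clean()/checkForCompaction() task.
            CompletableFuture<Void> task = CompletableFuture
                .runAsync(() -> { throw new RuntimeException("task failed"); })
                .exceptionally(t -> {
                  System.err.println("logged: " + t.getMessage());
                  return null; // recovers the future; downstream stages see normal completion
                });
            try {
              CompletableFuture.allOf(task).join();
              System.out.println("join() returned normally; the failure was not observed here");
            } catch (Exception e) {
              // Never reached: the exceptionally stage already absorbed the failure,
              // so a failure counter incremented in this block would never move.
              System.out.println("unreachable");
            }
          }
        }
    
    This is why the fix moves the counter increments into `clean` and
    `scheduleCompactionIfRequired` themselves, where the exceptions are
    actually caught (see the diff below).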
---
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |  29 ++--
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |  31 ++--
 .../ql/txn/compactor/MetaStoreCompactorThread.java |   2 +-
 .../hive/ql/txn/compactor/CompactorTest.java       |  17 +--
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  |   6 +-
 .../ql/txn/compactor/TestCompactionMetrics.java    | 162 ++++++++++++++++++---
 6 files changed, 194 insertions(+), 53 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 6dbf08b..1e0dbf8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -77,6 +78,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
@@ -116,7 +118,6 @@ public class Cleaner extends MetaStoreCompactorThread {
   public void run() {
     LOG.info("Starting Cleaner thread");
     try {
-      Counter failuresCounter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER);
       do {
         TxnStore.MutexAPI.LockHandle handle = null;
         long startedAt = -1;
@@ -125,6 +126,7 @@ public class Cleaner extends MetaStoreCompactorThread {
         if (delayedCleanupEnabled) {
           retentionTime = HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS);
         }
+
         // Make sure nothing escapes this run method and kills the metastore at large,
         // so wrap it in a big catch Throwable statement.
         try {
@@ -143,7 +145,8 @@ public class Cleaner extends MetaStoreCompactorThread {
           List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
           if (!readyToClean.isEmpty()) {
             long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
-            final long cleanerWaterMark = minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
+            final long cleanerWaterMark =
+                minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
 
             LOG.info("Cleaning based on min open txn id: " + cleanerWaterMark);
             List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
@@ -154,18 +157,23 @@ public class Cleaner extends MetaStoreCompactorThread {
             // when min_history_level is finally dropped, than every HMS will commit compaction the new way
             // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
             for (CompactionInfo compactionInfo : readyToClean) {
-              cleanerList.add(CompletableFuture.runAsync(ThrowingRunnable.unchecked(
-                  () -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)), cleanerExecutor));
+              String tableName = compactionInfo.getFullTableName();
+              String partition = compactionInfo.getFullPartitionName();
+              CompletableFuture<Void> asyncJob =
+                  CompletableFuture.runAsync(
+                          ThrowingRunnable.unchecked(() -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)),
+                          cleanerExecutor)
+                      .exceptionally(t -> {
+                        LOG.error("Error during cleaning of table {} / partition {}", tableName, partition, t);
+                        return null;
+                      });
+              cleanerList.add(asyncJob);
             }
             CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
           }
         } catch (Throwable t) {
-          // the lock timeout on AUX lock, should be ignored.
-          if (metricsEnabled && handle != null) {
-            failuresCounter.inc();
-          }
           LOG.error("Caught an exception in the main loop of compactor cleaner, " +
-                  StringUtils.stringifyException(t));
+              StringUtils.stringifyException(t));
         } finally {
           if (handle != null) {
             handle.releaseLocks();
@@ -277,6 +285,9 @@ public class Cleaner extends MetaStoreCompactorThread {
       LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
           StringUtils.stringifyException(e));
       ci.errorMessage = e.getMessage();
+      if (metricsEnabled) {
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
+      }
       txnHandler.markFailed(ci);
     } finally {
       if (metricsEnabled) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 610e438..4cc54da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -111,7 +111,6 @@ public class Initiator extends MetaStoreCompactorThread {
       long abortedTimeThreshold = HiveConf
           .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
               TimeUnit.MILLISECONDS);
-      Counter failuresCounter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER);
 
       // Make sure we run through the loop once before checking to stop as this makes testing
       // much easier.  The stop value is only for testing anyway and not used when called from
@@ -182,8 +181,18 @@ public class Initiator extends MetaStoreCompactorThread {
               String runAs = resolveUserToRunAs(tblNameOwners, t, p);
               /* checkForCompaction includes many file metadata checks and may be expensive.
                * Therefore, using a thread pool here and running checkForCompactions in parallel */
-              compactionList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
-                  scheduleCompactionIfRequired(ci, t, p, runAs)), compactionExecutor));
+              String tableName = ci.getFullTableName();
+              String partition = ci.getFullPartitionName();
+
+              CompletableFuture<Void> asyncJob =
+                  CompletableFuture.runAsync(
+                          CompactorUtil.ThrowingRunnable.unchecked(() ->
+                              scheduleCompactionIfRequired(ci, t, p, runAs, metricsEnabled)), compactionExecutor)
+                      .exceptionally(exc -> {
+                        LOG.error("Error while scheduling compaction of table {} / partition {}.", tableName, partition, exc);
+                        return null;
+                      });
+              compactionList.add(asyncJob);
             } catch (Throwable t) {
               LOG.error("Caught exception while trying to determine if we should compact {}. " +
                   "Marking failed to avoid repeated failures, {}", ci, t);
@@ -191,20 +200,15 @@ public class Initiator extends MetaStoreCompactorThread {
               txnHandler.markFailed(ci);
             }
           }
-          CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0]))
-            .join();
+
+          CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0])).join();
 
           // Check for timed out remote workers.
           recoverFailedCompactions(true);
         } catch (Throwable t) {
-          // the lock timeout on AUX lock, should be ignored.
-          if (metricsEnabled && handle != null) {
-            failuresCounter.inc();
-          }
           LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
               StringUtils.stringifyException(t));
-        }
-        finally {
+        } finally {
           if (handle != null) {
             handle.releaseLocks(startedAt);
           }
@@ -232,7 +236,7 @@ public class Initiator extends MetaStoreCompactorThread {
     }
   }
 
-  private void scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String runAs)
+  private void scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String runAs, boolean metricsEnabled)
       throws MetaException {
     StorageDescriptor sd = resolveStorageDescriptor(t, p);
     try {
@@ -246,6 +250,9 @@ public class Initiator extends MetaStoreCompactorThread {
           + "failed to avoid repeated failures, " + ex;
       LOG.error(errorMessage);
       ci.errorMessage = errorMessage;
+      if (metricsEnabled) {
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
+      }
       txnHandler.markFailed(ci);
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 4184caf..2e32b22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -128,7 +128,7 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
       long elapsed = System.currentTimeMillis() - startedAt;
       LOG.debug("Updating {} metric to {}", metric, elapsed);
       Metrics.getOrCreateGauge(metric)
-          .set((int)elapsed);
+          .set((int) elapsed);
       return elapsed;
     }
     return 0;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 8f7b385..3aee6ad 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -137,24 +137,23 @@ public abstract class CompactorTest {
     tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
   }
 
-
   protected void compactorTestCleanup() throws IOException {
     FileUtils.deleteDirectory(tmpdir);
   }
 
   protected void startInitiator() throws Exception {
-    startThread(CompactorThreadType.INITIATOR, true);
+    runOneLoopOfCompactorThread(CompactorThreadType.INITIATOR);
   }
 
   protected void startWorker() throws Exception {
-    startThread(CompactorThreadType.WORKER, true);
+    runOneLoopOfCompactorThread(CompactorThreadType.WORKER);
   }
 
   protected void startCleaner() throws Exception {
-    startThread(CompactorThreadType.CLEANER, true);
+    runOneLoopOfCompactorThread(CompactorThreadType.CLEANER);
   }
 
-  protected void runAcidMetricService() throws Exception {
+  protected void runAcidMetricService() {
     TestTxnDbUtil.setConfValues(conf);
     AcidMetricService t = new AcidMetricService();
     t.setConf(conf);
@@ -354,7 +353,7 @@ public abstract class CompactorTest {
   }
 
   // I can't do this with @Before because I want to be able to control when the thread starts
-  private void startThread(CompactorThreadType type, boolean stopAfterOne) throws Exception {
+  private void runOneLoopOfCompactorThread(CompactorThreadType type) throws Exception {
     TestTxnDbUtil.setConfValues(conf);
     CompactorThread t;
     switch (type) {
@@ -365,10 +364,9 @@ public abstract class CompactorTest {
     }
     t.setThreadId((int) t.getId());
     t.setConf(conf);
-    stop.set(stopAfterOne);
+    stop.set(true);
     t.init(stop);
-    if (stopAfterOne) t.run();
-    else t.start();
+    t.run();
   }
 
   private String getLocation(String tableName, String partValue) {
@@ -686,5 +684,4 @@ public abstract class CompactorTest {
     }
     return result;
   }
-
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 1d0112f..1a09d50 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -370,7 +370,7 @@ public class TestCleaner extends CompactorTest {
     Table t = newTable("default", "camipc", true);
     List<Partition> partitions = new ArrayList<>();
     Partition p;
-    for(int i = 0; i < 10; i++) {
+    for (int i = 0; i < 10; i++) {
       p = newPartition(t, "today" + i);
 
       addBaseFile(t, p, 20L, 20);
@@ -381,9 +381,9 @@ public class TestCleaner extends CompactorTest {
     }
 
     burnThroughTransactions("default", "camipc", 25);
-    for(int i = 0; i < 10; i++) {
+    for (int i = 0; i < 10; i++) {
       CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
-      rqst.setPartitionname("ds=today"+i);
+      rqst.setPartitionname("ds=today" + i);
       compactInTxn(rqst);
     }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
index 7a33176..eea0c3b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
@@ -31,12 +31,15 @@ import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockLevel;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -48,9 +51,11 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.ThrowingTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -240,24 +245,6 @@ public class TestCompactionMetrics  extends CompactorTest {
   }
 
   @Test
-  public void  testInitiatorFailure() throws Exception {
-    ThrowingTxnHandler.doThrow = true;
-    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TXN_STORE_IMPL, "org.apache.hadoop.hive.metastore.txn.ThrowingTxnHandler");
-    startInitiator();
-    Counter counter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER);
-    Assert.assertEquals("Count incorrect", 1, counter.getCount());
-  }
-
-  @Test
-  public void  testCleanerFailure() throws Exception {
-    ThrowingTxnHandler.doThrow = true;
-    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TXN_STORE_IMPL, "org.apache.hadoop.hive.metastore.txn.ThrowingTxnHandler");
-    startCleaner();
-    Counter counter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER);
-    Assert.assertEquals("Count incorrect", 1, counter.getCount());
-  }
-
-  @Test
   public void  testInitiatorAuxFailure() throws Exception {
     TxnStore.MutexAPI.LockHandle handle = null;
     try {
@@ -921,6 +908,120 @@ public class TestCompactionMetrics  extends CompactorTest {
         (0 < cleanerDurationFromMetric) && (cleanerDurationFromMetric <= durationUpperLimit));
   }
 
+  @Test
+  public void testInitiatorFailuresCountedCorrectly() throws Exception {
+    final String DEFAULT_DB = "default";
+    final String SUCCESS_TABLE_NAME = "success_table";
+    final String FAILING_TABLE_NAME = "failing_table";
+    final String PARTITION_NAME = "part";
+    final long EXPECTED_SUCCESS_COUNT = 10;
+    final long EXPECTED_FAIL_COUNT = 6;
+
+    ControlledFailingTxHandler.failedTableName = FAILING_TABLE_NAME;
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TXN_STORE_IMPL,
+        "org.apache.hadoop.hive.ql.txn.compactor.TestCompactionMetrics$ControlledFailingTxHandler");
+
+    Table failedTable = newTable(DEFAULT_DB, FAILING_TABLE_NAME, true);
+    Table succeededTable = newTable(DEFAULT_DB, SUCCESS_TABLE_NAME, true);
+
+    for (Table table : new Table[] { succeededTable, failedTable }) {
+      List<LockComponent> components = new ArrayList<>();
+
+      String tableName = table.getTableName();
+
+      long partitionCount = FAILING_TABLE_NAME.equals(tableName) ? EXPECTED_FAIL_COUNT : EXPECTED_SUCCESS_COUNT;
+      for (int i = 0; i < partitionCount; i++) {
+        String partitionName = PARTITION_NAME + i;
+        Partition p = newPartition(table, partitionName);
+
+        addBaseFile(table, p, 20L, 20);
+        addDeltaFile(table, p, 21L, 22L, 2);
+        addDeltaFile(table, p, 23L, 24L, 2);
+        addDeltaFile(table, p, 21L, 24L, 4);
+
+        LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, DEFAULT_DB);
+        comp.setTablename(tableName);
+        comp.setPartitionname("ds=" + partitionName);
+        comp.setOperationType(DataOperationType.UPDATE);
+        components.add(comp);
+      }
+
+      burnThroughTransactions(DEFAULT_DB, tableName, 25);
+
+      long txnid = openTxn();
+
+      LockRequest req = new LockRequest(components, "me", "localhost");
+      req.setTxnid(txnid);
+      LockResponse res = txnHandler.lock(req);
+      Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+      long writeid = allocateWriteId(DEFAULT_DB, tableName, txnid);
+      Assert.assertEquals(26, writeid);
+      txnHandler.commitTxn(new CommitTxnRequest(txnid));
+    }
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE, 5);
+    startInitiator();
+
+    // Check that all the compactions have been initiated
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(EXPECTED_FAIL_COUNT + EXPECTED_SUCCESS_COUNT, rsp.getCompactsSize());
+
+    Assert.assertEquals(EXPECTED_FAIL_COUNT,
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER)
+            .getCount());
+  }
+
+  @Test
+  public void testCleanerFailuresCountedCorrectly() throws Exception {
+    final String DEFAULT_DB = "default";
+    final String SUCCESS_TABLE_NAME = "success_table";
+    final String FAILING_TABLE_NAME = "failing_table";
+    final String PARTITION_NAME = "part";
+    final long EXPECTED_SUCCESS_COUNT = 10;
+    final long EXPECTED_FAIL_COUNT = 6;
+
+    ControlledFailingTxHandler.failedTableName = FAILING_TABLE_NAME;
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TXN_STORE_IMPL,
+        "org.apache.hadoop.hive.ql.txn.compactor.TestCompactionMetrics$ControlledFailingTxHandler");
+
+    Table failedTable = newTable(DEFAULT_DB, FAILING_TABLE_NAME, true);
+    Table succeededTable = newTable(DEFAULT_DB, SUCCESS_TABLE_NAME, true);
+
+    for (Table table : new Table[] { succeededTable, failedTable }) {
+
+      String tableName = table.getTableName();
+
+      long partitionCount = FAILING_TABLE_NAME.equals(tableName) ? EXPECTED_FAIL_COUNT : EXPECTED_SUCCESS_COUNT;
+      for (int i = 0; i < partitionCount; i++) {
+        Partition p = newPartition(table, PARTITION_NAME + i);
+
+        addBaseFile(table, p, 20L, 20);
+        addDeltaFile(table, p, 21L, 22L, 2);
+        addDeltaFile(table, p, 23L, 24L, 2);
+        addDeltaFile(table, p, 21L, 24L, 4);
+      }
+
+      burnThroughTransactions(DEFAULT_DB, tableName, 25);
+      for (int i = 0; i < partitionCount; i++) {
+        CompactionRequest rqst = new CompactionRequest(DEFAULT_DB, tableName, CompactionType.MINOR);
+        rqst.setPartitionname("ds=" + PARTITION_NAME + i);
+        compactInTxn(rqst);
+      }
+    }
+
+    conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM, 5);
+    startCleaner();
+
+    // Check that all the compaction requests are still listed.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(EXPECTED_FAIL_COUNT + EXPECTED_SUCCESS_COUNT, rsp.getCompactsSize());
+
+    Assert.assertEquals(EXPECTED_FAIL_COUNT,
+        Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER)
+            .getCount());
+  }
+
   private ShowCompactResponseElement generateElement(long id, String db, String table, String partition,
       CompactionType type, String state) {
     return generateElement(id, db, table, partition, type, state, System.currentTimeMillis());
@@ -988,6 +1089,31 @@ public class TestCompactionMetrics  extends CompactorTest {
     startWorker();
   }
 
+  public static class ControlledFailingTxHandler extends ThrowingTxnHandler {
+    public static volatile String failedTableName;
+
+    public ControlledFailingTxHandler() {
+    }
+
+    @Override
+    public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) throws MetaException {
+      if (rqst.getFullTableNames()
+          .stream()
+          .anyMatch(t -> t.endsWith("." + failedTableName))) {
+        throw new RuntimeException("TxnHandler fails during getValidWriteIds");
+      }
+      return super.getValidWriteIds(rqst);
+    }
+
+    @Override
+    public void markCleanerStart(CompactionInfo info) throws MetaException {
+      if (failedTableName.equals(info.tableName)) {
+        throw new RuntimeException("TxnHandler fails during markCleanerStart");
+      }
+      super.markCleanerStart(info);
+    }
+  }
+
   @Override
   boolean useHive130DeltaDirName() {
     return false;