You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kl...@apache.org on 2022/02/01 15:07:30 UTC
[hive] branch master updated: HIVE-25746: Fix of Compaction Failure Counter counted incorrectly (Viktor Csomor, reviewed by Denys Kuzmenko and Karen Coppage)
This is an automated email from the ASF dual-hosted git repository.
klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a8714c1 HIVE-25746: Fix of Compaction Failure Counter counted incorrectly (Viktor Csomor, reviewed by Denys Kuzmenko and Karen Coppage)
a8714c1 is described below
commit a8714c15295d08868ff2deaab77634c742a6f5a5
Author: Viktor Csomor <cs...@gmail.com>
AuthorDate: Tue Feb 1 16:07:14 2022 +0100
HIVE-25746: Fix of Compaction Failure Counter counted incorrectly (Viktor Csomor, reviewed by Denys Kuzmenko and Karen Coppage)
Fix the counting logic of compaction_initiator_failure_counter / compaction_cleaner_failure_counter in the Initiator and Cleaner:
- Each async job now has an `exceptionally` handler that logs the job's failure. As a result, the `.allOf(..).join()` call always completes without an error, so failures can no longer be counted around it.
- The compaction_initiator_failure_counter is now incremented in the `scheduleCompactionIfRequired` method, once for each compaction check that fails.
- The compaction_cleaner_failure_counter is now incremented in the `clean` method, once for each cleanup that fails.
Previously, the counters were incremented in the catch block of the main loop. However, both `scheduleCompactionIfRequired` and `clean` catch every exception except a `MetaException` thrown by the `txnHandler.markFailed()` call. Hence, the counters were only incremented when the markFailed call itself failed.
Closes #2974.
---
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 29 ++--
.../hadoop/hive/ql/txn/compactor/Initiator.java | 31 ++--
.../ql/txn/compactor/MetaStoreCompactorThread.java | 2 +-
.../hive/ql/txn/compactor/CompactorTest.java | 17 +--
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 6 +-
.../ql/txn/compactor/TestCompactionMetrics.java | 162 ++++++++++++++++++---
6 files changed, 194 insertions(+), 53 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 6dbf08b..1e0dbf8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -77,6 +78,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
@@ -116,7 +118,6 @@ public class Cleaner extends MetaStoreCompactorThread {
public void run() {
LOG.info("Starting Cleaner thread");
try {
- Counter failuresCounter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER);
do {
TxnStore.MutexAPI.LockHandle handle = null;
long startedAt = -1;
@@ -125,6 +126,7 @@ public class Cleaner extends MetaStoreCompactorThread {
if (delayedCleanupEnabled) {
retentionTime = HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS);
}
+
// Make sure nothing escapes this run method and kills the metastore at large,
// so wrap it in a big catch Throwable statement.
try {
@@ -143,7 +145,8 @@ public class Cleaner extends MetaStoreCompactorThread {
List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
if (!readyToClean.isEmpty()) {
long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
- final long cleanerWaterMark = minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
+ final long cleanerWaterMark =
+ minTxnIdSeenOpen < 0 ? minOpenTxnId : Math.min(minOpenTxnId, minTxnIdSeenOpen);
LOG.info("Cleaning based on min open txn id: " + cleanerWaterMark);
List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
@@ -154,18 +157,23 @@ public class Cleaner extends MetaStoreCompactorThread {
// when min_history_level is finally dropped, than every HMS will commit compaction the new way
// and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
for (CompactionInfo compactionInfo : readyToClean) {
- cleanerList.add(CompletableFuture.runAsync(ThrowingRunnable.unchecked(
- () -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)), cleanerExecutor));
+ String tableName = compactionInfo.getFullTableName();
+ String partition = compactionInfo.getFullPartitionName();
+ CompletableFuture<Void> asyncJob =
+ CompletableFuture.runAsync(
+ ThrowingRunnable.unchecked(() -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)),
+ cleanerExecutor)
+ .exceptionally(t -> {
+ LOG.error("Error during the cleaning the table {} / partition {}", tableName, partition, t);
+ return null;
+ });
+ cleanerList.add(asyncJob);
}
CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
}
} catch (Throwable t) {
- // the lock timeout on AUX lock, should be ignored.
- if (metricsEnabled && handle != null) {
- failuresCounter.inc();
- }
LOG.error("Caught an exception in the main loop of compactor cleaner, " +
- StringUtils.stringifyException(t));
+ StringUtils.stringifyException(t));
} finally {
if (handle != null) {
handle.releaseLocks();
@@ -277,6 +285,9 @@ public class Cleaner extends MetaStoreCompactorThread {
LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " +
StringUtils.stringifyException(e));
ci.errorMessage = e.getMessage();
+ if (metricsEnabled) {
+ Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
+ }
txnHandler.markFailed(ci);
} finally {
if (metricsEnabled) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 610e438..4cc54da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -111,7 +111,6 @@ public class Initiator extends MetaStoreCompactorThread {
long abortedTimeThreshold = HiveConf
.getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
TimeUnit.MILLISECONDS);
- Counter failuresCounter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER);
// Make sure we run through the loop once before checking to stop as this makes testing
// much easier. The stop value is only for testing anyway and not used when called from
@@ -182,8 +181,18 @@ public class Initiator extends MetaStoreCompactorThread {
String runAs = resolveUserToRunAs(tblNameOwners, t, p);
/* checkForCompaction includes many file metadata checks and may be expensive.
* Therefore, using a thread pool here and running checkForCompactions in parallel */
- compactionList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
- scheduleCompactionIfRequired(ci, t, p, runAs)), compactionExecutor));
+ String tableName = ci.getFullTableName();
+ String partition = ci.getFullPartitionName();
+
+ CompletableFuture<Void> asyncJob =
+ CompletableFuture.runAsync(
+ CompactorUtil.ThrowingRunnable.unchecked(() ->
+ scheduleCompactionIfRequired(ci, t, p, runAs, metricsEnabled)), compactionExecutor)
+ .exceptionally(exc -> {
+ LOG.error("Error while running scheduling the compaction on the table {} / partition {}.", tableName, partition, exc);
+ return null;
+ });
+ compactionList.add(asyncJob);
} catch (Throwable t) {
LOG.error("Caught exception while trying to determine if we should compact {}. " +
"Marking failed to avoid repeated failures, {}", ci, t);
@@ -191,20 +200,15 @@ public class Initiator extends MetaStoreCompactorThread {
txnHandler.markFailed(ci);
}
}
- CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0]))
- .join();
+
+ CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0])).join();
// Check for timed out remote workers.
recoverFailedCompactions(true);
} catch (Throwable t) {
- // the lock timeout on AUX lock, should be ignored.
- if (metricsEnabled && handle != null) {
- failuresCounter.inc();
- }
LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
StringUtils.stringifyException(t));
- }
- finally {
+ } finally {
if (handle != null) {
handle.releaseLocks(startedAt);
}
@@ -232,7 +236,7 @@ public class Initiator extends MetaStoreCompactorThread {
}
}
- private void scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String runAs)
+ private void scheduleCompactionIfRequired(CompactionInfo ci, Table t, Partition p, String runAs, boolean metricsEnabled)
throws MetaException {
StorageDescriptor sd = resolveStorageDescriptor(t, p);
try {
@@ -246,6 +250,9 @@ public class Initiator extends MetaStoreCompactorThread {
+ "failed to avoid repeated failures, " + ex;
LOG.error(errorMessage);
ci.errorMessage = errorMessage;
+ if (metricsEnabled) {
+ Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
+ }
txnHandler.markFailed(ci);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 4184caf..2e32b22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -128,7 +128,7 @@ public class MetaStoreCompactorThread extends CompactorThread implements MetaSto
long elapsed = System.currentTimeMillis() - startedAt;
LOG.debug("Updating {} metric to {}", metric, elapsed);
Metrics.getOrCreateGauge(metric)
- .set((int)elapsed);
+ .set((int) elapsed);
return elapsed;
}
return 0;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 8f7b385..3aee6ad 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -137,24 +137,23 @@ public abstract class CompactorTest {
tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
}
-
protected void compactorTestCleanup() throws IOException {
FileUtils.deleteDirectory(tmpdir);
}
protected void startInitiator() throws Exception {
- startThread(CompactorThreadType.INITIATOR, true);
+ runOneLoopOfCompactorThread(CompactorThreadType.INITIATOR);
}
protected void startWorker() throws Exception {
- startThread(CompactorThreadType.WORKER, true);
+ runOneLoopOfCompactorThread(CompactorThreadType.WORKER);
}
protected void startCleaner() throws Exception {
- startThread(CompactorThreadType.CLEANER, true);
+ runOneLoopOfCompactorThread(CompactorThreadType.CLEANER);
}
- protected void runAcidMetricService() throws Exception {
+ protected void runAcidMetricService() {
TestTxnDbUtil.setConfValues(conf);
AcidMetricService t = new AcidMetricService();
t.setConf(conf);
@@ -354,7 +353,7 @@ public abstract class CompactorTest {
}
// I can't do this with @Before because I want to be able to control when the thread starts
- private void startThread(CompactorThreadType type, boolean stopAfterOne) throws Exception {
+ private void runOneLoopOfCompactorThread(CompactorThreadType type) throws Exception {
TestTxnDbUtil.setConfValues(conf);
CompactorThread t;
switch (type) {
@@ -365,10 +364,9 @@ public abstract class CompactorTest {
}
t.setThreadId((int) t.getId());
t.setConf(conf);
- stop.set(stopAfterOne);
+ stop.set(true);
t.init(stop);
- if (stopAfterOne) t.run();
- else t.start();
+ t.run();
}
private String getLocation(String tableName, String partValue) {
@@ -686,5 +684,4 @@ public abstract class CompactorTest {
}
return result;
}
-
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 1d0112f..1a09d50 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -370,7 +370,7 @@ public class TestCleaner extends CompactorTest {
Table t = newTable("default", "camipc", true);
List<Partition> partitions = new ArrayList<>();
Partition p;
- for(int i = 0; i < 10; i++) {
+ for (int i = 0; i < 10; i++) {
p = newPartition(t, "today" + i);
addBaseFile(t, p, 20L, 20);
@@ -381,9 +381,9 @@ public class TestCleaner extends CompactorTest {
}
burnThroughTransactions("default", "camipc", 25);
- for(int i = 0; i < 10; i++) {
+ for (int i = 0; i < 10; i++) {
CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
- rqst.setPartitionname("ds=today"+i);
+ rqst.setPartitionname("ds=today" + i);
compactInTxn(rqst);
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
index 7a33176..eea0c3b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
@@ -31,12 +31,15 @@ import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -48,9 +51,11 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.ThrowingTxnHandler;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -240,24 +245,6 @@ public class TestCompactionMetrics extends CompactorTest {
}
@Test
- public void testInitiatorFailure() throws Exception {
- ThrowingTxnHandler.doThrow = true;
- MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TXN_STORE_IMPL, "org.apache.hadoop.hive.metastore.txn.ThrowingTxnHandler");
- startInitiator();
- Counter counter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER);
- Assert.assertEquals("Count incorrect", 1, counter.getCount());
- }
-
- @Test
- public void testCleanerFailure() throws Exception {
- ThrowingTxnHandler.doThrow = true;
- MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TXN_STORE_IMPL, "org.apache.hadoop.hive.metastore.txn.ThrowingTxnHandler");
- startCleaner();
- Counter counter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER);
- Assert.assertEquals("Count incorrect", 1, counter.getCount());
- }
-
- @Test
public void testInitiatorAuxFailure() throws Exception {
TxnStore.MutexAPI.LockHandle handle = null;
try {
@@ -921,6 +908,120 @@ public class TestCompactionMetrics extends CompactorTest {
(0 < cleanerDurationFromMetric) && (cleanerDurationFromMetric <= durationUpperLimit));
}
+ @Test
+ public void testInitiatorFailuresCountedCorrectly() throws Exception {
+ final String DEFAULT_DB = "default";
+ final String SUCCESS_TABLE_NAME = "success_table";
+ final String FAILING_TABLE_NAME = "failing_table";
+ final String PARTITION_NAME = "part";
+ final long EXPECTED_SUCCESS_COUNT = 10;
+ final long EXPECTED_FAIL_COUNT = 6;
+
+ ControlledFailingTxHandler.failedTableName = FAILING_TABLE_NAME;
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TXN_STORE_IMPL,
+ "org.apache.hadoop.hive.ql.txn.compactor.TestCompactionMetrics$ControlledFailingTxHandler");
+
+ Table failedTable = newTable(DEFAULT_DB, FAILING_TABLE_NAME, true);
+ Table succeededTable = newTable(DEFAULT_DB, SUCCESS_TABLE_NAME, true);
+
+ for (Table table : new Table[] { succeededTable, failedTable }) {
+ List<LockComponent> components = new ArrayList<>();
+
+ String tableName = table.getTableName();
+
+ long partitionCount = FAILING_TABLE_NAME.equals(tableName) ? EXPECTED_FAIL_COUNT : EXPECTED_SUCCESS_COUNT;
+ for (int i = 0; i < partitionCount; i++) {
+ String partitionName = PARTITION_NAME + i;
+ Partition p = newPartition(table, partitionName);
+
+ addBaseFile(table, p, 20L, 20);
+ addDeltaFile(table, p, 21L, 22L, 2);
+ addDeltaFile(table, p, 23L, 24L, 2);
+ addDeltaFile(table, p, 21L, 24L, 4);
+
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, DEFAULT_DB);
+ comp.setTablename(tableName);
+ comp.setPartitionname("ds=" + partitionName);
+ comp.setOperationType(DataOperationType.UPDATE);
+ components.add(comp);
+ }
+
+ burnThroughTransactions(DEFAULT_DB, tableName, 25);
+
+ long txnid = openTxn();
+
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ LockResponse res = txnHandler.lock(req);
+ Assert.assertEquals(LockState.ACQUIRED, res.getState());
+
+ long writeid = allocateWriteId(DEFAULT_DB, tableName, txnid);
+ Assert.assertEquals(26, writeid);
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+ }
+
+ conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE, 5);
+ startInitiator();
+
+ // Check if all the compaction have initiated
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(EXPECTED_FAIL_COUNT + EXPECTED_SUCCESS_COUNT, rsp.getCompactsSize());
+
+ Assert.assertEquals(EXPECTED_FAIL_COUNT,
+ Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER)
+ .getCount());
+ }
+
+ @Test
+ public void testCleanerFailuresCountedCorrectly() throws Exception {
+ final String DEFAULT_DB = "default";
+ final String SUCCESS_TABLE_NAME = "success_table";
+ final String FAILING_TABLE_NAME = "failing_table";
+ final String PARTITION_NAME = "part";
+ final long EXPECTED_SUCCESS_COUNT = 10;
+ final long EXPECTED_FAIL_COUNT = 6;
+
+ ControlledFailingTxHandler.failedTableName = FAILING_TABLE_NAME;
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TXN_STORE_IMPL,
+ "org.apache.hadoop.hive.ql.txn.compactor.TestCompactionMetrics$ControlledFailingTxHandler");
+
+ Table failedTable = newTable(DEFAULT_DB, FAILING_TABLE_NAME, true);
+ Table succeededTable = newTable(DEFAULT_DB, SUCCESS_TABLE_NAME, true);
+
+ for (Table table : new Table[] { succeededTable, failedTable }) {
+
+ String tableName = table.getTableName();
+
+ long partitionCount = FAILING_TABLE_NAME.equals(tableName) ? EXPECTED_FAIL_COUNT : EXPECTED_SUCCESS_COUNT;
+ for (int i = 0; i < partitionCount; i++) {
+ Partition p = newPartition(table, PARTITION_NAME + i);
+
+ addBaseFile(table, p, 20L, 20);
+ addDeltaFile(table, p, 21L, 22L, 2);
+ addDeltaFile(table, p, 23L, 24L, 2);
+ addDeltaFile(table, p, 21L, 24L, 4);
+ }
+
+ burnThroughTransactions(DEFAULT_DB, tableName, 25);
+ for (int i = 0; i < partitionCount; i++) {
+ CompactionRequest rqst = new CompactionRequest(DEFAULT_DB, tableName, CompactionType.MINOR);
+ rqst.setPartitionname("ds=" + PARTITION_NAME + i);
+ compactInTxn(rqst);
+ }
+ }
+
+ conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM, 5);
+ startCleaner();
+
+ // Check there are no compactions requests left.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(EXPECTED_FAIL_COUNT + EXPECTED_SUCCESS_COUNT, rsp.getCompactsSize());
+
+ Assert.assertEquals(EXPECTED_FAIL_COUNT,
+ Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER)
+ .getCount());
+ }
+
private ShowCompactResponseElement generateElement(long id, String db, String table, String partition,
CompactionType type, String state) {
return generateElement(id, db, table, partition, type, state, System.currentTimeMillis());
@@ -988,6 +1089,31 @@ public class TestCompactionMetrics extends CompactorTest {
startWorker();
}
+ public static class ControlledFailingTxHandler extends ThrowingTxnHandler {
+ public static volatile String failedTableName;
+
+ public ControlledFailingTxHandler() {
+ }
+
+ @Override
+ public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) throws MetaException {
+ if (rqst.getFullTableNames()
+ .stream()
+ .anyMatch(t -> t.endsWith("." + failedTableName))) {
+ throw new RuntimeException("TxnHandler fails during getValidWriteIds");
+ }
+ return super.getValidWriteIds(rqst);
+ }
+
+ @Override
+ public void markCleanerStart(CompactionInfo info) throws MetaException {
+ if (failedTableName.equals(info.tableName)) {
+ throw new RuntimeException("TxnHandler fails during MarkCleaned");
+ }
+ super.markCleanerStart(info);
+ }
+ }
+
@Override
boolean useHive130DeltaDirName() {
return false;