You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2023/11/27 17:48:28 UTC

(accumulo) branch elasticity updated: Cleanup failed compaction tmp files (#3955)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 54b7965e60 Cleanup failed compaction tmp files (#3955)
54b7965e60 is described below

commit 54b7965e6060db7375dc785f486070396a29c8b1
Author: Dave Marion <dl...@apache.org>
AuthorDate: Mon Nov 27 12:48:22 2023 -0500

    Cleanup failed compaction tmp files (#3955)
    
    Modified TabletNameGenerator.getNextDataFilenameForMajc to add
    the external compaction id to the filename. When a compaction
    fails, or the DeadCompactionDetector notices a Compactor is down,
    CompactionCoordinator.compactionFailed is called to remove the
    external compaction id from the tablet metadata. Modified this
    method to also remove corresponding tmp files from the tablet
    directory when the external compaction id is successfully removed
    from the tablet metadata. When the external compaction id cannot be
    removed (for example, because the tablet was merged or deleted),
    the tmp files are instead cleaned up by the background
    DeadCompactionDetector thread. Created the FindCompactionTmpFiles
    utility to find, and optionally delete, compaction tmp files
    at a system level.
    
    Fixes #3577
---
 .../org/apache/accumulo/core/conf/Property.java    |   6 +
 .../core/metadata/ReferencedTabletFile.java        |   2 +
 .../core/metadata/schema/ExternalCompactionId.java |   3 +-
 .../server/tablets/TabletNameGenerator.java        |  10 +-
 .../server/util/FindCompactionTmpFiles.java        | 216 +++++++++++++++++++++
 .../server/tablets/TabletNameGeneratorTest.java    |  93 +++++++++
 .../org/apache/accumulo/compactor/Compactor.java   |   5 +-
 .../coordinator/CompactionCoordinator.java         |  96 +++++++--
 .../coordinator/DeadCompactionDetector.java        | 123 ++++++++----
 .../compaction/ExternalCompactionTestUtils.java    |   1 +
 .../test/compaction/ExternalCompaction_1_IT.java   |  10 +
 .../test/compaction/ExternalCompaction_2_IT.java   |  18 ++
 .../test/compaction/ExternalCompaction_3_IT.java   |  10 +
 .../compaction/ExternalDoNothingCompactor.java     |  15 ++
 .../test/functional/FindCompactionTmpFilesIT.java  | 216 +++++++++++++++++++++
 15 files changed, 759 insertions(+), 65 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index c371c136b5..4c5fba165a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1116,6 +1116,12 @@ public enum Property {
   @Experimental
   COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
       "Properties in this category affect the behavior of the accumulo compactor server.", "2.1.0"),
+  // Replaces the former hard-coded 5 minute cancel-check period in Compactor.
+  COMPACTOR_CANCEL_CHECK_INTERVAL("compactor.cancel.check.interval", "5m",
+      PropertyType.TIMEDURATION,
+      "Interval at which Compactors will check to see if the currently executing compaction"
+          + " should be cancelled. This checks for situations such as the tablet being deleted"
+          + " (split and merge do this), the table being deleted, or a user compaction being"
+          + " cancelled.",
+      "4.0.0"),
   @Experimental
   COMPACTOR_PORTSEARCH("compactor.port.search", "true", PropertyType.BOOLEAN,
       "If the compactor.port.client is in use, search higher ports until one is available.",
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/ReferencedTabletFile.java b/core/src/main/java/org/apache/accumulo/core/metadata/ReferencedTabletFile.java
index 4e219d8023..c55ba033b7 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/ReferencedTabletFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/ReferencedTabletFile.java
@@ -97,6 +97,8 @@ public class ReferencedTabletFile extends AbstractTabletFile<ReferencedTabletFil
   public static FileParts parsePath(Path filePath) {
     // File name construct: <volume>/<tablePath>/<tableId>/<tablet>/<file>
     // Example: hdfs://namenode:9020/accumulo/tables/1/default_tablet/F00001.rf
+    // Example compaction tmp file:
+    // hdfs://namenode:9020/accumulo/tables/1/default_tablet/C00001.rf_tmp_ECID-<uuid>
     final URI uri = filePath.toUri();
 
     // validate that this is a fully qualified uri
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java
index 2faf81a56a..2dc3030d6d 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java
@@ -26,7 +26,7 @@ public class ExternalCompactionId extends AbstractId<ExternalCompactionId> {
 
   // A common prefix is nice when grepping logs for external compaction ids. The prefix also serves
   // as a nice sanity check on data coming in over the network and from persistent storage.
-  private static final String PREFIX = "ECID:";
+  public static final String PREFIX = "ECID-";
 
   private ExternalCompactionId(UUID uuid) {
     super(PREFIX + uuid);
@@ -66,5 +66,4 @@ public class ExternalCompactionId extends AbstractId<ExternalCompactionId> {
     }
     return of(ecid);
   }
-
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java
index 802b235dce..7ab3e048f7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java
@@ -25,6 +25,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FilePrefix;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment;
@@ -67,11 +68,12 @@ public class TabletNameGenerator {
   }
 
+  /**
+   * Generates the path of the tmp file that a major compaction writes its output to. The base
+   * name comes from getNextDataFilename, using FilePrefix.MAJOR_COMPACTION_ALL_FILES when deletes
+   * are not propagated and FilePrefix.MAJOR_COMPACTION otherwise. The name is then suffixed with
+   * {@code _tmp_} plus the canonical external compaction id, so that tmp files left behind by a
+   * failed compaction can be matched back to that compaction.
+   *
+   * @param propagateDeletes whether the compaction propagates deletes; selects the file prefix
+   * @param context server context used to allocate the file name
+   * @param tabletMetadata tablet whose extent and directory name determine the file location
+   * @param dirCreator callback passed through to getNextDataFilename (previously ignored)
+   * @param ecid id of the external compaction that will write the file; must not be null
+   * @return the tmp file the compaction should write to
+   */
   public static ReferencedTabletFile getNextDataFilenameForMajc(boolean propagateDeletes,
-      ServerContext context, TabletMetadata tabletMetadata, Consumer<String> dirCreator) {
+      ServerContext context, TabletMetadata tabletMetadata, Consumer<String> dirCreator,
+      ExternalCompactionId ecid) {
     String tmpFileName = getNextDataFilename(
         !propagateDeletes ? FilePrefix.MAJOR_COMPACTION_ALL_FILES : FilePrefix.MAJOR_COMPACTION,
-        context, tabletMetadata.getExtent(), tabletMetadata.getDirName(), dirName -> {}).insert()
-        .getMetadataPath() + "_tmp";
+        context, tabletMetadata.getExtent(), tabletMetadata.getDirName(), dirCreator).insert()
+        .getMetadataPath() + "_tmp_" + ecid.canonical();
     return new ReferencedTabletFile(new Path(tmpFileName));
   }
 
@@ -82,7 +84,7 @@ public class TabletNameGenerator {
       newFilePath = newFilePath.substring(0, idx);
     } else {
       throw new IllegalArgumentException("Expected compaction tmp file "
-          + tmpFile.getNormalizedPathStr() + " to have suffix '_tmp'");
+          + tmpFile.getNormalizedPathStr() + " to have suffix starting with '_tmp'");
     }
     return new ReferencedTabletFile(new Path(newFilePath));
   }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java
new file mode 100644
index 0000000000..aec3e775fb
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+/**
+ * Utility that finds, and optionally deletes, compaction tmp files (files whose name contains
+ * {@code _tmp_ECID-}) that are not the output of an external compaction still recorded in the
+ * tablet metadata.
+ */
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables",
+        description = "comma separated list of table names, all tables are searched when omitted")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  /**
+   * Searches the tablet directories of every volume for compaction tmp files. It then removes from
+   * the result the tmp files of external compactions that are still present in the tablet
+   * metadata.
+   *
+   * @param context server context
+   * @param tableId id of the table to search, or null to search all tables
+   * @return compaction tmp files not referenced by a recorded external compaction
+   * @throws InterruptedException if interrupted while waiting on the filesystem searches
+   * @throws RuntimeException if a volume search fails with an unexpected exception
+   */
+  public static Set<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final Set<Path> matches = new ConcurrentSkipListSet<>();
+    if (vols.isEmpty()) {
+      // nothing to search, and newFixedThreadPool rejects a pool size of zero
+      return matches;
+    }
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.trace("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          // globStatus can return null rather than an empty array
+          if (files != null) {
+            Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+          }
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    try {
+      while (!futures.isEmpty()) {
+        Iterator<Future<Void>> iter = futures.iterator();
+        while (iter.hasNext()) {
+          Future<Void> future = iter.next();
+          if (future.isDone()) {
+            iter.remove();
+            try {
+              future.get();
+            } catch (ExecutionException e) {
+              throw new RuntimeException("Error getting list of tmp files", e);
+            }
+          }
+        }
+        if (!futures.isEmpty()) {
+          UtilWaitThread.sleep(3_000);
+        }
+      }
+    } finally {
+      // stop searches that are still running if we are leaving early on an error or interrupt
+      svc.shutdownNow();
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.trace("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      // the tablets metadata holds scanner resources, so close it when done
+      try (var tablets =
+          context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()) {
+        tablets.forEach(tm -> tm.getExternalCompactions().values()
+            .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath())));
+      }
+    }
+    LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches);
+    return matches;
+  }
+
+  /** Outcome counts of a {@link #deleteTempFiles(ServerContext, Set)} call. */
+  public static class DeleteStats {
+    /** files deleted, or already absent */
+    public int success = 0;
+    /** files whose delete call returned false */
+    public int failure = 0;
+    /** files whose existence check or delete threw an exception */
+    public int error = 0;
+  }
+
+  /**
+   * Deletes the given files in parallel using 8 threads. A file that no longer exists counts as a
+   * success.
+   *
+   * @param context server context providing the volume manager
+   * @param filesToDelete files to delete
+   * @return counts of successful, failed and erroring deletes
+   * @throws InterruptedException if interrupted while waiting on the deletes
+   */
+  public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> {
+        if (context.getVolumeManager().exists(p)) {
+          return context.getVolumeManager().delete(p);
+        }
+        return true;
+      }));
+    });
+    delSvc.shutdown();
+
+    try {
+      // track outstanding work by the futures themselves so the count always matches submissions
+      while (!futures.isEmpty()) {
+        Iterator<Future<Boolean>> iter = futures.iterator();
+        while (iter.hasNext()) {
+          Future<Boolean> future = iter.next();
+          if (future.isDone()) {
+            iter.remove();
+            try {
+              if (future.get()) {
+                stats.success++;
+              } else {
+                stats.failure++;
+              }
+            } catch (ExecutionException e) {
+              stats.error++;
+              LOG.error("Error deleting a compaction tmp file", e);
+            }
+          }
+        }
+        if (!futures.isEmpty()) {
+          LOG.debug("Waiting on {} background delete operations", futures.size());
+          UtilWaitThread.sleep(3_000);
+        }
+      }
+    } finally {
+      // abandon outstanding deletes if we are leaving early on an interrupt
+      delSvc.shutdownNow();
+    }
+    delSvc.awaitTermination(10, TimeUnit.MINUTES);
+    return stats;
+  }
+
+  /**
+   * Logs the compaction tmp files that were found and, when requested, deletes them.
+   *
+   * @param description what was searched, used in log messages (e.g. "table foo")
+   */
+  private static void reportAndDelete(ServerContext context, String description,
+      Set<Path> matches, boolean delete) throws InterruptedException {
+    LOG.info("Found the following compaction tmp files for {}:", description);
+    matches.forEach(p -> LOG.info("{}", p));
+
+    if (delete) {
+      LOG.info("Deleting compaction tmp files for {}...", description);
+      DeleteStats stats = deleteTempFiles(context, matches);
+      LOG.info(
+          "Deletion of compaction tmp files for {} complete. Success:{}, Failure:{}, Error:{}",
+          description, stats.success, stats.failure, stats.error);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(FindCompactionTmpFiles.class.getName(), args);
+    LOG.info("Looking for compaction tmp files over tables: {}, deleting: {}", opts.tables,
+        opts.delete);
+
+    Span span = TraceUtil.startSpan(FindCompactionTmpFiles.class, "main");
+    try (Scope scope = span.makeCurrent()) {
+
+      ServerContext context = opts.getServerContext();
+
+      if (opts.tables == null || opts.tables.isBlank()) {
+        // no tables given: search the tablet directories of every table
+        reportAndDelete(context, "all tables", findTempFiles(context, null), opts.delete);
+      } else {
+        for (String table : opts.tables.split(",")) {
+
+          table = table.trim();
+          if (table.isEmpty()) {
+            continue;
+          }
+          String tableId = context.tableOperations().tableIdMap().get(table);
+          if (tableId == null || tableId.isEmpty()) {
+            LOG.warn("TableId for table: {} does not exist, maybe the table was deleted?", table);
+            continue;
+          }
+
+          reportAndDelete(context, "table " + table, findTempFiles(context, tableId),
+              opts.delete);
+        }
+      }
+
+    } finally {
+      span.end();
+    }
+  }
+
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/tablets/TabletNameGeneratorTest.java b/server/base/src/test/java/org/apache/accumulo/server/tablets/TabletNameGeneratorTest.java
new file mode 100644
index 0000000000..644939d68b
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/tablets/TabletNameGeneratorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.tablets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.util.cache.Caches;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.Test;
+
+public class TabletNameGeneratorTest {
+
+  /**
+   * Checks the tmp file name generated for a major compaction that does not propagate deletes.
+   * The name allocator is mocked to return "NextFileName", and the expected name is the all-files
+   * prefix, the "rf" extension, and the external compaction id suffix.
+   */
+  @Test
+  public void testGetNextDataFilenameForMajc() {
+
+    final String iid = UUID.randomUUID().toString();
+    final String volumeUri = "hdfs://localhost:8000/accumulo/" + iid;
+    final TableId tableId = TableId.of("1");
+    final KeyExtent extent = new KeyExtent(tableId, new Text("b"), new Text("a"));
+    final String tabletDir = "t-000001";
+    final ConfigurationCopy siteConfig = new ConfigurationCopy();
+
+    // the table config only has to report the file type that becomes the extension
+    final TableConfiguration tableConfig = EasyMock.createMock(TableConfiguration.class);
+    EasyMock.expect(tableConfig.get(Property.TABLE_FILE_TYPE)).andReturn("rf");
+
+    // the volume manager picks the single configured volume
+    final VolumeManager volumeManager = EasyMock.createMock(VolumeManager.class);
+    EasyMock.expect(volumeManager.choose(EasyMock.isA(VolumeChooserEnvironmentImpl.class),
+        EasyMock.eq(Set.of(volumeUri)))).andReturn(volumeUri);
+
+    final UniqueNameAllocator nameAllocator = EasyMock.createMock(UniqueNameAllocator.class);
+    EasyMock.expect(nameAllocator.getNextName()).andReturn("NextFileName");
+
+    final ServerContext ctx = EasyMock.createMock(ServerContext.class);
+    EasyMock.expect(ctx.getInstanceID()).andReturn(InstanceId.of(iid)).anyTimes();
+    EasyMock.expect(ctx.getConfiguration()).andReturn(siteConfig);
+    EasyMock.expect(ctx.getTableConfiguration(tableId)).andReturn(tableConfig);
+    EasyMock.expect(ctx.getCaches()).andReturn(Caches.getInstance());
+    EasyMock.expect(ctx.getVolumeManager()).andReturn(volumeManager);
+    EasyMock.expect(ctx.getBaseUris()).andReturn(Set.of(volumeUri));
+    EasyMock.expect(ctx.getUniqueNameAllocator()).andReturn(nameAllocator);
+
+    final TabletMetadata tablet = EasyMock.createMock(TabletMetadata.class);
+    EasyMock.expect(tablet.getExtent()).andReturn(extent);
+    EasyMock.expect(tablet.getDirName()).andReturn(tabletDir);
+
+    final Consumer<String> noopDirCreator = dir -> {};
+    final ExternalCompactionId compactionId = ExternalCompactionId.generate(UUID.randomUUID());
+
+    EasyMock.replay(tableConfig, volumeManager, nameAllocator, ctx, tablet);
+
+    final ReferencedTabletFile tmpFile = TabletNameGenerator.getNextDataFilenameForMajc(false,
+        ctx, tablet, noopDirCreator, compactionId);
+    assertEquals("ANextFileName.rf_tmp_" + compactionId.canonical(), tmpFile.getFileName());
+
+    EasyMock.verify(tableConfig, volumeManager, nameAllocator, ctx, tablet);
+  }
+}
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index ebe5004d24..734f7c88c3 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -19,7 +19,6 @@
 package org.apache.accumulo.compactor;
 
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 
 import java.io.IOException;
@@ -125,7 +124,6 @@ import io.micrometer.core.instrument.MeterRegistry;
 public class Compactor extends AbstractServer implements MetricsProducer, CompactorService.Iface {
 
   private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
-  private static final long TIME_BETWEEN_CANCEL_CHECKS = MINUTES.toMillis(5);
 
   private static final long TEN_MEGABYTES = 10485760;
 
@@ -158,7 +156,8 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac
     watcher = new CompactionWatcher(aconf);
     var schedExecutor =
         ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf);
-    startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS);
+    startCancelChecker(schedExecutor,
+        aconf.getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL));
     printStartupMsg();
   }
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 887b80ca86..1f0c05eb82 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -84,11 +85,13 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.Refreshes.RefreshEntry;
+import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
 import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
@@ -106,6 +109,7 @@ import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.compaction.RunningCompaction;
 import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.manager.EventCoordinator;
 import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher;
@@ -116,6 +120,7 @@ import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.tablets.TabletNameGenerator;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
@@ -171,6 +176,7 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
   private final Cache<ExternalCompactionId,RunningCompaction> completed;
   private LoadingCache<Long,CompactionConfig> compactionConfigCache;
   private final Cache<Path,Integer> checked_tablet_dir_cache;
+  private final DeadCompactionDetector deadCompactionDetector;
 
   public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers,
       SecurityOperation security, CompactionJobQueues jobQueues,
@@ -209,6 +215,7 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
         ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
             .maximumWeight(10485760L).weigher(weigher).build();
 
+    deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, schedExecutor);
     // At this point the manager does not have its lock so no actions should be taken yet
   }
 
@@ -335,7 +342,7 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
   }
 
+  /**
+   * Starts the DeadCompactionDetector instance that the constructor created and stored in the
+   * deadCompactionDetector field.
+   */
   protected void startDeadCompactionDetector() {
-    new DeadCompactionDetector(this.ctx, this, schedExecutor).start();
+    deadCompactionDetector.start();
   }
 
   protected long getMissingCompactorWarningTime() {
@@ -405,7 +412,8 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
       // config is deleted.
       if (kind == CompactionKind.SYSTEM
           || (kind == CompactionKind.USER && compactionConfig.isPresent())) {
-        ecm = reserveCompaction(metaJob, compactorAddress, externalCompactionId);
+        ecm = reserveCompaction(metaJob, compactorAddress,
+            ExternalCompactionId.from(externalCompactionId));
       }
 
       if (ecm != null) {
@@ -506,7 +514,8 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
   }
 
   private ExternalCompactionMetadata createExternalCompactionMetadata(CompactionJob job,
-      Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress) {
+      Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress,
+      ExternalCompactionId externalCompactionId) {
     boolean propDels;
 
     Long fateTxId = null;
@@ -530,8 +539,8 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
     }
 
     Consumer<String> directoryCreator = dir -> checkTabletDir(tablet.getExtent(), new Path(dir));
-    ReferencedTabletFile newFile =
-        TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx, tablet, directoryCreator);
+    ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx,
+        tablet, directoryCreator, externalCompactionId);
 
     return new ExternalCompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(),
         job.getPriority(), job.getExecutor(), propDels, fateTxId);
@@ -539,7 +548,7 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
   }
 
   private ExternalCompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
-      String compactorAddress, String externalCompactionId) {
+      String compactorAddress, ExternalCompactionId externalCompactionId) {
 
     Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM
         || metaJob.getJob().getKind() == CompactionKind.USER);
@@ -562,16 +571,15 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
         }
 
         var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata,
-            compactorAddress);
+            compactorAddress, externalCompactionId);
 
         // any data that is read from the tablet to make a decision about if it can compact or not
         // must be included in the requireSame call
         var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
             .requireSame(tabletMetadata, FILES, SELECTED, ECOMP);
 
-        var ecid = ExternalCompactionId.of(externalCompactionId);
-        tabletMutator.putExternalCompaction(ecid, ecm);
-        tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(ecid));
+        tabletMutator.putExternalCompaction(externalCompactionId, ecm);
+        tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId));
 
         var result = tabletsMutator.process().get(extent);
 
@@ -1041,10 +1049,6 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
     LOG.info("Compaction failed, id: {}", externalCompactionId);
     final var ecid = ExternalCompactionId.of(externalCompactionId);
     compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
-
-    // ELASTICITIY_TODO need to open an issue about making the GC clean up tmp files. The tablet
-    // currently cleans up tmp files on tablet load. With tablets never loading possibly but still
-    // compacting dying compactors may still leave tmp files behind.
   }
 
   void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
@@ -1054,16 +1058,30 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
         try {
           ctx.requireNotDeleted(extent.tableId());
           tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
-              .deleteExternalCompaction(ecid)
-              .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid));
+              .deleteExternalCompaction(ecid).submit(new RejectionHandler() {
+
+                @Override
+                public boolean callWhenTabletDoesNotExists() {
+                  return true;
+                }
+
+                @Override
+                public boolean test(TabletMetadata tabletMetadata) {
+                  return tabletMetadata == null
+                      || !tabletMetadata.getExternalCompactions().containsKey(ecid);
+                }
+
+              });
         } catch (TableDeletedException e) {
           LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.",
               extent.tableId());
         }
       });
 
+      final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>();
       tabletsMutator.process().forEach((extent, result) -> {
         if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
+
           // this should try again later when the dead compaction detector runs, lets log it in case
           // its a persistent problem
           if (LOG.isDebugEnabled()) {
@@ -1072,6 +1090,52 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
                     .findFirst().map(Map.Entry::getKey).orElse(null);
             LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
           }
+        } else {
+          // compactionFailed is called from the Compactor when either a compaction fails or
+          // is cancelled and it's called from the DeadCompactionDetector. This block is
+          // entered when the conditional mutator above successfully deletes an ecid from
+          // the tablet metadata. Remove compaction tmp files from the tablet directory
+          // that have a corresponding ecid in the name.
+
+          ecidsForTablet.clear();
+          compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0)
+              .map(Entry::getKey).forEach(ecidsForTablet::add);
+
+          if (!ecidsForTablet.isEmpty()) {
+            final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR);
+            if (tm != null) {
+              final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
+              for (Volume vol : vols) {
+                try {
+                  final String volPath =
+                      vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+                          + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName();
+                  final FileSystem fs = vol.getFileSystem();
+                  for (ExternalCompactionId ecid : ecidsForTablet) {
+                    final String fileSuffix = "_tmp_" + ecid.canonical();
+                    FileStatus[] files = fs.listStatus(new Path(volPath), (path) -> {
+                      return path.getName().endsWith(fileSuffix);
+                    });
+                    if (files.length > 0) {
+                      for (FileStatus file : files) {
+                        if (!fs.delete(file.getPath(), false)) {
+                          LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath());
+                        } else {
+                          LOG.debug("Deleted ecid tmp file: {}", file.getPath());
+                        }
+                      }
+                    }
+                  }
+                } catch (IOException e) {
+                  LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e);
+                }
+              }
+            } else {
+              // TabletMetadata does not exist for the extent. This could be due to a merge or
+              // split operation. Use the utility to find tmp files at the table level
+              deadCompactionDetector.addTableId(extent.tableId());
+            }
+          }
         }
       });
     }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index 98540e1b93..2c653158ad 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.compaction.coordinator;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -36,6 +38,9 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +52,7 @@ public class DeadCompactionDetector {
   private final CompactionCoordinator coordinator;
   private final ScheduledThreadPoolExecutor schedExecutor;
   private final ConcurrentHashMap<ExternalCompactionId,Long> deadCompactions;
+  private final Set<TableId> tablesWithUnreferencedTmpFiles = new HashSet<>();
 
   public DeadCompactionDetector(ServerContext context, CompactionCoordinator coordinator,
       ScheduledThreadPoolExecutor stpe) {
@@ -56,6 +62,12 @@ public class DeadCompactionDetector {
     this.deadCompactions = new ConcurrentHashMap<>();
   }
 
+  public void addTableId(TableId tableWithUnreferencedTmpFiles) { // queue for tmp-file sweep
+    synchronized (tablesWithUnreferencedTmpFiles) { // lock shared with detectDeadCompactions
+      tablesWithUnreferencedTmpFiles.add(tableWithUnreferencedTmpFiles);
+    }
+  }
+
   private void detectDeadCompactions() {
 
     // The order of obtaining information is very important to avoid race conditions.
@@ -77,51 +89,82 @@ public class DeadCompactionDetector {
       log.trace("Clearing the dead compaction map, no tablets have compactions running");
       this.deadCompactions.clear();
       // no need to look for dead compactions when tablets don't have anything recorded as running
-      return;
-    }
+    } else {
+      if (log.isTraceEnabled()) {
+        tabletCompactions.forEach((ecid, extent) -> log.trace("Saw {} for {}", ecid, extent));
+      }
 
-    if (log.isTraceEnabled()) {
-      tabletCompactions.forEach((ecid, extent) -> log.trace("Saw {} for {}", ecid, extent));
+      // Remove from the dead map any compactions that the Tablet's
+      // do not think are running any more.
+      this.deadCompactions.keySet().retainAll(tabletCompactions.keySet());
+
+      // Determine what compactions are currently running and remove those.
+      //
+      // In order for this overall algorithm to be correct and avoid race conditions, the compactor
+      // must return ids covering the time period from before reservation until after commit. If the
+      // ids do not cover this time period then legitimate running compactions could be canceled.
+      Collection<ExternalCompactionId> running =
+          ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context);
+
+      running.forEach((ecid) -> {
+        if (tabletCompactions.remove(ecid) != null) {
+          log.debug("Ignoring compaction {} that is running on a compactor", ecid);
+        }
+        if (this.deadCompactions.remove(ecid) != null) {
+          log.debug("Removed {} from the dead compaction map, it's running on a compactor", ecid);
+        }
+      });
+
+      tabletCompactions.forEach((ecid, extent) -> {
+        log.info("Possible dead compaction detected {} {}", ecid, extent);
+        this.deadCompactions.merge(ecid, 1L, Long::sum);
+      });
+
+      // Everything left in tabletCompactions is no longer running anywhere and should be failed.
+      // Its possible that a compaction committed while going through the steps above, if so then
+      // that is ok and marking it failed will end up being a no-op.
+      Set<ExternalCompactionId> toFail =
+          this.deadCompactions.entrySet().stream().filter(e -> e.getValue() > 2)
+              .map(e -> e.getKey()).collect(Collectors.toCollection(TreeSet::new));
+      tabletCompactions.keySet().retainAll(toFail);
+      tabletCompactions.forEach((eci, v) -> {
+        log.warn("Compaction {} believed to be dead, failing it.", eci);
+      });
+      coordinator.compactionFailed(tabletCompactions);
+      this.deadCompactions.keySet().removeAll(toFail);
     }
 
-    // Remove from the dead map any compactions that the Tablet's
-    // do not think are running any more.
-    this.deadCompactions.keySet().retainAll(tabletCompactions.keySet());
-
-    // Determine what compactions are currently running and remove those.
-    //
-    // In order for this overall algorithm to be correct and avoid race conditions, the compactor
-    // must return ids covering the time period from before reservation until after commit. If the
-    // ids do not cover this time period then legitimate running compactions could be canceled.
-    Collection<ExternalCompactionId> running =
-        ExternalCompactionUtil.getCompactionIdsRunningOnCompactors(context);
-
-    running.forEach((ecid) -> {
-      if (tabletCompactions.remove(ecid) != null) {
-        log.debug("Ignoring compaction {} that is running on a compactor", ecid);
+    // Find and delete any known tables that have unreferenced
+    // compaction tmp files.
+    if (!tablesWithUnreferencedTmpFiles.isEmpty()) {
+
+      Set<TableId> copy = new HashSet<>();
+      synchronized (tablesWithUnreferencedTmpFiles) {
+        copy.addAll(tablesWithUnreferencedTmpFiles);
+        tablesWithUnreferencedTmpFiles.clear();
       }
-      if (this.deadCompactions.remove(ecid) != null) {
-        log.debug("Removed {} from the dead compaction map, it's running on a compactor", ecid);
+
+      log.debug("Tables that may have unreferenced compaction tmp files: {}", copy);
+      for (TableId tid : copy) {
+        try {
+          final Set<Path> matches = FindCompactionTmpFiles.findTempFiles(context, tid.canonical());
+          log.debug("Found the following compaction tmp files for table {}:", tid);
+          matches.forEach(p -> log.debug("{}", p));
+
+          if (!matches.isEmpty()) {
+            log.debug("Deleting compaction tmp files for table {}...", tid);
+            DeleteStats stats = FindCompactionTmpFiles.deleteTempFiles(context, matches);
+            log.debug(
+                "Deletion of compaction tmp files for table {} complete. Success:{}, Failure:{}, Error:{}",
+                tid, stats.success, stats.failure, stats.error);
+          }
+        } catch (InterruptedException e) {
+          log.error("Interrupted while finding compaction tmp files for table: {}", tid.canonical(),
+              e);
+        }
       }
-    });
-
-    tabletCompactions.forEach((ecid, extent) -> {
-      log.info("Possible dead compaction detected {} {}", ecid, extent);
-      this.deadCompactions.merge(ecid, 1L, Long::sum);
-    });
-
-    // Everything left in tabletCompactions is no longer running anywhere and should be failed.
-    // Its possible that a compaction committed while going through the steps above, if so then
-    // that is ok and marking it failed will end up being a no-op.
-    Set<ExternalCompactionId> toFail =
-        this.deadCompactions.entrySet().stream().filter(e -> e.getValue() > 2).map(e -> e.getKey())
-            .collect(Collectors.toCollection(TreeSet::new));
-    tabletCompactions.keySet().retainAll(toFail);
-    tabletCompactions.forEach((eci, v) -> {
-      log.warn("Compaction {} believed to be dead, failing it.", eci);
-    });
-    coordinator.compactionFailed(tabletCompactions);
-    this.deadCompactions.keySet().removeAll(toFail);
+    }
+
   }
 
   public void start() {
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
index 6b014c0f21..dfbe5736ab 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java
@@ -230,6 +230,7 @@ public class ExternalCompactionTestUtils {
     cfg.setProperty(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL, "5s");
     cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, "5s");
     cfg.setProperty(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "3s");
+    cfg.setProperty(Property.COMPACTOR_CANCEL_CHECK_INTERVAL, "5s");
     cfg.setProperty(Property.COMPACTOR_PORTSEARCH, "true");
     cfg.setProperty(Property.COMPACTOR_MIN_JOB_WAIT_TIME, "100ms");
     cfg.setProperty(Property.COMPACTOR_MAX_JOB_WAIT_TIME, "1s");
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index bc763fc5f8..5d3d7b18fe 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -79,7 +79,9 @@ import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles;
 import org.apache.accumulo.test.functional.CompactionIT.ErrorThrowingSelector;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.BeforeAll;
@@ -227,6 +229,10 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase {
 
       assertFalse(ecids.isEmpty());
 
+      // Verify that a tmp file is created
+      Wait.waitFor(() -> FindCompactionTmpFiles
+          .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 1);
+
       // Kill the compactor
       getCluster().getClusterControl().stop(ServerType.COMPACTOR);
 
@@ -235,6 +241,10 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase {
       ExternalCompactionTestUtils.waitForRunningCompactions(getCluster().getServerContext(), tid,
           ecids);
 
+      // Verify that the tmp file are cleaned up
+      Wait.waitFor(() -> FindCompactionTmpFiles
+          .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 0);
+
       // If the compaction actually ran it would have filtered data, so lets make sure all the data
       // written is there. This check provides evidence the compaction did not run.
       verify(client, table1, 1);
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
index f532451c18..8a18b0bab3 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java
@@ -62,6 +62,8 @@ import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.BeforeAll;
@@ -106,6 +108,10 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase {
           .confirmCompactionRunning(getCluster().getServerContext(), ecids);
       assertTrue(matches > 0);
 
+      // Verify that a tmp file is created
+      Wait.waitFor(() -> FindCompactionTmpFiles
+          .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 1);
+
       // ExternalDoNothingCompactor will not compact, it will wait, split the table.
       SortedSet<Text> splits = new TreeSet<>();
       int jump = MAX_DATA / 5;
@@ -130,6 +136,10 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase {
       // compaction above in the test. Even though the external compaction was cancelled
       // because we split the table, FaTE will continue to queue up a compaction
       client.tableOperations().cancelCompaction(table1);
+
+      // Verify that the tmp file are cleaned up
+      Wait.waitFor(() -> FindCompactionTmpFiles
+          .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 0, 60_000);
     }
   }
 
@@ -163,6 +173,10 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase {
           .confirmCompactionRunning(getCluster().getServerContext(), ecids);
       assertTrue(matches > 0);
 
+      // Verify that a tmp file is created
+      Wait.waitFor(() -> FindCompactionTmpFiles
+          .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 1);
+
       // when the compaction starts it will create a selected files column in the tablet, wait for
       // that to happen
       while (countTablets(getCluster().getServerContext(), table1,
@@ -185,6 +199,10 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase {
           tm -> tm.getSelectedFiles() != null || !tm.getCompacted().isEmpty()) > 0) {
         Thread.sleep(1000);
       }
+
+      // Verify that the tmp file are cleaned up
+      Wait.waitFor(() -> FindCompactionTmpFiles
+          .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 0);
     }
   }
 
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
index 2860b1aea4..b1d7128b59 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_3_IT.java
@@ -53,6 +53,8 @@ import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
@@ -109,6 +111,10 @@ public class ExternalCompaction_3_IT extends SharedMiniClusterBase {
         assertEquals(2, md.size());
       }
 
+      // Verify that a tmp file is created
+      Wait.waitFor(() -> FindCompactionTmpFiles
+          .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 1);
+
       // Merge - blocking operation
       Text start = md.get(0).getPrevEndRow();
       Text end = md.get(1).getEndRow();
@@ -133,6 +139,10 @@ public class ExternalCompaction_3_IT extends SharedMiniClusterBase {
       // compaction above in the test. Even though the external compaction was cancelled
       // because we split the table, FaTE will continue to queue up a compaction
       client.tableOperations().delete(table1);
+
+      // Verify that the tmp file are cleaned up
+      Wait.waitFor(() -> FindCompactionTmpFiles
+          .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 0);
     }
   }
 
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
index 14b3329faa..61884cab62 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java
@@ -30,9 +30,15 @@ import org.apache.accumulo.core.cli.ConfigOpts;
 import org.apache.accumulo.core.compaction.thrift.CompactorService.Iface;
 import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.compaction.FileCompactor.CompactionCanceledException;
+import org.apache.accumulo.server.tablets.TabletNameGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +73,15 @@ public class ExternalDoNothingCompactor extends Compactor implements Iface {
         update.setMessage("Compaction started");
         updateCompactionState(job, update);
 
+        // Create tmp output file
+        final TabletMetadata tm = getContext().getAmple()
+            .readTablet(KeyExtent.fromThrift(job.getExtent()), ColumnType.DIR);
+        ReferencedTabletFile newFile =
+            TabletNameGenerator.getNextDataFilenameForMajc(job.isPropagateDeletes(), getContext(),
+                tm, (dir) -> {}, ExternalCompactionId.from(job.getExternalCompactionId()));
+        LOG.info("Creating tmp file: {}", newFile.getPath());
+        getContext().getVolumeManager().createNewFile(newFile.getPath());
+
         LOG.info("Starting compactor");
         started.countDown();
 
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FindCompactionTmpFilesIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FindCompactionTmpFilesIT.java
new file mode 100644
index 0000000000..7946ea0a12
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FindCompactionTmpFilesIT.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.tablets.TabletNameGenerator;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link FindCompactionTmpFiles}. Each test creates a table, places
+ * compaction tmp files in its tablet directory, and verifies that the utility finds them and,
+ * when asked, deletes them.
+ */
+public class FindCompactionTmpFilesIT extends SharedMiniClusterBase {
+
+  /** Number of compaction tmp files each test creates. */
+  private static final int NUM_TMP_FILES = 100;
+
+  /** System property pointed at the mini cluster's configuration before running main. */
+  private static final String ACCUMULO_PROPERTIES = "accumulo.properties";
+
+  @BeforeAll
+  public static void before() throws Exception {
+    startMiniCluster();
+  }
+
+  @AfterAll
+  public static void after() throws Exception {
+    stopMiniCluster();
+  }
+
+  /**
+   * Computes compaction tmp file paths for the first tablet of a table. The files themselves are
+   * not created; each name is derived from a freshly generated external compaction id.
+   *
+   * @param context server context used to read the tablet metadata
+   * @param tid id of the table whose first tablet receives the paths
+   * @param numFiles number of paths to generate
+   * @return the generated paths
+   */
+  private Set<Path> generateTmpFilePaths(ServerContext context, TableId tid, int numFiles) {
+    final TabletMetadata tm;
+    // TabletsMetadata is AutoCloseable; close it as soon as the tablet has been read
+    try (TabletsMetadata tms = context.getAmple().readTablets().forTable(tid).build()) {
+      tm = tms.iterator().next();
+    }
+
+    final Set<Path> paths = new HashSet<>(numFiles);
+    for (int i = 0; i < numFiles; i++) {
+      ReferencedTabletFile rtf = TabletNameGenerator.getNextDataFilenameForMajc(false, context, tm,
+          (s) -> {}, ExternalCompactionId.generate(UUID.randomUUID()));
+      paths.add(rtf.getPath());
+    }
+    return paths;
+  }
+
+  /**
+   * Creates a table, ingests rows into it, and requests a flush.
+   *
+   * @param c client used to create and write to the table
+   * @param tableName name of the table to create
+   * @return the id of the new table
+   */
+  private TableId createTable(AccumuloClient c, String tableName) throws Exception {
+    c.tableOperations().create(tableName);
+    ReadWriteIT.ingest(c, 100, 1, 1, 0, tableName);
+    c.tableOperations().flush(tableName);
+    return TableId.of(c.tableOperations().tableIdMap().get(tableName));
+  }
+
+  /**
+   * Creates {@link #NUM_TMP_FILES} compaction tmp files for the given table and verifies that
+   * {@link FindCompactionTmpFiles#findTempFiles} reports exactly those files.
+   *
+   * @param ctx server context of the mini cluster
+   * @param tid id of a table created by {@link #createTable}
+   * @return the paths of the tmp files that were created
+   */
+  private Set<Path> createTmpFiles(ServerContext ctx, TableId tid) throws Exception {
+    FileSystem fs = getCluster().getFileSystem();
+
+    Set<String> tablesDirs = ctx.getTablesDirs();
+    assertEquals(1, tablesDirs.size());
+
+    String tdir = tablesDirs.iterator().next() + "/" + tid.canonical() + "/default_tablet";
+    Path defaultTabletPath = new Path(tdir);
+    assertTrue(fs.exists(defaultTabletPath));
+
+    // nothing should be found before the tmp files are created
+    assertEquals(0, FindCompactionTmpFiles.findTempFiles(ctx, tid.canonical()).size());
+
+    Set<Path> generatedPaths = generateTmpFilePaths(ctx, tid, NUM_TMP_FILES);
+    for (Path p : generatedPaths) {
+      assertFalse(fs.exists(p));
+      assertTrue(fs.createNewFile(p));
+      assertTrue(fs.exists(p));
+    }
+
+    Set<Path> foundPaths = FindCompactionTmpFiles.findTempFiles(ctx, tid.canonical());
+    assertEquals(NUM_TMP_FILES, foundPaths.size());
+    assertEquals(generatedPaths, foundPaths);
+    return generatedPaths;
+  }
+
+  /**
+   * Runs {@link FindCompactionTmpFiles#main} against the mini cluster's configuration. The
+   * previous value of the {@code accumulo.properties} system property is restored afterwards so
+   * that it does not leak into other tests running in the same JVM.
+   *
+   * @param args command line arguments for the utility
+   */
+  private void runMain(String... args) throws Exception {
+    final String previous = System.getProperty(ACCUMULO_PROPERTIES);
+    System.setProperty(ACCUMULO_PROPERTIES, "file://" + getCluster().getAccumuloPropertiesPath());
+    try {
+      FindCompactionTmpFiles.main(args);
+    } finally {
+      if (previous == null) {
+        System.clearProperty(ACCUMULO_PROPERTIES);
+      } else {
+        System.setProperty(ACCUMULO_PROPERTIES, previous);
+      }
+    }
+  }
+
+  @Test
+  public void testFindCompactionTmpFiles() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+      TableId tid = createTable(c, tableName);
+      ServerContext ctx = getCluster().getServerContext();
+      Set<Path> generatedPaths = createTmpFiles(ctx, tid);
+
+      // delete directly through the API rather than through main
+      DeleteStats stats = FindCompactionTmpFiles.deleteTempFiles(ctx, generatedPaths);
+      assertEquals(NUM_TMP_FILES, stats.success);
+      assertEquals(0, stats.failure);
+      assertEquals(0, stats.error);
+
+      assertEquals(0, FindCompactionTmpFiles.findTempFiles(ctx, tid.canonical()).size());
+    }
+  }
+
+  @Test
+  public void testFindCompactionTmpFilesMainNoDelete() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+      TableId tid = createTable(c, tableName);
+      ServerContext ctx = getCluster().getServerContext();
+      Set<Path> generatedPaths = createTmpFiles(ctx, tid);
+
+      // without --delete the tmp files are expected to be left in place
+      runMain("--tables", tableName);
+
+      Set<Path> foundPaths = FindCompactionTmpFiles.findTempFiles(ctx, tid.canonical());
+      assertEquals(NUM_TMP_FILES, foundPaths.size());
+      assertEquals(generatedPaths, foundPaths);
+    }
+  }
+
+  @Test
+  public void testFindCompactionTmpFilesMainWithDelete() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      String tableName = getUniqueNames(1)[0];
+      TableId tid = createTable(c, tableName);
+      ServerContext ctx = getCluster().getServerContext();
+      createTmpFiles(ctx, tid);
+
+      runMain("--tables", tableName, "--delete");
+
+      assertEquals(0, FindCompactionTmpFiles.findTempFiles(ctx, tid.canonical()).size());
+    }
+  }
+
+}