Posted to commits@accumulo.apache.org by dd...@apache.org on 2024/02/16 22:45:57 UTC

(accumulo) branch elasticity updated (3de72f63c4 -> 9f073b4838)

This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a change to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


    from 3de72f63c4 fixes FateStarvationIT (#4278)
     add 3298d6db89 Add Compaction Job Min & Max Wait properties (#4223)
     add 9f430a2695 Merge branch '2.1'
     add 4886e821ed Remove unused var added by merge
     new 9f073b4838 Merge branch 'main' into elasticity

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
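
For readers who want to examine these revisions locally, one possible
approach (a sketch; the ref and commit ids are those listed above):

    # fetch the elasticity branch tip from the ASF mirror
    git fetch https://gitbox.apache.org/repos/asf/accumulo.git elasticity
    # show everything between the old and new branch heads
    git log --oneline 3de72f63c4..9f073b4838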


Summary of changes:
 .../main/java/org/apache/accumulo/core/conf/Property.java  |  4 ++--
 .../main/java/org/apache/accumulo/compactor/Compactor.java | 14 ++++++++------
 .../compaction/coordinator/CompactionCoordinator.java      |  3 +--
 3 files changed, 11 insertions(+), 10 deletions(-)


(accumulo) 01/01: Merge branch 'main' into elasticity

Posted by dd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 9f073b4838ee87847d51fc94878e6aeffae85a78
Merge: 3de72f63c4 4886e821ed
Author: Daniel Roberts <dd...@gmail.com>
AuthorDate: Fri Feb 16 22:39:39 2024 +0000

    Merge branch 'main' into elasticity

 .../main/java/org/apache/accumulo/core/conf/Property.java  |  4 ++--
 .../main/java/org/apache/accumulo/compactor/Compactor.java | 14 ++++++++------
 .../compaction/coordinator/CompactionCoordinator.java      |  3 +--
 3 files changed, 11 insertions(+), 10 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 22c2606c85,6cb1207f40..394b79746e
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -1112,12 -1130,20 +1112,12 @@@ public enum Property 
    COMPACTOR_MIN_JOB_WAIT_TIME("compactor.wait.time.job.min", "1s", PropertyType.TIMEDURATION,
        "The minimum amount of time to wait between checks for the next compaction job, backing off"
            + "exponentially until COMPACTOR_MAX_JOB_WAIT_TIME is reached.",
-       "4.0.0"),
+       "2.1.3"),
 -  @Experimental
    COMPACTOR_MAX_JOB_WAIT_TIME("compactor.wait.time.job.max", "5m", PropertyType.TIMEDURATION,
        "Compactors do exponential backoff when their request for work repeatedly come back empty. "
            + "This is the maximum amount of time to wait between checks for the next compaction job.",
-       "4.0.0"),
+       "2.1.3"),
    @Experimental
 -  COMPACTOR_PORTSEARCH("compactor.port.search", "false", PropertyType.BOOLEAN,
 -      "If the compactor.port.client is in use, search higher ports until one is available.",
 -      "2.1.0"),
 -  @Experimental
 -  COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT,
 -      "The port used for handling client connections on the compactor servers.", "2.1.0"),
 -  @Experimental
    COMPACTOR_MINTHREADS("compactor.threads.minimum", "1", PropertyType.COUNT,
        "The minimum number of threads to use to handle incoming requests.", "2.1.0"),
    @Experimental
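
The two new properties above bound an exponential backoff window for compactors
that repeatedly find no work. A minimal, self-contained sketch of that pattern,
using the documented defaults as assumed values (hypothetical class, not the
commit's code; the actual wait logic is in Compactor.java in the next hunk):

    // Sketch only: backs off exponentially between an assumed minimum of 1s
    // (compactor.wait.time.job.min) and maximum of 5m (compactor.wait.time.job.max).
    public class BackoffSketch {
      public static void main(String[] args) throws InterruptedException {
        long minWaitMs = 1_000L;   // default "1s"
        long maxWaitMs = 300_000L; // default "5m"
        long waitMs = minWaitMs;
        for (int emptyChecks = 0; emptyChecks < 5; emptyChecks++) {
          System.out.println("no compaction job found, sleeping " + waitMs + "ms");
          Thread.sleep(waitMs);
          waitMs = Math.min(maxWaitMs, waitMs * 2); // double the wait, up to the max
        }
      }
    }
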
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 1a1c2ece58,b81c30dee3..114dd59519
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -589,15 -582,16 +589,17 @@@ public class Compactor extends Abstract
    }
  
    protected long getWaitTimeBetweenCompactionChecks() {
 -    // get the total number of compactors assigned to this queue
 -    int numCompactors = ExternalCompactionUtil.countCompactors(queueName, getContext());
 +    // get the total number of compactors assigned to this group
 +    int numCompactors =
 +        ExternalCompactionUtil.countCompactors(this.getResourceGroup(), getContext());
-     // Aim for around 3 compactors checking in every second
-     long sleepTime = numCompactors * 1000L / 3;
-     // Ensure a compactor sleeps at least around a second
-     sleepTime = Math.max(1000, sleepTime);
-     // Ensure a compactor sleep not too much more than 5 mins
-     sleepTime = Math.min(300_000L, sleepTime);
+     long minWait = getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME);
+     // Aim for around 3 compactors checking in per min wait time.
+     long sleepTime = numCompactors * minWait / 3;
+     // Ensure a compactor waits at least the minimum time
+     sleepTime = Math.max(minWait, sleepTime);
+     // Ensure a sleeping compactor has a configurable max sleep time
+     sleepTime = Math.min(getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME),
+         sleepTime);
      // Add some random jitter to the sleep time that averages out to the sleep time. This will
      // spread compactors out evenly over time.
      sleepTime = (long) (.9 * sleepTime + sleepTime * .2 * RANDOM.get().nextDouble());
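
To make the new calculation concrete, a runnable sketch with assumed inputs
(hypothetical class and values; the real method reads the group size and wait
times from the live configuration):

    // Walk-through of getWaitTimeBetweenCompactionChecks() with assumed inputs:
    // 12 compactors in the group, min wait 1s, max wait 5m.
    import java.util.Random;

    public class WaitTimeSketch {
      public static void main(String[] args) {
        int numCompactors = 12;   // assumed group size
        long minWait = 1_000L;    // COMPACTOR_MIN_JOB_WAIT_TIME default
        long maxWait = 300_000L;  // COMPACTOR_MAX_JOB_WAIT_TIME default
        long sleepTime = numCompactors * minWait / 3; // 4000ms: ~3 check-ins per min wait
        sleepTime = Math.max(minWait, sleepTime);     // floor at the minimum
        sleepTime = Math.min(maxWait, sleepTime);     // cap at the configured maximum
        // jitter maps the result uniformly into [0.9x, 1.1x], averaging to sleepTime
        sleepTime = (long) (.9 * sleepTime + sleepTime * .2 * new Random().nextDouble());
        System.out.println("would sleep ~" + sleepTime + "ms");
      }
    }
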
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index fdbf009352,0000000000..bf2e9009f3
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@@ -1,1012 -1,0 +1,1011 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.compaction.coordinator;
 +
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ScheduledFuture;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.Consumer;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
 +import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.clientImpl.thrift.TInfo;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.NamespaceId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 +import org.apache.accumulo.core.fate.Fate;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateInstanceType;
 +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter;
 +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler;
 +import org.apache.accumulo.core.metadata.schema.CompactionMetadata;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 +import org.apache.accumulo.core.spi.compaction.CompactionJob;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 +import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 +import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.util.cache.Caches.CacheName;
 +import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
 +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 +import org.apache.accumulo.core.util.compaction.RunningCompaction;
 +import org.apache.accumulo.core.util.threads.ThreadPools;
 +import org.apache.accumulo.core.util.threads.Threads;
 +import org.apache.accumulo.core.volume.Volume;
 +import org.apache.accumulo.manager.Manager;
 +import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction;
 +import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData;
 +import org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile;
 +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 +import org.apache.accumulo.server.compaction.CompactionPluginUtils;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.tablets.TabletNameGenerator;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.thrift.TException;
 +import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.github.benmanes.caffeine.cache.Cache;
 +import com.github.benmanes.caffeine.cache.CacheLoader;
 +import com.github.benmanes.caffeine.cache.LoadingCache;
 +import com.github.benmanes.caffeine.cache.Weigher;
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Sets;
 +import com.google.common.net.HostAndPort;
 +
 +import io.micrometer.core.instrument.Gauge;
 +import io.micrometer.core.instrument.MeterRegistry;
 +
 +public class CompactionCoordinator
 +    implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer {
 +
 +  private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
-   private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);
 +
 +  /*
 +   * Map of compactionId to RunningCompactions. This is an informational cache of what external
 +   * compactions may be running. It's possible it may contain external compactions that are not
 +   * actually running. It may not contain compactions that are actually running. The metadata table
 +   * is the most authoritative source of what external compactions are currently running, but it
 +   * does not have the stats that this map has.
 +   */
 +  protected final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
 +      new ConcurrentHashMap<>();
 +
 +  /* Map of group name to the last time a compactor called in to get a compaction job */
 +  // ELASTICITY_TODO need to clean out groups that are no longer configured.
 +  private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();
 +
 +  private final ServerContext ctx;
 +  private final SecurityOperation security;
 +  private final CompactionJobQueues jobQueues;
 +  private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances;
 +  // Exposed for tests
 +  protected volatile Boolean shutdown = false;
 +
 +  private final ScheduledThreadPoolExecutor schedExecutor;
 +
 +  private final Cache<ExternalCompactionId,RunningCompaction> completed;
 +  private final LoadingCache<FateId,CompactionConfig> compactionConfigCache;
 +  private final Cache<Path,Integer> tabletDirCache;
 +  private final DeadCompactionDetector deadCompactionDetector;
 +
 +  private final QueueMetrics queueMetrics;
 +
 +  public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
 +      AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) {
 +    this.ctx = ctx;
 +    this.schedExecutor = this.ctx.getScheduledExecutor();
 +    this.security = security;
 +
 +    this.jobQueues = new CompactionJobQueues(
 +        ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
 +
 +    this.queueMetrics = new QueueMetrics(jobQueues);
 +
 +    this.fateInstances = fateInstances;
 +
 +    completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true)
 +        .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build();
 +
 +    CacheLoader<FateId,CompactionConfig> loader =
 +        fateId -> CompactionConfigStorage.getConfig(ctx, fateId);
 +
 +    // Keep a small, short-lived cache of compaction config. Compaction config never changes;
 +    // however, when a compaction is canceled it is deleted, which is why there is a time limit. It
 +    // does not hurt to let a job that was canceled start, as it will be canceled later. Caching
 +    // this immutable config helps avoid reading the same data over and over.
 +    compactionConfigCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_CONFIGS, true)
 +        .expireAfterWrite(30, SECONDS).maximumSize(100).build(loader);
 +
 +    Weigher<Path,Integer> weigher = (path, count) -> {
 +      return path.toUri().toString().length();
 +    };
 +
 +    tabletDirCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
 +        .maximumWeight(10485760L).weigher(weigher).build();
 +
 +    deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, schedExecutor);
 +    // At this point the manager does not have its lock so no actions should be taken yet
 +  }
 +
 +  private volatile Thread serviceThread = null;
 +
 +  public void start() {
 +    serviceThread = Threads.createThread("CompactionCoordinator Thread", this);
 +    serviceThread.start();
 +  }
 +
 +  public void shutdown() {
 +    shutdown = true;
 +    var localThread = serviceThread;
 +    if (localThread != null) {
 +      try {
 +        localThread.join();
 +      } catch (InterruptedException e) {
 +        LOG.error("Exception stopping compaction coordinator thread", e);
 +      }
 +    }
 +  }
 +
 +  protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  @Override
 +  public void run() {
 +
 +    startCompactionCleaner(schedExecutor);
 +    startRunningCleaner(schedExecutor);
 +
 +    // On a re-start of the coordinator it's possible that external compactions are in-progress.
 +    // Attempt to get the running compactions on the compactors and then resolve which tserver
 +    // the external compaction came from to re-populate the RUNNING collection.
 +    LOG.info("Checking for running external compactions");
 +    // On re-start contact the running Compactors to try and seed the list of running compactions
 +    List<RunningCompaction> running = getCompactionsRunningOnCompactors();
 +    if (running.isEmpty()) {
 +      LOG.info("No running external compactions found");
 +    } else {
 +      LOG.info("Found {} running external compactions", running.size());
 +      running.forEach(rc -> {
 +        TCompactionStatusUpdate update = new TCompactionStatusUpdate();
 +        update.setState(TCompactionState.IN_PROGRESS);
 +        update.setMessage("Coordinator restarted, compaction found in progress");
 +        rc.addUpdate(System.currentTimeMillis(), update);
 +        RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc);
 +      });
 +    }
 +
 +    startDeadCompactionDetector();
 +
 +    // ELASTICITY_TODO the main function of the following loop was getting group summaries from
 +    // tservers. It's no longer doing that. It may be best to remove the loop and make the remaining
 +    // task a scheduled one.
 +
 +    LOG.info("Starting loop to check for compactors not checking in");
 +    while (!shutdown) {
 +      long start = System.currentTimeMillis();
 +
 +      long now = System.currentTimeMillis();
 +      TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> {
 +        if ((now - v) > getMissingCompactorWarningTime()) {
 +          // ELASTICITY_TODO may want to consider if the group has any jobs queued OR if the group
 +          // still exists in configuration
 +          LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", k,
 +              getMissingCompactorWarningTime());
 +        }
 +      });
 +
 +      long checkInterval = getTServerCheckInterval();
 +      long duration = (System.currentTimeMillis() - start);
 +      if (checkInterval - duration > 0) {
 +        LOG.debug("Waiting {}ms for next group check", (checkInterval - duration));
 +        UtilWaitThread.sleep(checkInterval - duration);
 +      }
 +    }
 +
 +    LOG.info("Shutting down");
 +  }
 +
 +  protected void startDeadCompactionDetector() {
 +    deadCompactionDetector.start();
 +  }
 +
 +  protected long getMissingCompactorWarningTime() {
-     return FIFTEEN_MINUTES;
++    return this.ctx.getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3;
 +  }
 +
 +  protected long getTServerCheckInterval() {
 +    return this.ctx.getConfiguration()
 +        .getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL);
 +  }
 +
 +  public long getNumRunningCompactions() {
 +    return RUNNING_CACHE.size();
 +  }
 +
 +  /**
 +   * Return the next compaction job from the queue to a Compactor
 +   *
 +   * @param groupName group
 +   * @param compactorAddress compactor address
 +   * @throws ThriftSecurityException when permission error
 +   * @return compaction job
 +   */
 +  @Override
 +  public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials,
 +      String groupName, String compactorAddress, String externalCompactionId)
 +      throws ThriftSecurityException {
 +
 +    // do not expect users to call this directly, expect compactors to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    CompactorGroupId groupId = CompactorGroupIdImpl.groupId(groupName);
 +    LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress);
 +    TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis());
 +
 +    TExternalCompactionJob result = null;
 +
 +    CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId);
 +
 +    while (metaJob != null) {
 +
 +      Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob);
 +
 +      // this method may reread the metadata; do not use the metadata in metaJob for anything after
 +      // this method
 +      CompactionMetadata ecm = null;
 +
 +      var kind = metaJob.getJob().getKind();
 +
 +      // Only reserve user compactions when the config is present. When compactions are canceled the
 +      // config is deleted.
 +      if (kind == CompactionKind.SYSTEM
 +          || (kind == CompactionKind.USER && compactionConfig.isPresent())) {
 +        ecm = reserveCompaction(metaJob, compactorAddress,
 +            ExternalCompactionId.from(externalCompactionId));
 +      }
 +
 +      if (ecm != null) {
 +        result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig);
 +        // It is possible that by the time this is added, the compactor that made this request
 +        // is dead. In this case the compaction is not actually running.
 +        RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
 +            new RunningCompaction(result, compactorAddress, groupName));
 +        LOG.debug("Returning external job {} to {} with {} files", result.externalCompactionId,
 +            compactorAddress, ecm.getJobFiles().size());
 +        break;
 +      } else {
 +        LOG.debug(
 +            "Unable to reserve compaction job for {}, pulling another off the queue for group {}",
 +            metaJob.getTabletMetadata().getExtent(), groupName);
 +        metaJob = jobQueues.poll(CompactorGroupIdImpl.groupId(groupName));
 +      }
 +    }
 +
 +    if (metaJob == null) {
 +      LOG.debug("No jobs found in group {} ", groupName);
 +    }
 +
 +    if (result == null) {
 +      LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName,
 +          compactorAddress);
 +      result = new TExternalCompactionJob();
 +    }
 +
 +    return result;
 +
 +  }
 +
 +  // ELASTICITY_TODO unit test this code
 +  private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob job,
 +      Set<StoredTabletFile> jobFiles) {
 +
 +    if (tablet == null) {
 +      // the tablet no longer exists
 +      return false;
 +    }
 +
 +    if (tablet.getOperationId() != null) {
 +      return false;
 +    }
 +
 +    if (!tablet.getFiles().containsAll(jobFiles)) {
 +      return false;
 +    }
 +
 +    var currentlyCompactingFiles = tablet.getExternalCompactions().values().stream()
 +        .flatMap(ecm -> ecm.getJobFiles().stream()).collect(Collectors.toSet());
 +
 +    if (!Collections.disjoint(jobFiles, currentlyCompactingFiles)) {
 +      return false;
 +    }
 +
 +    switch (job.getKind()) {
 +      case SYSTEM:
 +        if (tablet.getSelectedFiles() != null
 +            && !Collections.disjoint(jobFiles, tablet.getSelectedFiles().getFiles())) {
 +          return false;
 +        }
 +        break;
 +      case USER:
 +      case SELECTOR:
 +        if (tablet.getSelectedFiles() == null
 +            || !tablet.getSelectedFiles().getFiles().containsAll(jobFiles)) {
 +          return false;
 +        }
 +        break;
 +      default:
 +        throw new UnsupportedOperationException("Not currently handling " + job.getKind());
 +    }
 +
 +    return true;
 +  }
 +
 +  private void checkTabletDir(KeyExtent extent, Path path) {
 +    try {
 +      if (tabletDirCache.getIfPresent(path) == null) {
 +        FileStatus[] files = null;
 +        try {
 +          files = ctx.getVolumeManager().listStatus(path);
 +        } catch (FileNotFoundException ex) {
 +          // ignored
 +        }
 +
 +        if (files == null) {
 +          LOG.debug("Tablet {} had no dir, creating {}", extent, path);
 +
 +          ctx.getVolumeManager().mkdirs(path);
 +        }
 +        tabletDirCache.put(path, 1);
 +      }
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    }
 +  }
 +
 +  protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job,
 +      Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress,
 +      ExternalCompactionId externalCompactionId) {
 +    boolean propDels;
 +
 +    FateId fateId = null;
 +
 +    switch (job.getKind()) {
 +      case SYSTEM: {
 +        boolean compactingAll = tablet.getFiles().equals(jobFiles);
 +        propDels = !compactingAll;
 +      }
 +        break;
 +      case SELECTOR:
 +      case USER: {
 +        boolean compactingAll = tablet.getSelectedFiles().initiallySelectedAll()
 +            && tablet.getSelectedFiles().getFiles().equals(jobFiles);
 +        propDels = !compactingAll;
 +        fateId = tablet.getSelectedFiles().getFateId();
 +      }
 +        break;
 +      default:
 +        throw new IllegalArgumentException();
 +    }
 +
 +    Consumer<String> directoryCreator = dir -> checkTabletDir(tablet.getExtent(), new Path(dir));
 +    ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx,
 +        tablet, directoryCreator, externalCompactionId);
 +
 +    return new CompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(),
 +        job.getPriority(), job.getGroup(), propDels, fateId);
 +
 +  }
 +
 +  protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
 +      String compactorAddress, ExternalCompactionId externalCompactionId) {
 +
 +    Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM
 +        || metaJob.getJob().getKind() == CompactionKind.USER);
 +
 +    var tabletMetadata = metaJob.getTabletMetadata();
 +
 +    var jobFiles = metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
 +        .collect(Collectors.toSet());
 +
 +    Retry retry =
 +        Retry.builder().maxRetries(5).retryAfter(100, MILLISECONDS).incrementBy(100, MILLISECONDS)
 +            .maxWait(10, SECONDS).backOffFactor(1.5).logInterval(3, MINUTES).createRetry();
 +
 +    while (retry.canRetry()) {
 +      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +        var extent = metaJob.getTabletMetadata().getExtent();
 +
 +        if (!canReserveCompaction(tabletMetadata, metaJob.getJob(), jobFiles)) {
 +          return null;
 +        }
 +
 +        var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata,
 +            compactorAddress, externalCompactionId);
 +
 +        // any data that is read from the tablet to make a decision about whether it can compact
 +        // must be included in the requireSame call
 +        var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
 +            .requireSame(tabletMetadata, FILES, SELECTED, ECOMP);
 +
 +        tabletMutator.putExternalCompaction(externalCompactionId, ecm);
 +        tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId));
 +
 +        var result = tabletsMutator.process().get(extent);
 +
 +        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
 +          return ecm;
 +        } else {
 +          tabletMetadata = result.readMetadata();
 +        }
 +      }
 +
 +      retry.useRetry();
 +      try {
 +        retry.waitForNextAttempt(LOG,
 +            "Reserved compaction for " + metaJob.getTabletMetadata().getExtent());
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    return null;
 +  }
 +
 +  protected TExternalCompactionJob createThriftJob(String externalCompactionId,
 +      CompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob,
 +      Optional<CompactionConfig> compactionConfig) {
 +
 +    Set<CompactableFile> selectedFiles;
 +    if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
 +      selectedFiles = Set.of();
 +    } else {
 +      selectedFiles = metaJob.getTabletMetadata().getSelectedFiles().getFiles().stream()
 +          .map(file -> new CompactableFileImpl(file,
 +              metaJob.getTabletMetadata().getFilesMap().get(file)))
 +          .collect(Collectors.toUnmodifiableSet());
 +    }
 +
 +    Map<String,String> overrides = CompactionPluginUtils.computeOverrides(compactionConfig, ctx,
 +        metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles(), selectedFiles);
 +
 +    IteratorConfig iteratorSettings = SystemIteratorUtil
 +        .toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of()));
 +
 +    var files = ecm.getJobFiles().stream().map(storedTabletFile -> {
 +      var dfv = metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile);
 +      return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(), dfv.getNumEntries(),
 +          dfv.getTime());
 +    }).collect(Collectors.toList());
 +
 +    FateInstanceType type = FateInstanceType.fromTableId(metaJob.getTabletMetadata().getTableId());
 +    FateId fateId = FateId.from(type, 0);
 +    if (metaJob.getJob().getKind() == CompactionKind.USER) {
 +      fateId = metaJob.getTabletMetadata().getSelectedFiles().getFateId();
 +    }
 +
 +    return new TExternalCompactionJob(externalCompactionId,
 +        metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings,
 +        ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(),
 +        TCompactionKind.valueOf(ecm.getKind().name()), fateId.toThrift(), overrides);
 +  }
 +
 +  @Override
 +  public void registerMetrics(MeterRegistry registry) {
 +    Gauge.builder(METRICS_MAJC_QUEUED, jobQueues, CompactionJobQueues::getQueuedJobCount)
 +        .description("Number of queued major compactions").register(registry);
 +    Gauge.builder(METRICS_MAJC_RUNNING, this, CompactionCoordinator::getNumRunningCompactions)
 +        .description("Number of running major compactions").register(registry);
 +
 +    queueMetrics.registerMetrics(registry);
 +  }
 +
 +  public void addJobs(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) {
 +    jobQueues.add(tabletMetadata, jobs);
 +  }
 +
 +  public CompactionCoordinatorService.Iface getThriftService() {
 +    return this;
 +  }
 +
 +  private Optional<CompactionConfig> getCompactionConfig(CompactionJobQueues.MetaJob metaJob) {
 +    if (metaJob.getJob().getKind() == CompactionKind.USER
 +        && metaJob.getTabletMetadata().getSelectedFiles() != null) {
 +      var cconf =
 +          compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateId());
 +      return Optional.ofNullable(cconf);
 +    }
 +    return Optional.empty();
 +  }
 +
 +  /**
 +   * Compactors call this method when they have finished a compaction. This method does the
 +   * following.
 +   *
 +   * <ol>
 +   * <li>Reads the tablet's metadata and determines if the compaction can commit. It's possible
 +   * that things changed while the compaction was running and it can no longer commit.</li>
 +   * <li>Commits the compaction using a conditional mutation. If the tablet's files or location
 +   * changed since reading the tablet's metadata, then the conditional mutation will fail. When
 +   * this happens it will reread the metadata and conceptually go back to step 1. When committing
 +   * a compaction, the compacted files are removed and scan entries are added to the tablet in
 +   * case the files are in use; this prevents GC from deleting the files between updating tablet
 +   * metadata and refreshing the tablet. The scan entries are only added when a tablet has a
 +   * location.</li>
 +   * <li>After a successful commit, a refresh request is sent to the tablet if it has a location.
 +   * This will cause the tablet to start using the newly compacted files for future scans. Also,
 +   * the tablet can delete the scan entries if there are no active scans using them.</li>
 +   * </ol>
 +   *
 +   * <p>
 +   * User compactions will be refreshed as part of the fate operation. The user compaction fate
 +   * operation will see the compaction was committed after this code updates the tablet metadata;
 +   * however, if it were to rely on this code to do the refresh, it would not be able to know when
 +   * the refresh was actually done. Therefore, user compactions will refresh as part of the fate
 +   * operation so that it's known to be done before the fate operation returns. Since the fate
 +   * operation will do it, there is no need to do it here for user compactions.
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @param externalCompactionId compaction id
 +   * @param textent tablet extent
 +   * @param stats compaction stats
 +   * @throws ThriftSecurityException when permission error
 +   */
 +  @Override
 +  public void compactionCompleted(TInfo tinfo, TCredentials credentials,
 +      String externalCompactionId, TKeyExtent textent, TCompactionStats stats)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    // maybe fate has not started yet
 +    var localFates = fateInstances.get();
 +    while (localFates == null) {
 +      UtilWaitThread.sleep(100);
 +      if (shutdown) {
 +        return;
 +      }
 +      localFates = fateInstances.get();
 +    }
 +
 +    var extent = KeyExtent.fromThrift(textent);
 +    var localFate = localFates.get(FateInstanceType.fromTableId(extent.tableId()));
 +
 +    LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats,
 +        extent);
 +    final var ecid = ExternalCompactionId.of(externalCompactionId);
 +
 +    var tabletMeta =
 +        ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID);
 +
 +    if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) {
 +      return;
 +    }
 +
 +    CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid);
 +    var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, extent, ecm, stats));
 +
 +    // ELASTICITY_TODO add a tag to fate that the ECID can be added to. This solves two problems.
 +    // First, it defends against starting multiple fate txs for the same thing. This will also help
 +    // the split code. Second, the tag can be used by the dead compaction detector to ignore
 +    // committing compactions. The impl could hash the key to produce the fate tx id.
 +    var txid = localFate.startTransaction();
 +    localFate.seedTransaction("COMMIT_COMPACTION", txid, renameOp, true,
 +        "Commit compaction " + ecid);
 +
 +    // ELASTICITY_TODO need to remove this wait. It is here because when the dead compaction
 +    // detector asks a compactor what it's currently running, it expects that to cover commit. To
 +    // remove this wait would need another way for the dead compaction detector to know about
 +    // committing compactions. Could add a tag to the fate tx with the ecid and have the dead
 +    // compaction detector scan these tags. This wait makes the code running in fate not fault
 +    // tolerant, because in the case of faults the dead compaction detector may remove the
 +    // compaction entry.
 +    localFate.waitForCompletion(txid);
 +
 +    // It's possible that RUNNING might not have an entry for this ecid in the case
 +    // of a coordinator restart when the Coordinator can't find the TServer for the
 +    // corresponding external compaction.
 +    recordCompletion(ecid);
 +    // ELASTICITY_TODO should above call move into fate code?
 +  }
 +
 +  @Override
 +  public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
 +      TKeyExtent extent) throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
 +    LOG.info("Compaction failed, id: {}, extent: {}", externalCompactionId, fromThriftExtent);
 +    final var ecid = ExternalCompactionId.of(externalCompactionId);
 +    compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
 +  }
 +
 +  void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
 +
 +    try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +      compactions.forEach((ecid, extent) -> {
 +        try {
 +          ctx.requireNotDeleted(extent.tableId());
 +          tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
 +              .deleteExternalCompaction(ecid).submit(new RejectionHandler() {
 +
 +                @Override
 +                public boolean callWhenTabletDoesNotExists() {
 +                  return true;
 +                }
 +
 +                @Override
 +                public boolean test(TabletMetadata tabletMetadata) {
 +                  return tabletMetadata == null
 +                      || !tabletMetadata.getExternalCompactions().containsKey(ecid);
 +                }
 +
 +              });
 +        } catch (TableDeletedException e) {
 +          LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.",
 +              extent.tableId());
 +        }
 +      });
 +
 +      final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>();
 +      tabletsMutator.process().forEach((extent, result) -> {
 +        if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
 +
 +          // this should try again later when the dead compaction detector runs; let's log it in
 +          // case it's a persistent problem
 +          if (LOG.isDebugEnabled()) {
 +            var ecid =
 +                compactions.entrySet().stream().filter(entry -> entry.getValue().equals(extent))
 +                    .findFirst().map(Map.Entry::getKey).orElse(null);
 +            LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
 +          }
 +        } else {
 +          // compactionFailed is called from the Compactor when a compaction either fails or
 +          // is cancelled, and it's also called from the DeadCompactionDetector. This block is
 +          // entered when the conditional mutator above successfully deletes an ecid from
 +          // the tablet metadata. Remove compaction tmp files from the tablet directory
 +          // that have a corresponding ecid in the name.
 +
 +          ecidsForTablet.clear();
 +          compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0)
 +              .map(Entry::getKey).forEach(ecidsForTablet::add);
 +
 +          if (!ecidsForTablet.isEmpty()) {
 +            final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR);
 +            if (tm != null) {
 +              final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
 +              for (Volume vol : vols) {
 +                try {
 +                  final String volPath =
 +                      vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
 +                          + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName();
 +                  final FileSystem fs = vol.getFileSystem();
 +                  for (ExternalCompactionId ecid : ecidsForTablet) {
 +                    final String fileSuffix = "_tmp_" + ecid.canonical();
 +                    FileStatus[] files = fs.listStatus(new Path(volPath), (path) -> {
 +                      return path.getName().endsWith(fileSuffix);
 +                    });
 +                    if (files.length > 0) {
 +                      for (FileStatus file : files) {
 +                        if (!fs.delete(file.getPath(), false)) {
 +                          LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath());
 +                        } else {
 +                          LOG.debug("Deleted ecid tmp file: {}", file.getPath());
 +                        }
 +                      }
 +                    }
 +                  }
 +                } catch (IOException e) {
 +                  LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e);
 +                }
 +              }
 +            } else {
 +              // TabletMetadata does not exist for the extent. This could be due to a merge or
 +              // split operation. Use the utility to find tmp files at the table level
 +              deadCompactionDetector.addTableId(extent.tableId());
 +            }
 +          }
 +        }
 +      });
 +    }
 +
 +    compactions.forEach((k, v) -> recordCompletion(k));
 +  }
 +
 +  /**
 +   * Compactor calls to update the status of the assigned compaction
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @param externalCompactionId compaction id
 +   * @param update compaction status update
 +   * @param timestamp timestamp of the message
 +   * @throws ThriftSecurityException when permission error
 +   */
 +  @Override
 +  public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
 +      String externalCompactionId, TCompactionStatusUpdate update, long timestamp)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", externalCompactionId,
 +        timestamp, update);
 +    final RunningCompaction rc = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
 +    if (null != rc) {
 +      rc.addUpdate(timestamp, update);
 +    }
 +  }
 +
 +  private void recordCompletion(ExternalCompactionId ecid) {
 +    var rc = RUNNING_CACHE.remove(ecid);
 +    if (rc != null) {
 +      completed.put(ecid, rc);
 +    }
 +  }
 +
 +  protected Set<ExternalCompactionId> readExternalCompactionIds() {
 +    try (TabletsMetadata tabletsMetadata =
 +        this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
 +            .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build()) {
 +      return tabletsMetadata.stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream())
 +          .collect(Collectors.toSet());
 +    }
 +  }
 +
 +  /**
 +   * The RUNNING_CACHE set may contain external compactions that are not actually running. This
 +   * method periodically cleans those up.
 +   */
 +  public void cleanUpRunning() {
 +
 +    // grab a snapshot of the ids in the set before reading the metadata table. This is done to
 +    // avoid removing things that are added while reading the metadata.
 +    Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
 +
 +    // grab the ids that are listed as running in the metadata table. It is important that this is
 +    // done after getting the snapshot.
 +    Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
 +
 +    var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
 +
 +    // remove ids that are in the running set but not in the metadata table
 +    idsToRemove.forEach(this::recordCompletion);
 +
 +    if (idsToRemove.size() > 0) {
 +      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
 +    }
 +  }
 +
 +  /**
 +   * Return information about running compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    final TExternalCompactionList result = new TExternalCompactionList();
 +    RUNNING_CACHE.forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroupName());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setUpdates(rc.getUpdates());
 +      trc.setJob(rc.getJob());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  /**
 +   * Return information about recently completed compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    final TExternalCompactionList result = new TExternalCompactionList();
 +    completed.asMap().forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroupName());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setJob(rc.getJob());
 +      trc.setUpdates(rc.getUpdates());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  @Override
 +  public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId)
 +      throws TException {
 +    var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
 +    var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent());
 +    try {
 +      NamespaceId nsId = this.ctx.getNamespaceId(extent.tableId());
 +      if (!security.canCompact(credentials, extent.tableId(), nsId)) {
 +        throw new AccumuloSecurityException(credentials.getPrincipal(),
 +            SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +      }
 +    } catch (TableNotFoundException e) {
 +      throw new ThriftTableOperationException(extent.tableId().canonical(), null,
 +          TableOperation.COMPACT_CANCEL, TableOperationExceptionType.NOTFOUND, e.getMessage());
 +    }
 +
 +    cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId);
 +  }
 +
 +  /* Method exists to be called from test */
 +  public CompactionJobQueues getJobQueues() {
 +    return jobQueues;
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
 +    return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {
 +    HostAndPort hostPort = HostAndPort.fromString(address);
 +    ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, externalCompactionId);
 +  }
 +
 +  private void deleteEmpty(ZooReaderWriter zoorw, String path)
 +      throws KeeperException, InterruptedException {
 +    try {
 +      LOG.debug("Deleting empty ZK node {}", path);
 +      zoorw.delete(path);
 +    } catch (KeeperException.NotEmptyException e) {
 +      LOG.debug("Failed to delete {} its not empty, likely an expected race condition.", path);
 +    }
 +  }
 +
 +  private void cleanUpCompactors() {
 +    final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS;
 +
 +    var zoorw = this.ctx.getZooReaderWriter();
 +
 +    try {
 +      var groups = zoorw.getChildren(compactorQueuesPath);
 +
 +      for (String group : groups) {
 +        String qpath = compactorQueuesPath + "/" + group;
 +
 +        var compactors = zoorw.getChildren(qpath);
 +
 +        if (compactors.isEmpty()) {
 +          deleteEmpty(zoorw, qpath);
 +        }
 +
 +        for (String compactor : compactors) {
 +          String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
 +          var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + group + "/" + compactor);
 +          if (lockNodes.isEmpty()) {
 +            deleteEmpty(zoorw, cpath);
 +          }
 +        }
 +      }
 +
 +    } catch (KeeperException | RuntimeException e) {
 +      LOG.warn("Failed to clean up compactors", e);
 +    } catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +      throw new IllegalStateException(e);
 +    }
 +  }
 +
 +}
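
A closing note on cleanUpRunning() above: it snapshots the in-memory ids before
reading the metadata table, so entries added concurrently are never mistaken for
stale ones. A minimal sketch of that snapshot-then-read pattern (hypothetical
class and stand-in reader, not the commit's code):

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class StaleEntryCleaner {
      private final Map<String,Object> running = new ConcurrentHashMap<>();

      // reads the authoritative store; a stand-in for readExternalCompactionIds()
      private Set<String> readAuthoritativeIds() {
        return Set.of("ecid-1");
      }

      public void cleanUp() {
        // snapshot first, so ids added while reading the store are not removed
        Set<String> snapshot = Set.copyOf(running.keySet());
        Set<String> authoritative = readAuthoritativeIds();
        for (String id : snapshot) {
          if (!authoritative.contains(id)) {
            running.remove(id); // stale: in the snapshot but not in the store
          }
        }
      }
    }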