Posted to commits@accumulo.apache.org by kt...@apache.org on 2024/01/05 02:48:09 UTC

(accumulo) branch elasticity updated (e7d789dc79 -> 0d55086316)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a change to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


    from e7d789dc79 Small improvements to AccumuloStore related code (#4121)
     add bd8a67fa4e ZooStore deferral time to use System.nanoTime() (#4126)
     add 7921425c04 Merge remote-tracking branch 'upstream/2.1'
     add 0612a2d675 Add resource to try-with-resources block in ZooStore (#4128)
     add d443a9729e Merge remote-tracking branch 'upstream/2.1'
     add d02340a7ee Added Indication of Intermediate Compactions to CompactionConfigurer (#4118)
     add 5583129435 Return non-merged view of System config (#4120)
     add 08628c016b Updates DefaultCompactionPlanner to honor table.file.max prop (#4127)
     add 6dc1bcfa55 Merge branch '2.1'
     new 6f908863f5 Merge branch 'main' into elasticity
     new 0d55086316 removes code added for debugging merge

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../core/client/admin/InstanceOperations.java      |  10 ++
 .../admin/compaction/CompactionConfigurer.java     |  23 +++
 .../core/clientImpl/InstanceOperationsImpl.java    |   7 +
 .../org/apache/accumulo/core/conf/Property.java    |   8 +-
 .../accumulo/core/fate/AbstractFateStore.java      |  14 +-
 .../org/apache/accumulo/core/fate/AdminUtil.java   |   5 +-
 .../org/apache/accumulo/core/fate/AgeOffStore.java |   5 +-
 .../java/org/apache/accumulo/core/fate/Fate.java   |  13 +-
 .../org/apache/accumulo/core/fate/FateStore.java   |   3 +-
 .../accumulo/core/fate/WrappedFateTxStore.java     |   5 +-
 .../core/file/rfile/bcfile/PrintBCInfo.java        |  14 +-
 .../spi/compaction/DefaultCompactionPlanner.java   | 117 +++++++++++-
 .../ShellCompactCommandConfigurerTest.java         |   5 +
 .../apache/accumulo/core/fate/AgeOffStoreTest.java |  17 +-
 .../org/apache/accumulo/core/fate/TestStore.java   |   3 +-
 .../compaction/DefaultCompactionPlannerTest.java   | 200 ++++++++++++++++++++-
 .../server/compaction/CompactionPluginUtils.java   |  16 +-
 .../coordinator/CompactionCoordinator.java         |  13 +-
 .../accumulo/test/conf/PropStoreConfigIT.java      |  41 +++++
 .../accumulo/test/functional/CompactionIT.java     | 192 ++++++++++++++++++++
 20 files changed, 664 insertions(+), 47 deletions(-)


(accumulo) 01/02: Merge branch 'main' into elasticity


kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 6f908863f571bcb76dd77e0576ec1bd135cd0284
Merge: e7d789dc79 6dc1bcfa55
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jan 4 21:40:30 2024 -0500

    Merge branch 'main' into elasticity

 .../core/client/admin/InstanceOperations.java      |  10 +
 .../admin/compaction/CompactionConfigurer.java     |  23 +++
 .../core/clientImpl/InstanceOperationsImpl.java    |   7 +
 .../org/apache/accumulo/core/conf/Property.java    |   8 +-
 .../accumulo/core/fate/AbstractFateStore.java      |  14 +-
 .../org/apache/accumulo/core/fate/AdminUtil.java   |   5 +-
 .../org/apache/accumulo/core/fate/AgeOffStore.java |   5 +-
 .../java/org/apache/accumulo/core/fate/Fate.java   |  13 +-
 .../org/apache/accumulo/core/fate/FateStore.java   |   3 +-
 .../accumulo/core/fate/WrappedFateTxStore.java     |   5 +-
 .../core/file/rfile/bcfile/PrintBCInfo.java        |  14 +-
 .../spi/compaction/DefaultCompactionPlanner.java   | 117 +++++++++++-
 .../ShellCompactCommandConfigurerTest.java         |   5 +
 .../apache/accumulo/core/fate/AgeOffStoreTest.java |  17 +-
 .../org/apache/accumulo/core/fate/TestStore.java   |   3 +-
 .../compaction/DefaultCompactionPlannerTest.java   | 202 ++++++++++++++++++++-
 .../server/compaction/CompactionPluginUtils.java   |  16 +-
 .../coordinator/CompactionCoordinator.java         |  13 +-
 .../accumulo/test/conf/PropStoreConfigIT.java      |  41 +++++
 .../accumulo/test/functional/CompactionIT.java     | 192 ++++++++++++++++++++
 20 files changed, 666 insertions(+), 47 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b1bc08b2a4,ba0437a9fe..e568dd44d4
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -957,16 -965,13 +957,20 @@@ public enum Property 
            + " adjusting this property you may want to consider adjusting"
            + " table.compaction.major.ratio also. Setting this property to 0 will make"
            + " it default to tserver.scan.files.open.max-1, this will prevent a tablet"
-           + " from having more RFiles than can be opened. Setting this property low may"
-           + " throttle ingest and increase query performance.",
+           + " from having more RFiles than can be opened. Prior to 2.1.0 this property"
+           + " was used to trigger merging minor compactions, but merging minor compactions"
+           + " were removed in 2.1.0. Now this property is only used by the"
+           + " DefaultCompactionStrategy and the DefaultCompactionPlanner."
+           + " The DefaultCompactionPlanner started using this property in 2.1.3, before"
+           + " that it did not use the property.",
        "1.4.0"),
 +  TABLE_MERGE_FILE_MAX("table.merge.file.max", "10000", PropertyType.COUNT,
 +      "The maximum number of files that a merge operation will process.  Before "
 +          + "merging a sum of the number of files in the merge range is computed and if it "
 +          + "exceeds this configuration then the merge will error and fail.  For example if "
 +          + "there are 100 tablets each having 10 files in the merge range, then the sum would "
 +          + "be 1000 and the merge will only proceed if this property is greater than 1000.",
 +      "4.0.0"),
    TABLE_FILE_SUMMARY_MAX_SIZE("table.file.summary.maxSize", "256k", PropertyType.BYTES,
        "The maximum size summary that will be stored. The number of RFiles that"
            + " had summary data exceeding this threshold is reported by"
diff --cc core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 874b58d8c6,0000000000..8c18fd378a
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@@ -1,321 -1,0 +1,325 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.io.ObjectInputStream;
 +import java.io.ObjectOutputStream;
 +import java.io.Serializable;
 +import java.io.UncheckedIOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
++import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.core.fate.Fate.TxInfo;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 +
 +public abstract class AbstractFateStore<T> implements FateStore<T> {
 +
 +  private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class);
 +
 +  protected final Set<Long> reserved;
 +  protected final Map<Long,Long> deferred;
 +
 +  // This is incremented each time a transaction was unreserved that was non new
 +  protected final SignalCount unreservedNonNewCount = new SignalCount();
 +
 +  // This is incremented each time a transaction is unreserved that was runnable
 +  protected final SignalCount unreservedRunnableCount = new SignalCount();
 +
 +  public AbstractFateStore() {
 +    this.reserved = new HashSet<>();
 +    this.deferred = new HashMap<>();
 +  }
 +
 +  public static byte[] serialize(Object o) {
 +    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +        ObjectOutputStream oos = new ObjectOutputStream(baos)) {
 +      oos.writeObject(o);
 +      return baos.toByteArray();
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    }
 +  }
 +
 +  @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION",
 +      justification = "unsafe to store arbitrary serialized objects like this, but needed for now"
 +          + " for backwards compatibility")
 +  public static Object deserialize(byte[] ser) {
 +    try (ByteArrayInputStream bais = new ByteArrayInputStream(ser);
 +        ObjectInputStream ois = new ObjectInputStream(bais)) {
 +      return ois.readObject();
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    } catch (ReflectiveOperationException e) {
 +      throw new IllegalStateException(e);
 +    }
 +  }
 +
 +  /**
 +   * Attempt to reserve transaction
 +   *
 +   * @param tid transaction id
 +   * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or
 +   *         an empty Optional if the transaction was already reserved.
 +   */
 +  @Override
 +  public Optional<FateTxStore<T>> tryReserve(long tid) {
 +    synchronized (this) {
 +      if (!reserved.contains(tid)) {
 +        return Optional.of(reserve(tid));
 +      }
 +      return Optional.empty();
 +    }
 +  }
 +
 +  @Override
 +  public FateTxStore<T> reserve(long tid) {
 +    synchronized (AbstractFateStore.this) {
 +      while (reserved.contains(tid)) {
 +        try {
 +          AbstractFateStore.this.wait(100);
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +          throw new IllegalStateException(e);
 +        }
 +      }
 +
 +      reserved.add(tid);
 +      return newFateTxStore(tid, true);
 +    }
 +  }
 +
 +  @Override
 +  public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
 +
 +    while (keepWaiting.get()) {
 +      ArrayList<Long> runnableTids = new ArrayList<>();
 +
 +      final long beforeCount = unreservedRunnableCount.getCount();
 +
 +      List<String> transactions = getTransactions();
 +      for (String txidStr : transactions) {
 +        long txid = parseTid(txidStr);
 +        if (isRunnable(_getStatus(txid))) {
 +          runnableTids.add(txid);
 +        }
 +      }
 +
 +      synchronized (this) {
 +        runnableTids.removeIf(txid -> {
 +          var deferredTime = deferred.get(txid);
 +          if (deferredTime != null) {
-             if (deferredTime >= System.currentTimeMillis()) {
++            if ((deferredTime - System.nanoTime()) > 0) {
 +              return true;
 +            } else {
 +              deferred.remove(txid);
 +            }
 +          }
 +
 +          return reserved.contains(txid);
 +        });
 +      }
 +
 +      if (runnableTids.isEmpty()) {
 +        if (beforeCount == unreservedRunnableCount.getCount()) {
 +          long waitTime = 5000;
 +          if (!deferred.isEmpty()) {
-             Long minTime = Collections.min(deferred.values());
-             waitTime = minTime - System.currentTimeMillis();
++            long currTime = System.nanoTime();
++            long minWait =
++                deferred.values().stream().mapToLong(l -> l - currTime).min().getAsLong();
++            waitTime = TimeUnit.MILLISECONDS.convert(minWait, TimeUnit.NANOSECONDS);
 +          }
 +
 +          if (waitTime > 0) {
 +            unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime,
 +                keepWaiting::get);
 +          }
 +        }
 +      } else {
 +        return runnableTids.iterator();
 +      }
 +
 +    }
 +
 +    return Collections.emptyIterator();
 +  }
 +
 +  @Override
 +  public List<Long> list() {
 +    ArrayList<Long> l = new ArrayList<>();
 +    List<String> transactions = getTransactions();
 +    for (String txid : transactions) {
 +      l.add(parseTid(txid));
 +    }
 +    return l;
 +  }
 +
 +  @Override
 +  public ReadOnlyFateTxStore<T> read(long tid) {
 +    return newFateTxStore(tid, false);
 +  }
 +
 +  protected boolean isRunnable(TStatus status) {
 +    return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS
 +        || status == TStatus.SUBMITTED;
 +  }
 +
 +  protected long parseTid(String txdir) {
 +    return Long.parseLong(txdir.split("_")[1], 16);
 +  }
 +
 +  protected abstract List<String> getTransactions();
 +
 +  protected abstract TStatus _getStatus(long tid);
 +
 +  protected abstract FateTxStore<T> newFateTxStore(long tid, boolean isReserved);
 +
 +  protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
 +    protected final long tid;
 +    protected final boolean isReserved;
 +
 +    protected TStatus observedStatus = null;
 +
 +    protected AbstractFateTxStoreImpl(long tid, boolean isReserved) {
 +      this.tid = tid;
 +      this.isReserved = isReserved;
 +    }
 +
 +    @Override
 +    public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
 +      Preconditions.checkState(!isReserved,
 +          "Attempted to wait for status change while reserved " + FateTxId.formatTid(getID()));
 +      while (true) {
 +
 +        long countBefore = unreservedNonNewCount.getCount();
 +
 +        TStatus status = _getStatus(tid);
 +        if (expected.contains(status)) {
 +          return status;
 +        }
 +
 +        unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true);
 +      }
 +    }
 +
 +    @Override
-     public void unreserve(long deferTime) {
++    public void unreserve(long deferTime, TimeUnit timeUnit) {
++      deferTime = TimeUnit.NANOSECONDS.convert(deferTime, timeUnit);
 +
 +      if (deferTime < 0) {
 +        throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
 +      }
 +
 +      synchronized (AbstractFateStore.this) {
 +        if (!reserved.remove(tid)) {
 +          throw new IllegalStateException(
 +              "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid));
 +        }
 +
 +        // notify any threads waiting to reserve
 +        AbstractFateStore.this.notifyAll();
 +
 +        if (deferTime > 0) {
-           deferred.put(tid, System.currentTimeMillis() + deferTime);
++          deferred.put(tid, System.nanoTime() + deferTime);
 +        }
 +      }
 +
 +      if (observedStatus != null && isRunnable(observedStatus)) {
 +        unreservedRunnableCount.increment();
 +      }
 +
 +      if (observedStatus != TStatus.NEW) {
 +        unreservedNonNewCount.increment();
 +      }
 +    }
 +
 +    protected void verifyReserved(boolean isWrite) {
 +      if (!isReserved && isWrite) {
 +        throw new IllegalStateException("Attempted write on unreserved FATE transaction.");
 +      }
 +
 +      if (isReserved) {
 +        synchronized (AbstractFateStore.this) {
 +          if (!reserved.contains(tid)) {
 +            throw new IllegalStateException(
 +                "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid));
 +          }
 +        }
 +      }
 +    }
 +
 +    @Override
 +    public TStatus getStatus() {
 +      verifyReserved(false);
 +      var status = _getStatus(tid);
 +      observedStatus = status;
 +      return status;
 +    }
 +
 +    @Override
 +    public long getID() {
 +      return tid;
 +    }
 +
 +    protected byte[] serializeTxInfo(Serializable so) {
 +      if (so instanceof String) {
 +        return ("S " + so).getBytes(UTF_8);
 +      } else {
 +        byte[] sera = serialize(so);
 +        byte[] data = new byte[sera.length + 2];
 +        System.arraycopy(sera, 0, data, 2, sera.length);
 +        data[0] = 'O';
 +        data[1] = ' ';
 +        return data;
 +      }
 +    }
 +
 +    protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
 +      if (data[0] == 'O') {
 +        byte[] sera = new byte[data.length - 2];
 +        System.arraycopy(data, 2, sera, 0, sera.length);
 +        return (Serializable) deserialize(sera);
 +      } else if (data[0] == 'S') {
 +        return new String(data, 2, data.length - 2, UTF_8);
 +      } else {
 +        throw new IllegalStateException("Bad node data " + txInfo);
 +      }
 +    }
 +
 +  }
 +}
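
The substantive change in this AbstractFateStore hunk is switching deferral tracking from System.currentTimeMillis() to System.nanoTime(). Reduced to a standalone sketch (hypothetical class and field names, not Accumulo code), the pattern stores an absolute nanoTime deadline and compares by signed subtraction:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    class DeferralSketch {
      private final Map<Long,Long> deferred = new HashMap<>();

      // Record an absolute nanoTime deadline for the transaction.
      synchronized void defer(long txid, long deferTime, TimeUnit unit) {
        deferred.put(txid, System.nanoTime() + unit.toNanos(deferTime));
      }

      // nanoTime values may be negative and can wrap, so a direct
      // `deadline >= System.nanoTime()` check is not safe; the signed difference is.
      synchronized boolean isStillDeferred(long txid) {
        Long deadline = deferred.get(txid);
        if (deadline == null) {
          return false;
        }
        if (deadline - System.nanoTime() > 0) {
          return true;
        }
        deferred.remove(txid);
        return false;
      }
    }
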
diff --cc core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index 95ef99448f,858e6e6998..6a64014f01
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@@ -32,10 -32,9 +32,11 @@@ import java.util.List
  import java.util.Map;
  import java.util.Map.Entry;
  import java.util.Set;
+ import java.util.concurrent.TimeUnit;
  
 -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
 +import org.apache.accumulo.core.fate.FateStore.FateTxStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
  import org.apache.accumulo.core.fate.zookeeper.FateLock;
  import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath;
  import org.apache.accumulo.core.fate.zookeeper.ZooReader;
@@@ -431,28 -432,26 +432,28 @@@ public class AdminUtil<T> 
        return false;
      }
      boolean state = false;
 -    zs.reserve(txid);
 -    TStatus ts = zs.getStatus(txid);
 -    switch (ts) {
 -      case UNKNOWN:
 -        System.out.printf("Invalid transaction ID: %016x%n", txid);
 -        break;
 -
 -      case SUBMITTED:
 -      case IN_PROGRESS:
 -      case NEW:
 -      case FAILED:
 -      case FAILED_IN_PROGRESS:
 -      case SUCCESSFUL:
 -        System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts);
 -        zs.delete(txid);
 -        state = true;
 -        break;
 +    FateTxStore<T> txStore = zs.reserve(txid);
 +    try {
 +      TStatus ts = txStore.getStatus();
 +      switch (ts) {
 +        case UNKNOWN:
 +          System.out.printf("Invalid transaction ID: %016x%n", txid);
 +          break;
 +
 +        case SUBMITTED:
 +        case IN_PROGRESS:
 +        case NEW:
 +        case FAILED:
 +        case FAILED_IN_PROGRESS:
 +        case SUCCESSFUL:
 +          System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts);
 +          txStore.delete();
 +          state = true;
 +          break;
 +      }
 +    } finally {
-       txStore.unreserve(0);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
 -
 -    zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
      return state;
    }
  
@@@ -470,36 -469,33 +471,36 @@@
        return false;
      }
      boolean state = false;
 -    zs.reserve(txid);
 -    TStatus ts = zs.getStatus(txid);
 -    switch (ts) {
 -      case UNKNOWN:
 -        System.out.printf("Invalid transaction ID: %016x%n", txid);
 -        break;
 -
 -      case SUBMITTED:
 -      case IN_PROGRESS:
 -      case NEW:
 -        System.out.printf("Failing transaction: %016x (%s)%n", txid, ts);
 -        zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
 -        state = true;
 -        break;
 -
 -      case SUCCESSFUL:
 -        System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts);
 -        break;
 -
 -      case FAILED:
 -      case FAILED_IN_PROGRESS:
 -        System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts);
 -        state = true;
 -        break;
 +    FateTxStore<T> txStore = zs.reserve(txid);
 +    try {
 +      TStatus ts = txStore.getStatus();
 +      switch (ts) {
 +        case UNKNOWN:
 +          System.out.printf("Invalid transaction ID: %016x%n", txid);
 +          break;
 +
 +        case SUBMITTED:
 +        case IN_PROGRESS:
 +        case NEW:
 +          System.out.printf("Failing transaction: %016x (%s)%n", txid, ts);
 +          txStore.setStatus(TStatus.FAILED_IN_PROGRESS);
 +          state = true;
 +          break;
 +
 +        case SUCCESSFUL:
 +          System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts);
 +          break;
 +
 +        case FAILED:
 +        case FAILED_IN_PROGRESS:
 +          System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts);
 +          state = true;
 +          break;
 +      }
 +    } finally {
-       txStore.unreserve(0);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
  
 -    zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
      return state;
    }
  
diff --cc core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
index f61c06028c,ca016d0c9c..080ff0d33d
--- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
@@@ -24,8 -25,7 +24,9 @@@ import java.util.Iterator
  import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
 +import java.util.Optional;
+ import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -108,7 -108,7 +109,7 @@@ public class AgeOffStore<T> implements 
            }
  
          } finally {
-           txStore.unreserve(0);
 -          store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
++          txStore.unreserve(0, TimeUnit.MILLISECONDS);
          }
        } catch (Exception e) {
          log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e);
@@@ -138,7 -138,7 +139,7 @@@
              break;
          }
        } finally {
-         txStore.unreserve(0);
 -        store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
++        txStore.unreserve(0, TimeUnit.MILLISECONDS);
        }
      }
    }
diff --cc core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index a54ad734ee,1a14418b1a..4b54ccc771
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@@ -38,7 -35,7 +38,8 @@@ import java.util.concurrent.LinkedTrans
  import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.ScheduledThreadPoolExecutor;
  import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TransferQueue;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.function.Function;
  
@@@ -203,8 -132,8 +204,8 @@@ public class Fate<T> 
          } catch (Exception e) {
            runnerLog.error("Uncaught exception in FATE runner thread.", e);
          } finally {
 -          if (tid != null) {
 -            store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS);
 +          if (txStore != null) {
-             txStore.unreserve(deferTime);
++            txStore.unreserve(deferTime, TimeUnit.MILLISECONDS);
            }
          }
        }
@@@ -356,15 -281,15 +357,15 @@@
          }
  
          if (autoCleanUp) {
 -          store.setTransactionInfo(tid, TxInfo.AUTO_CLEAN, autoCleanUp);
 +          txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
          }
  
 -        store.setTransactionInfo(tid, TxInfo.TX_NAME, txName);
 +        txStore.setTransactionInfo(TxInfo.TX_NAME, txName);
  
 -        store.setStatus(tid, SUBMITTED);
 +        txStore.setStatus(SUBMITTED);
        }
      } finally {
-       txStore.unreserve(0);
 -      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
  
    }
@@@ -402,7 -325,7 +403,7 @@@
              return false;
            }
          } finally {
-           txStore.unreserve(0);
 -          store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++          txStore.unreserve(0, TimeUnit.MILLISECONDS);
          }
        } else {
          // reserved, lets retry.
@@@ -433,7 -356,7 +434,7 @@@
            break;
        }
      } finally {
-       txStore.unreserve(0);
 -      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
    }
  
@@@ -444,9 -367,9 +445,9 @@@
          throw new IllegalStateException("Tried to get exception when transaction "
              + FateTxId.formatTid(tid) + " not in successful state");
        }
 -      return (String) store.getTransactionInfo(tid, TxInfo.RETURN_VALUE);
 +      return (String) txStore.getTransactionInfo(TxInfo.RETURN_VALUE);
      } finally {
-       txStore.unreserve(0);
 -      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
    }
  
@@@ -458,9 -381,9 +459,9 @@@
          throw new IllegalStateException("Tried to get exception when transaction "
              + FateTxId.formatTid(tid) + " not in failed state");
        }
 -      return (Exception) store.getTransactionInfo(tid, TxInfo.EXCEPTION);
 +      return (Exception) txStore.getTransactionInfo(TxInfo.EXCEPTION);
      } finally {
-       txStore.unreserve(0);
 -      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
++      txStore.unreserve(0, TimeUnit.MILLISECONDS);
      }
    }
  
diff --cc core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index 7db5766e81,0000000000..81a0e3212f
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@@ -1,110 -1,0 +1,111 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import java.io.Serializable;
 +import java.util.Optional;
++import java.util.concurrent.TimeUnit;
 +
 +/**
 + * Transaction Store: a place to save transactions
 + *
 + * A transaction consists of a number of operations. To use, first create a transaction id, and then
 + * seed the transaction with an initial operation. An executor service can then execute the
 + * transaction's operation, possibly pushing more operations onto the transaction as each step
 + * successfully completes. If a step fails, the stack can be unwound, undoing each operation.
 + */
 +public interface FateStore<T> extends ReadOnlyFateStore<T> {
 +
 +  /**
 +   * Create a new transaction id
 +   *
 +   * @return a transaction id
 +   */
 +  long create();
 +
 +  /**
 +   * An interface that allows read/write access to the data related to a single fate operation.
 +   */
 +  interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
 +    @Override
 +    Repo<T> top();
 +
 +    /**
 +     * Update the given transaction with the next operation
 +     *
 +     * @param repo the operation
 +     */
 +    void push(Repo<T> repo) throws StackOverflowException;
 +
 +    /**
 +     * Remove the last pushed operation from the given transaction.
 +     */
 +    void pop();
 +
 +    /**
 +     * Update the state of a given transaction
 +     *
 +     * @param status execution status
 +     */
 +    void setStatus(TStatus status);
 +
 +    /**
 +     * Set transaction-specific information.
 +     *
 +     * @param txInfo name of attribute of a transaction to set.
 +     * @param val transaction data to store
 +     */
 +    void setTransactionInfo(Fate.TxInfo txInfo, Serializable val);
 +
 +    /**
 +     * Remove the transaction from the store.
 +     *
 +     */
 +    void delete();
 +
 +    /**
 +     * Return the given transaction to the store.
 +     *
 +     * upon successful return the store now controls the referenced transaction id. caller should no
 +     * longer interact with it.
 +     *
 +     * @param deferTime time in millis to keep this transaction from being returned by
 +     *        {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. Must be non-negative.
 +     */
-     void unreserve(long deferTime);
++    void unreserve(long deferTime, TimeUnit timeUnit);
 +  }
 +
 +  /**
 +   * Attempt to reserve transaction
 +   *
 +   * @param tid transaction id
 +   * @return true if reserved by this call, false if already reserved
 +   */
 +  Optional<FateTxStore<T>> tryReserve(long tid);
 +
 +  /**
 +   * Reserve the specific tid.
 +   *
 +   * Reserving a transaction id ensures that nothing else in-process interacting via the same
 +   * instance will be operating on that transaction id.
 +   *
 +   */
 +  FateTxStore<T> reserve(long tid);
 +
 +}
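
The javadoc above sketches the intended lifecycle: create a transaction id, reserve it, seed it with an initial operation, and hand it back to the store. A hedged usage sketch against this interface (the imports and the `store`/`initialOp` arguments are assumptions; it loosely mirrors the seeding code visible in the Fate.java hunk earlier in this diff):

    import java.util.concurrent.TimeUnit;

    import org.apache.accumulo.core.fate.Fate;
    import org.apache.accumulo.core.fate.FateStore;
    import org.apache.accumulo.core.fate.FateStore.FateTxStore;
    import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
    import org.apache.accumulo.core.fate.Repo;
    import org.apache.accumulo.core.fate.StackOverflowException;

    class FateSeedSketch {
      static void seed(FateStore<String> store, Repo<String> initialOp)
          throws StackOverflowException {
        long tid = store.create();
        FateTxStore<String> txStore = store.reserve(tid);
        try {
          txStore.push(initialOp);                              // seed the transaction
          txStore.setTransactionInfo(Fate.TxInfo.TX_NAME, "example");
          txStore.setStatus(TStatus.SUBMITTED);
        } finally {
          // With the change merged here, callers pass an explicit TimeUnit.
          txStore.unreserve(0, TimeUnit.MILLISECONDS);
        }
      }
    }
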
diff --cc core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
index 238f981a22,0000000000..1d8c7126c2
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
@@@ -1,96 -1,0 +1,97 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import java.io.Serializable;
 +import java.util.EnumSet;
 +import java.util.List;
++import java.util.concurrent.TimeUnit;
 +
 +public class WrappedFateTxStore<T> implements FateStore.FateTxStore<T> {
 +  protected final FateStore.FateTxStore<T> wrapped;
 +
 +  public WrappedFateTxStore(FateStore.FateTxStore<T> wrapped) {
 +    this.wrapped = wrapped;
 +  }
 +
 +  @Override
-   public void unreserve(long deferTime) {
-     wrapped.unreserve(deferTime);
++  public void unreserve(long deferTime, TimeUnit timeUnit) {
++    wrapped.unreserve(deferTime, timeUnit);
 +  }
 +
 +  @Override
 +  public Repo<T> top() {
 +    return wrapped.top();
 +  }
 +
 +  @Override
 +  public void push(Repo<T> repo) throws StackOverflowException {
 +    wrapped.push(repo);
 +  }
 +
 +  @Override
 +  public void pop() {
 +    wrapped.pop();
 +  }
 +
 +  @Override
 +  public FateStore.TStatus getStatus() {
 +    return wrapped.getStatus();
 +  }
 +
 +  @Override
 +  public void setStatus(FateStore.TStatus status) {
 +    wrapped.setStatus(status);
 +  }
 +
 +  @Override
 +  public FateStore.TStatus waitForStatusChange(EnumSet<FateStore.TStatus> expected) {
 +    return wrapped.waitForStatusChange(expected);
 +  }
 +
 +  @Override
 +  public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) {
 +    wrapped.setTransactionInfo(txInfo, val);
 +  }
 +
 +  @Override
 +  public Serializable getTransactionInfo(Fate.TxInfo txInfo) {
 +    return wrapped.getTransactionInfo(txInfo);
 +  }
 +
 +  @Override
 +  public void delete() {
 +    wrapped.delete();
 +  }
 +
 +  @Override
 +  public long timeCreated() {
 +    return wrapped.timeCreated();
 +  }
 +
 +  @Override
 +  public long getID() {
 +    return wrapped.getID();
 +  }
 +
 +  @Override
 +  public List<ReadOnlyRepo<T>> getStack() {
 +    return wrapped.getStack();
 +  }
 +}
diff --cc core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index d3c3deb57a,eba25df062..dcbb78dfcd
--- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@@ -370,94 -329,157 +386,179 @@@ public class DefaultCompactionPlanner i
  
      Set<CompactableFile> filesCopy = new HashSet<>(params.getCandidates());
  
 +    FakeFileGenerator fakeFileGenerator = new FakeFileGenerator();
 +
      long maxSizeToCompact = getMaxSizeToCompact(params.getKind());
  
 -    Collection<CompactableFile> group;
 -    if (params.getRunningCompactions().isEmpty()) {
 -      group =
 -          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
 +    // This set represents future files that will be produced by running compactions. If the optimal
 +    // set of files to compact is computed and contains one of these files, then its optimal to wait
 +    // for this compaction to finish.
 +    Set<CompactableFile> expectedFiles = new HashSet<>();
 +    params.getRunningCompactions().stream().filter(job -> job.getKind() == params.getKind())
 +        .map(job -> getExpected(job.getFiles(), fakeFileGenerator))
 +        .forEach(compactableFile -> Preconditions.checkState(expectedFiles.add(compactableFile)));
 +    Preconditions.checkState(Collections.disjoint(expectedFiles, filesCopy));
 +    filesCopy.addAll(expectedFiles);
  
 -      if (!group.isEmpty() && group.size() < params.getCandidates().size()
 -          && params.getCandidates().size() <= maxFilesToCompact
 -          && (params.getKind() == CompactionKind.USER
 -              || params.getKind() == CompactionKind.SELECTOR)) {
 -        // USER and SELECTOR compactions must eventually compact all files. When a subset of files
 -        // that meets the compaction ratio is selected, look ahead and see if the next compaction
 -        // would also meet the compaction ratio. If not then compact everything to avoid doing
 -        // more than logarithmic work across multiple comapctions.
 -
 -        filesCopy.removeAll(group);
 -        filesCopy.add(getExpected(group, 0));
 -
 -        if (findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact,
 -            maxSizeToCompact).isEmpty()) {
 -          // The next possible compaction does not meet the compaction ratio, so compact
 -          // everything.
 -          group = Set.copyOf(params.getCandidates());
 -        }
 +    List<Collection<CompactableFile>> compactionJobs = new ArrayList<>();
  
 +    while (true) {
 +      var filesToCompact =
 +          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
 +      if (!Collections.disjoint(filesToCompact, expectedFiles)) {
 +        // the optimal set of files to compact includes the output of a running compaction, so lets
 +        // wait for that running compaction to finish.
 +        break;
        }
  
 -    } else if (params.getKind() == CompactionKind.SYSTEM) {
 -      // This code determines if once the files compacting finish would they be included in a
 -      // compaction with the files smaller than them? If so, then wait for the running compaction
 -      // to complete.
 +      if (filesToCompact.isEmpty()) {
 +        break;
 +      }
  
 -      // The set of files running compactions may produce
 -      var expectedFiles = getExpected(params.getRunningCompactions());
 +      filesCopy.removeAll(filesToCompact);
  
 -      if (!Collections.disjoint(filesCopy, expectedFiles)) {
 -        throw new AssertionError();
 -      }
 +      // A compaction job will be created for these files, so lets add an expected file for that
 +      // planned compaction job. Then if future iterations of this loop will include that file then
 +      // they will not compact.
 +      var expectedFile = getExpected(filesToCompact, fakeFileGenerator);
 +      Preconditions.checkState(expectedFiles.add(expectedFile));
 +      Preconditions.checkState(filesCopy.add(expectedFile));
  
 -      filesCopy.addAll(expectedFiles);
 +      compactionJobs.add(filesToCompact);
  
 -      group =
 -          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
 +      if (filesToCompact.size() < maxFilesToCompact) {
 +        // Only continue looking for more compaction jobs when a set of files is found equals
 +        // maxFilesToCompact in size. When the files found is less than the max size its an
 +        // indication that the compaction ratio was no longer met and therefore it would be
 +        // suboptimal to look for more jobs because the smallest optimal set was found.
 +        break;
 +      }
 +    }
  
 -      if (!Collections.disjoint(group, expectedFiles)) {
 -        // file produced by running compaction will eventually compact with existing files, so
 -        // wait.
 -        group = Set.of();
 +    if (compactionJobs.size() == 1
 +        && (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR)
 +        && compactionJobs.get(0).size() < params.getCandidates().size()
 +        && compactionJobs.get(0).size() <= maxFilesToCompact) {
 +      // USER and SELECTOR compactions must eventually compact all files. When a subset of files
 +      // that meets the compaction ratio is selected, look ahead and see if the next compaction
 +      // would also meet the compaction ratio. If not then compact everything to avoid doing
 +      // more than logarithmic work across multiple comapctions.
 +
 +      var group = compactionJobs.get(0);
 +      var candidatesCopy = new HashSet<>(params.getCandidates());
 +
 +      candidatesCopy.removeAll(group);
 +      Preconditions.checkState(candidatesCopy.add(getExpected(group, fakeFileGenerator)));
 +
 +      if (findDataFilesToCompact(candidatesCopy, params.getRatio(), maxFilesToCompact,
 +          maxSizeToCompact).isEmpty()) {
 +        // The next possible compaction does not meet the compaction ratio, so compact
 +        // everything.
 +        compactionJobs.set(0, Set.copyOf(params.getCandidates()));
        }
 -    } else {
 -      group = Set.of();
      }
  
-     if (compactionJobs.isEmpty()
-         && (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR)
-         && params.getRunningCompactions().stream()
-             .noneMatch(job -> job.getKind() == params.getKind())) {
-       // These kinds of compaction require files to compact even if none of the files meet the
-       // compaction ratio. No files were found using the compaction ratio and no compactions are
-       // running, so force a compaction.
-       compactionJobs = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact);
 -    if (group.isEmpty()) {
++    if (compactionJobs.isEmpty()) {
+       if ((params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR)
+           && params.getRunningCompactions().stream()
+               .noneMatch(job -> job.getKind() == params.getKind())) {
 -        group = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact);
++        // These kinds of compaction require files to compact even if none of the files meet the
++        // compaction ratio. No files were found using the compaction ratio and no compactions are
++        // running, so force a compaction.
++        compactionJobs = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact);
+       } else if (params.getKind() == CompactionKind.SYSTEM
+           && params.getRunningCompactions().isEmpty()
+           && params.getAll().size() == params.getCandidates().size()) {
+         int maxTabletFiles =
+             getMaxTabletFiles(params.getServiceEnvironment().getConfiguration(params.getTableId()));
+         if (params.getAll().size() > maxTabletFiles) {
+           // The tablet is above its max files, there are no compactions running, all files are
+           // candidates for a system compaction, and no files were found to compact. Attempt to
+           // find a set of files to compact by lowering the compaction ratio.
 -          group = findFilesToCompactWithLowerRatio(params, maxSizeToCompact, maxTabletFiles);
++          compactionJobs =
++              findFilesToCompactWithLowerRatio(params, maxSizeToCompact, maxTabletFiles);
+         }
+       }
      }
  
 -    if (group.isEmpty()) {
 -      return params.createPlanBuilder().build();
 -    } else {
 -      // determine which executor to use based on the size of the files
 -      var ceid = getExecutor(group);
 -
 -      return params.createPlanBuilder().addJob(createPriority(params, group), ceid, group).build();
 -    }
 +    var builder = params.createPlanBuilder();
 +    compactionJobs.forEach(jobFiles -> builder.addJob(createPriority(params, jobFiles),
 +        getExecutor(jobFiles), jobFiles));
 +    return builder.build();
    }
  
+   static int getMaxTabletFiles(ServiceEnvironment.Configuration configuration) {
+     int maxTabletFiles = Integer.parseInt(configuration.get(Property.TABLE_FILE_MAX.getKey()));
+     if (maxTabletFiles <= 0) {
+       maxTabletFiles =
+           Integer.parseInt(configuration.get(Property.TSERV_SCAN_MAX_OPENFILES.getKey())) - 1;
+     }
+     return maxTabletFiles;
+   }
+ 
+   /**
+    * Searches for the highest compaction ratio that is less than the configured ratio that will
+    * lower the number of files.
+    */
 -  private Collection<CompactableFile> findFilesToCompactWithLowerRatio(PlanningParameters params,
 -      long maxSizeToCompact, int maxTabletFiles) {
++  private List<Collection<CompactableFile>> findFilesToCompactWithLowerRatio(
++      PlanningParameters params, long maxSizeToCompact, int maxTabletFiles) {
+     double lowRatio = 1.0;
+     double highRatio = params.getRatio();
+ 
+     Preconditions.checkArgument(highRatio >= lowRatio);
+ 
+     var candidates = Set.copyOf(params.getCandidates());
+     Collection<CompactableFile> found = Set.of();
+ 
+     int goalCompactionSize = candidates.size() - maxTabletFiles + 1;
+     if (goalCompactionSize > maxFilesToCompact) {
+       // The tablet is way over max tablet files, so multiple compactions will be needed. Therefore,
+       // do not set a goal size for this compaction and find the largest compaction ratio that will
+       // compact some set of files.
+       goalCompactionSize = 0;
+     }
+ 
+     // Do a binary search of the compaction ratios.
+     while (highRatio - lowRatio > .1) {
+       double ratioToCheck = (highRatio - lowRatio) / 2 + lowRatio;
+ 
+       // This is continually resorting the list of files in the following call, could optimize this
+       var filesToCompact =
+           findDataFilesToCompact(candidates, ratioToCheck, maxFilesToCompact, maxSizeToCompact);
+ 
 -      log.trace("Tried ratio {} and found {} {} {}", ratioToCheck, filesToCompact,
 -          filesToCompact.size() >= goalCompactionSize, goalCompactionSize);
++      log.info("Tried ratio {} and found {} {} {} {}", ratioToCheck, filesToCompact,
++          filesToCompact.size() >= goalCompactionSize, goalCompactionSize, maxFilesToCompact);
+ 
+       if (filesToCompact.isEmpty() || filesToCompact.size() < goalCompactionSize) {
+         highRatio = ratioToCheck;
+       } else {
+         lowRatio = ratioToCheck;
+         found = filesToCompact;
+       }
+     }
+ 
+     if (found.isEmpty() && lowRatio == 1.0) {
+       // in this case the data must be really skewed, operator intervention may be needed.
+       log.warn(
+           "Attempted to lower compaction ration from {} to {} for {} because there are {} files "
+               + "and the max tablet files is {}, however no set of files to compact were found.",
+           params.getRatio(), highRatio, params.getTableId(), params.getCandidates().size(),
+           maxTabletFiles);
+     }
+ 
+     log.info(
+         "For {} found {} files to compact lowering compaction ratio from {} to {} because the tablet "
+             + "exceeded {} files, it had {}",
+         params.getTableId(), found.size(), params.getRatio(), lowRatio, maxTabletFiles,
+         params.getCandidates().size());
+ 
 -    return found;
++    if (found.isEmpty()) {
++      return List.of();
++    } else {
++      return List.of(found);
++    }
+   }
+ 
    private static short createPriority(PlanningParameters params,
        Collection<CompactableFile> group) {
      return CompactionJobPrioritizer.createPriority(params.getKind(), params.getAll().size(),
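
The heart of findFilesToCompactWithLowerRatio above is a binary search over compaction ratios, looking for the highest ratio at or below the configured one that still yields a qualifying set of files. A condensed, generic sketch of that search (hypothetical names; findFiles stands in for findDataFilesToCompact with its other parameters fixed):

    import java.util.Set;
    import java.util.function.DoubleFunction;

    class RatioSearchSketch {
      static <T> Set<T> lowerRatioSearch(double configuredRatio, int goalSize,
          DoubleFunction<Set<T>> findFiles) {
        double lowRatio = 1.0;
        double highRatio = configuredRatio;
        Set<T> found = Set.of();

        while (highRatio - lowRatio > 0.1) {
          double ratioToCheck = (highRatio - lowRatio) / 2 + lowRatio;
          Set<T> files = findFiles.apply(ratioToCheck);
          if (files.isEmpty() || files.size() < goalSize) {
            // Too strict: this ratio selects too little, so search lower ratios.
            highRatio = ratioToCheck;
          } else {
            // Works: remember it, then see if an even higher ratio also works.
            lowRatio = ratioToCheck;
            found = files;
          }
        }
        return found;
      }
    }

The real implementation additionally drops the goal size to zero when it exceeds maxFilesToCompact (so badly overloaded tablets compact in multiple passes) and warns when even a ratio of 1.0 finds nothing to compact.
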
diff --cc core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
index d2530ce1f3,c2b086ee34..1e02bd2620
--- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
@@@ -22,9 -22,10 +22,10 @@@ import static org.junit.jupiter.api.Ass
  
  import java.util.HashSet;
  import java.util.Set;
+ import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.fate.AgeOffStore.TimeSource;
 -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
  import org.apache.zookeeper.KeeperException;
  import org.junit.jupiter.api.Test;
  
@@@ -50,25 -51,25 +51,25 @@@ public class AgeOffStoreTest 
      aoStore.ageOff();
  
      long txid1 = aoStore.create();
 -    aoStore.reserve(txid1);
 -    aoStore.setStatus(txid1, TStatus.IN_PROGRESS);
 -    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 +    var txStore1 = aoStore.reserve(txid1);
 +    txStore1.setStatus(TStatus.IN_PROGRESS);
-     txStore1.unreserve(0);
++    txStore1.unreserve(0, TimeUnit.MILLISECONDS);
  
      aoStore.ageOff();
  
      long txid2 = aoStore.create();
 -    aoStore.reserve(txid2);
 -    aoStore.setStatus(txid2, TStatus.IN_PROGRESS);
 -    aoStore.setStatus(txid2, TStatus.FAILED);
 -    aoStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);
 +    var txStore2 = aoStore.reserve(txid2);
 +    txStore2.setStatus(TStatus.IN_PROGRESS);
 +    txStore2.setStatus(TStatus.FAILED);
-     txStore2.unreserve(0);
++    txStore2.unreserve(0, TimeUnit.MILLISECONDS);
  
      tts.time = 6;
  
      long txid3 = aoStore.create();
 -    aoStore.reserve(txid3);
 -    aoStore.setStatus(txid3, TStatus.IN_PROGRESS);
 -    aoStore.setStatus(txid3, TStatus.SUCCESSFUL);
 -    aoStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);
 +    var txStore3 = aoStore.reserve(txid3);
 +    txStore3.setStatus(TStatus.IN_PROGRESS);
 +    txStore3.setStatus(TStatus.SUCCESSFUL);
-     txStore3.unreserve(0);
++    txStore3.unreserve(0, TimeUnit.MILLISECONDS);
  
      Long txid4 = aoStore.create();
  
@@@ -99,21 -100,21 +100,21 @@@
      TestTimeSource tts = new TestTimeSource();
      TestStore testStore = new TestStore();
      long txid1 = testStore.create();
 -    testStore.reserve(txid1);
 -    testStore.setStatus(txid1, TStatus.IN_PROGRESS);
 -    testStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 +    var txStore1 = testStore.reserve(txid1);
 +    txStore1.setStatus(TStatus.IN_PROGRESS);
-     txStore1.unreserve(0);
++    txStore1.unreserve(0, TimeUnit.MILLISECONDS);
  
      long txid2 = testStore.create();
 -    testStore.reserve(txid2);
 -    testStore.setStatus(txid2, TStatus.IN_PROGRESS);
 -    testStore.setStatus(txid2, TStatus.FAILED);
 -    testStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);
 +    var txStore2 = testStore.reserve(txid2);
 +    txStore2.setStatus(TStatus.IN_PROGRESS);
 +    txStore2.setStatus(TStatus.FAILED);
-     txStore2.unreserve(0);
++    txStore2.unreserve(0, TimeUnit.MILLISECONDS);
  
      long txid3 = testStore.create();
 -    testStore.reserve(txid3);
 -    testStore.setStatus(txid3, TStatus.IN_PROGRESS);
 -    testStore.setStatus(txid3, TStatus.SUCCESSFUL);
 -    testStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);
 +    var txStore3 = testStore.reserve(txid3);
 +    txStore3.setStatus(TStatus.IN_PROGRESS);
 +    txStore3.setStatus(TStatus.SUCCESSFUL);
-     txStore3.unreserve(0);
++    txStore3.unreserve(0, TimeUnit.MILLISECONDS);
  
      Long txid4 = testStore.create();
  
@@@ -134,9 -135,9 +135,9 @@@
      assertEquals(Set.of(txid1), new HashSet<>(aoStore.list()));
      assertEquals(1, new HashSet<>(aoStore.list()).size());
  
 -    aoStore.reserve(txid1);
 -    aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS);
 -    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 +    txStore1 = aoStore.reserve(txid1);
 +    txStore1.setStatus(TStatus.FAILED_IN_PROGRESS);
-     txStore1.unreserve(0);
++    txStore1.unreserve(0, TimeUnit.MILLISECONDS);
  
      tts.time = 30;
  
@@@ -145,9 -146,9 +146,9 @@@
      assertEquals(Set.of(txid1), new HashSet<>(aoStore.list()));
      assertEquals(1, new HashSet<>(aoStore.list()).size());
  
 -    aoStore.reserve(txid1);
 -    aoStore.setStatus(txid1, TStatus.FAILED);
 -    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
 +    txStore1 = aoStore.reserve(txid1);
 +    txStore1.setStatus(TStatus.FAILED);
-     txStore1.unreserve(0);
++    txStore1.unreserve(0, TimeUnit.MILLISECONDS);
  
      aoStore.ageOff();
  
diff --cc core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 058b0c50a4,3253c41a90..1dabfcf697
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@@ -18,17 -18,13 +18,18 @@@
   */
  package org.apache.accumulo.core.fate;
  
 +import java.io.Serializable;
  import java.util.ArrayList;
 +import java.util.EnumSet;
  import java.util.HashMap;
  import java.util.HashSet;
 +import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
 +import java.util.Optional;
  import java.util.Set;
+ import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
  
  /**
   * Transient in memory store for transactions.
@@@ -66,97 -61,35 +67,97 @@@ public class TestStore implements FateS
      }
    }
  
 -  @Override
 -  public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
 -    if (!reserved.remove(tid)) {
 -      throw new IllegalStateException();
 +  private class TestFateTxStore implements FateTxStore<String> {
 +
 +    private final long tid;
 +
 +    TestFateTxStore(long tid) {
 +      this.tid = tid;
      }
 -  }
  
 -  @Override
 -  public org.apache.accumulo.core.fate.TStore.TStatus getStatus(long tid) {
 -    if (!reserved.contains(tid)) {
 -      throw new IllegalStateException();
 +    @Override
 +    public Repo<String> top() {
 +      throw new UnsupportedOperationException();
      }
  
 -    TStatus status = statuses.get(tid);
 -    if (status == null) {
 -      return TStatus.UNKNOWN;
 +    @Override
 +    public List<ReadOnlyRepo<String>> getStack() {
 +      throw new UnsupportedOperationException();
      }
 -    return status;
 -  }
  
 -  @Override
 -  public void setStatus(long tid, org.apache.accumulo.core.fate.TStore.TStatus status) {
 -    if (!reserved.contains(tid)) {
 -      throw new IllegalStateException();
 +    @Override
 +    public TStatus getStatus() {
 +      if (!reserved.contains(tid)) {
 +        throw new IllegalStateException();
 +      }
 +
 +      TStatus status = statuses.get(tid);
 +      if (status == null) {
 +        return TStatus.UNKNOWN;
 +      }
 +      return status;
 +    }
 +
 +    @Override
 +    public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public Serializable getTransactionInfo(Fate.TxInfo txInfo) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public long timeCreated() {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public long getID() {
 +      return tid;
 +    }
 +
 +    @Override
 +    public void push(Repo<String> repo) throws StackOverflowException {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void pop() {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void setStatus(TStatus status) {
 +      if (!reserved.contains(tid)) {
 +        throw new IllegalStateException();
 +      }
 +      if (!statuses.containsKey(tid)) {
 +        throw new IllegalStateException();
 +      }
 +      statuses.put(tid, status);
 +    }
 +
 +    @Override
 +    public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) {
 +      throw new UnsupportedOperationException();
      }
 -    if (!statuses.containsKey(tid)) {
 -      throw new IllegalStateException();
 +
 +    @Override
 +    public void delete() {
 +      if (!reserved.contains(tid)) {
 +        throw new IllegalStateException();
 +      }
 +      statuses.remove(tid);
 +    }
 +
 +    @Override
-     public void unreserve(long deferTime) {
++    public void unreserve(long deferTime, TimeUnit timeUnit) {
 +      if (!reserved.remove(tid)) {
 +        throw new IllegalStateException();
 +      }
      }
 -    statuses.put(tid, status);
    }
  
    @Override
diff --cc core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index 6447a2d147,9f4f9d315c..7886a5f0c6
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@@ -27,14 -26,15 +27,16 @@@ import static org.junit.jupiter.api.Ass
  
  import java.net.URI;
  import java.net.URISyntaxException;
+ import java.util.ArrayList;
  import java.util.Collection;
 +import java.util.Collections;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
+ import java.util.Optional;
  import java.util.Set;
 -import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
  
  import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
  import org.apache.accumulo.core.conf.ConfigurationCopy;
@@@ -600,6 -446,144 +605,146 @@@ public class DefaultCompactionPlannerTe
      assertTrue(e.getMessage().contains("maxSize"), "Error message didn't contain maxSize");
    }
  
+   // Test cases where a tablet has more than table.file.max files, but no files were found using the
+   // compaction ratio. The planner should try to find the highest ratio that will result in a
+   // compaction.
+   @Test
+   public void testMaxTabletFiles() throws Exception {
+     String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
+         + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
+         + "{'name':'large','type': 'internal','numThreads':3}]";
+ 
+     Map<String,String> overrides = new HashMap<>();
+     overrides.put(Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "10");
+     overrides.put(Property.TABLE_FILE_MAX.getKey(), "7");
+     var conf = new ConfigurationImpl(SiteConfiguration.empty().withOverrides(overrides).build());
+ 
+     // For this case need to compact three files and the highest ratio that achieves that is 1.8
+     var planner = createPlanner(conf, executors);
+     var all = createCFs(1000, 1.1, 1.9, 1.8, 1.6, 1.3, 1.4, 1.3, 1.2, 1.1);
+     var params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     var plan = planner.makePlan(params);
+     var job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 1.1, 1.9, 1.8), job.getFiles());
+ 
+     // For this case need to compact two files and the highest ratio that achieves that is 2.9
+     all = createCFs(1000, 2, 2.9, 2.8, 2.7, 2.6, 2.5, 2.4, 2.3);
+     params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 2, 2.9), job.getFiles());
+ 
+     all =
+         createCFs(1000, 1.1, 2.89, 2.85, 2.7, 2.3, 2.9, 2.8, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2);
+     params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 1.1, 2.89, 2.85, 2.7, 2.3, 2.9), job.getFiles());
+ 
+     all = createCFs(1000, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 1.1);
+     params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9), job.getFiles());
+ 
 +     // In this case the tablet cannot be brought below the max files limit in a single compaction,
+     // so it should find the highest ratio to compact
+     for (var ratio : List.of(1.9, 2.0, 3.0, 4.0)) {
+       all = createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.5, 1.2, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1,
+           1.1, 1.1);
+       params = createPlanningParams(all, all, Set.of(), ratio, CompactionKind.SYSTEM, conf);
+       plan = planner.makePlan(params);
+       job = getOnlyElement(plan.getJobs());
+       assertEquals(createCFs(1000, 1.9), job.getFiles());
+     }
+ 
 +     // In this case the tablet can be brought below the max limit in a single compaction, so it should
+     // find this
+     all =
+         createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.5, 1.2, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1);
+     params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.5, 1.2, 1.1), job.getFiles());
+ 
 +     // each file is 10x the size of the next smaller file
+     all = createCFs(10, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1);
+     params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(10, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1), job.getFiles());
+ 
+     // test with some files growing 20x, ensure those are not included
+     for (var ratio : List.of(1.9, 2.0, 3.0, 4.0)) {
+       all = createCFs(10, 1.05, 1.05, 1.25, 1.75, 1.25, 1.05, 1.05, 1.05);
+       params = createPlanningParams(all, all, Set.of(), ratio, CompactionKind.SYSTEM, conf);
+       plan = planner.makePlan(params);
+       job = getOnlyElement(plan.getJobs());
+       assertEquals(createCFs(10, 1.05, 1.05, 1.25, 1.75), job.getFiles());
+     }
+ 
+   }
+ 
+   @Test
+   public void testMaxTabletFilesNoCompaction() throws Exception {
+     String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
+         + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
+         + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}]";
+ 
+     Map<String,String> overrides = new HashMap<>();
+     overrides.put(Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "10");
+     overrides.put(Property.TABLE_FILE_MAX.getKey(), "7");
+     var conf = new ConfigurationImpl(SiteConfiguration.empty().withOverrides(overrides).build());
+ 
 +     // ensure that when a compaction would be over the max size limit, it is not planned
+     var planner = createPlanner(conf, executors);
+     var all = createCFs(1_000_000_000, 2, 2, 2, 2, 2, 2, 2);
+     var params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     var plan = planner.makePlan(params);
+ 
++    System.out.println(plan.getJobs());
++
+     assertTrue(plan.getJobs().isEmpty());
+ 
 +     // ensure that when a compaction is already running and we are over the file max but below the
 +     // compaction ratio, another compaction is not planned
+     all = createCFs(1_000, 2, 2, 2, 2, 2, 2, 2);
+     var job = new CompactionJobImpl((short) 1, CompactionExecutorIdImpl.externalId("ee1"),
+         createCFs("F1", "1000"), CompactionKind.SYSTEM, Optional.of(false));
+     params = createPlanningParams(all, all, Set.of(job), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+ 
+     assertTrue(plan.getJobs().isEmpty());
+ 
 +     // a really bad situation: each file is 20 times the size of the next smaller file. The
 +     // algorithm does not search for ratios that low.
+     all = createCFs(10, 1.05, 1.05, 1.05, 1.05, 1.05, 1.05, 1.05, 1.05);
+     params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     plan = planner.makePlan(params);
+     assertTrue(plan.getJobs().isEmpty());
+   }
+ 
 +   // Test to ensure that the plugin falls back from TABLE_FILE_MAX to TSERV_SCAN_MAX_OPENFILES
+   @Test
+   public void testMaxTableFilesFallback() throws Exception {
+     String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1},"
+         + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2},"
+         + "{'name':'large','type': 'internal','numThreads':3}]";
+ 
+     Map<String,String> overrides = new HashMap<>();
+     overrides.put(Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "10");
+     overrides.put(Property.TABLE_FILE_MAX.getKey(), "0");
+     overrides.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "5");
+     var conf = new ConfigurationImpl(SiteConfiguration.empty().withOverrides(overrides).build());
+ 
+     var planner = createPlanner(conf, executors);
+     var all = createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.3, 1.2, 1.1);
+     var params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
+     var plan = planner.makePlan(params);
+     var job = getOnlyElement(plan.getJobs());
+     assertEquals(createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4), job.getFiles());
+   }
+ 
    private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> all,
        Set<CompactableFile> files) {
      return new CompactionPlanImpl.BuilderImpl(kind, all, all)
@@@ -607,16 -591,46 +752,53 @@@
          .build().getJobs().iterator().next();
    }
  
+   // Create a set of files whose sizes would require certain compaction ratios to compact
 -  private Set<CompactableFile> createCFs(int initialSize, double... desiredRatios)
 -      throws URISyntaxException {
++  private Set<CompactableFile> createCFs(int initialSize, double... desiredRatios) {
+     List<String> pairs = new ArrayList<>();
+     pairs.add("F1");
+     pairs.add(initialSize + "");
+ 
+     double previousFileSizes = initialSize;
+ 
+     int i = 2;
+     for (double desiredRatio : desiredRatios) {
+       Preconditions.checkArgument(desiredRatio > 1.0);
+       Preconditions.checkArgument(desiredRatio <= i);
+ 
+       /*
+        * The compaction ratio formula is fileSize * ratio < fileSize + previousFileSizes. Solved the
+        * following equation to compute a file size given a desired ratio.
+        *
+        * fileSize * ratio = fileSize + previousFileSizes
+        *
+        * fileSize * ratio - fileSize = previousFileSizes
+        *
+        * fileSize * (ratio - 1) = previousFileSizes
+        *
+        * fileSize = previousFileSizes / (ratio - 1)
+        */
+ 
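 +       // For example, with previousFileSizes = 1000 and desiredRatio = 2.0 this yields
 +       // fileSize = 1000 / (2.0 - 1) = 1000, i.e. the new file is the same size as all of the
 +       // smaller files combined, so createCFs(1000, 2) produces two files of size 1000.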
+       double fileSize = previousFileSizes / (desiredRatio - 1);
+       pairs.add("F" + i + "_" + desiredRatio);
+       pairs.add(Math.round(fileSize) + "");
+ 
+       previousFileSizes += fileSize;
+       i++;
+     }
+ 
+     return createCFs(pairs.toArray(new String[0]));
+   }
+ 
 -  private static Set<CompactableFile> createCFs(String... namesSizePairs)
 -      throws URISyntaxException {
 +  private static CompactableFile createCF(String name, long size) {
 +    try {
 +      return CompactableFile
 +          .create(new URI("hdfs://fake/accumulo/tables/1/t-0000000z/" + name + ".rf"), size, 0);
 +    } catch (URISyntaxException e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  private static Set<CompactableFile> createCFs(String... namesSizePairs) {
      Set<CompactableFile> files = new HashSet<>();
  
      for (int i = 0; i < namesSizePairs.length; i += 2) {
@@@ -721,9 -750,13 +913,15 @@@
    private static CompactionPlanner.InitParameters getInitParams(Configuration conf,
        String executors) {
  
+     String maxOpen = conf.get(prefix + "cs1.planner.opts.maxOpen");
      Map<String,String> options = new HashMap<>();
      options.put("executors", executors.replaceAll("'", "\""));
-     options.put("maxOpen", "15");
+ 
+     if (maxOpen != null) {
+       options.put("maxOpen", maxOpen);
++    } else {
++      options.put("maxOpen", "15");
+     }
  
      ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class);
      EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes();
diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
index 7978d3c2e5,0000000000..fdcb93732f
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java
@@@ -1,331 -1,0 +1,337 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.compaction;
 +
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.function.Predicate;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.classloader.ClassLoaderUtil;
 +import org.apache.accumulo.core.client.PluginEnvironment;
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
 +import org.apache.accumulo.core.client.admin.PluginConfig;
 +import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 +import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
 +import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
 +import org.apache.accumulo.core.client.rfile.RFileSource;
 +import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 +import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 +import org.apache.accumulo.core.client.summary.Summary;
 +import org.apache.accumulo.core.clientImpl.UserCompactionUtils;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.TabletId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.dataImpl.TabletIdImpl;
 +import org.apache.accumulo.core.file.FileOperations;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 +import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 +import org.apache.accumulo.core.spi.compaction.CompactionDispatcher;
 +import org.apache.accumulo.core.summary.Gatherer;
 +import org.apache.accumulo.core.summary.SummarizerFactory;
 +import org.apache.accumulo.core.summary.SummaryCollection;
 +import org.apache.accumulo.core.summary.SummaryReader;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.ServiceEnvironmentImpl;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Collections2;
 +
 +public class CompactionPluginUtils {
 +
 +  private static final Logger log = LoggerFactory.getLogger(CompactionPluginUtils.class);
 +
 +  private static <T> T newInstance(AccumuloConfiguration tableConfig, String className,
 +      Class<T> baseClass) {
 +    String context = ClassLoaderUtil.tableContext(tableConfig);
 +    try {
 +      return ConfigurationTypeHelper.getClassInstance(context, className, baseClass);
 +    } catch (ReflectiveOperationException e) {
 +      throw new IllegalArgumentException(e);
 +    }
 +  }
 +
 +  public static Set<StoredTabletFile> selectFiles(ServerContext context, KeyExtent extent,
 +      CompactionConfig compactionConfig, Map<StoredTabletFile,DataFileValue> allFiles) {
 +    if (!UserCompactionUtils.isDefault(compactionConfig.getSelector())) {
 +      return selectFiles(context, extent, allFiles, compactionConfig.getSelector());
 +    } else {
 +      return allFiles.keySet();
 +    }
 +  }
 +
 +  private static Set<StoredTabletFile> selectFiles(ServerContext context, KeyExtent extent,
 +      Map<StoredTabletFile,DataFileValue> datafiles, PluginConfig selectorConfig) {
 +
 +    log.debug("Selecting files for {} using {}", extent, selectorConfig);
 +
 +    CompactionSelector selector = newInstance(context.getTableConfiguration(extent.tableId()),
 +        selectorConfig.getClassName(), CompactionSelector.class);
 +
 +    final ServiceEnvironment senv = new ServiceEnvironmentImpl(context);
 +
 +    selector.init(new CompactionSelector.InitParameters() {
 +      @Override
 +      public Map<String,String> getOptions() {
 +        return selectorConfig.getOptions();
 +      }
 +
 +      @Override
 +      public PluginEnvironment getEnvironment() {
 +        return senv;
 +      }
 +
 +      @Override
 +      public TableId getTableId() {
 +        return extent.tableId();
 +      }
 +    });
 +
 +    CompactionSelector.Selection selection =
 +        selector.select(new CompactionSelector.SelectionParameters() {
 +          @Override
 +          public PluginEnvironment getEnvironment() {
 +            return senv;
 +          }
 +
 +          @Override
 +          public Collection<CompactableFile> getAvailableFiles() {
 +            return Collections2.transform(datafiles.entrySet(),
 +                e -> new CompactableFileImpl(e.getKey(), e.getValue()));
 +          }
 +
 +          @Override
 +          public Collection<Summary> getSummaries(Collection<CompactableFile> files,
 +              Predicate<SummarizerConfiguration> summarySelector) {
 +
 +            // ELASTICITY_TODO this may open files for user tables in the manager, need to avoid
 +            // this. See #3526
 +
 +            try {
 +              var tableConf = context.getTableConfiguration(extent.tableId());
 +
 +              SummaryCollection sc = new SummaryCollection();
 +              SummarizerFactory factory = new SummarizerFactory(tableConf);
 +              for (CompactableFile cf : files) {
 +                var file = CompactableFileImpl.toStoredTabletFile(cf);
 +                FileSystem fs = context.getVolumeManager().getFileSystemByPath(file.getPath());
 +                Configuration conf = context.getHadoopConf();
 +                RFileSource source = new RFileSource(new FSDataInputStream(fs.open(file.getPath())),
 +                    fs.getFileStatus(file.getPath()).getLen(), file.getRange());
 +
 +                SummaryCollection fsc = SummaryReader
 +                    .load(conf, source, file.getFileName(), summarySelector, factory,
 +                        tableConf.getCryptoService())
 +                    .getSummaries(Collections.singletonList(new Gatherer.RowRange(extent)));
 +
 +                sc.merge(fsc, factory);
 +              }
 +              return sc.getSummaries();
 +            } catch (IOException ioe) {
 +              throw new UncheckedIOException(ioe);
 +            }
 +          }
 +
 +          @Override
 +          public TableId getTableId() {
 +            return extent.tableId();
 +          }
 +
 +          @Override
 +          public TabletId getTabletId() {
 +            return new TabletIdImpl(extent);
 +          }
 +
 +          @Override
 +          public Optional<SortedKeyValueIterator<Key,Value>> getSample(CompactableFile cf,
 +              SamplerConfiguration sc) {
 +
 +            // ELASTICITY_TODO this may open files for user tables in the manager, need to avoid
 +            // this. See #3526
 +
 +            try {
 +              var file = CompactableFileImpl.toStoredTabletFile(cf);
 +              FileSystem fs = context.getVolumeManager().getFileSystemByPath(file.getPath());
 +              Configuration conf = context.getHadoopConf();
 +              var tableConf = context.getTableConfiguration(extent.tableId());
 +              var iter = FileOperations.getInstance().newReaderBuilder()
 +                  .forFile(file, fs, conf, tableConf.getCryptoService())
 +                  .withTableConfiguration(tableConf).seekToBeginning().build();
 +              var sampleIter = iter.getSample(new SamplerConfigurationImpl(sc));
 +              if (sampleIter == null) {
 +                iter.close();
 +                return Optional.empty();
 +              }
 +
 +              return Optional.of(sampleIter);
 +            } catch (IOException ioe) {
 +              throw new UncheckedIOException(ioe);
 +            }
 +          }
 +        });
 +
 +    return selection.getFilesToCompact().stream().map(CompactableFileImpl::toStoredTabletFile)
 +        .collect(Collectors.toSet());
 +  }
 +
 +  public static Map<String,String> computeOverrides(Optional<CompactionConfig> compactionConfig,
-       ServerContext context, KeyExtent extent, Set<CompactableFile> files) {
++      ServerContext context, KeyExtent extent, Set<CompactableFile> inputFiles,
++      Set<CompactableFile> selectedFiles) {
 +
 +    if (compactionConfig.isPresent()
 +        && !UserCompactionUtils.isDefault(compactionConfig.orElseThrow().getConfigurer())) {
-       return CompactionPluginUtils.computeOverrides(context, extent, files,
++      return CompactionPluginUtils.computeOverrides(context, extent, inputFiles, selectedFiles,
 +          compactionConfig.orElseThrow().getConfigurer());
 +    }
 +
 +    var tableConf = context.getTableConfiguration(extent.tableId());
 +
 +    var configurorClass = tableConf.get(Property.TABLE_COMPACTION_CONFIGURER);
 +    if (configurorClass == null || configurorClass.isBlank()) {
 +      return Map.of();
 +    }
 +
 +    var opts =
 +        tableConf.getAllPropertiesWithPrefixStripped(Property.TABLE_COMPACTION_CONFIGURER_OPTS);
 +
-     return CompactionPluginUtils.computeOverrides(context, extent, files,
++    return CompactionPluginUtils.computeOverrides(context, extent, inputFiles, selectedFiles,
 +        new PluginConfig(configurorClass, opts));
 +  }
 +
 +  public static Map<String,String> computeOverrides(ServerContext context, KeyExtent extent,
-       Set<CompactableFile> files, PluginConfig cfg) {
++      Set<CompactableFile> inputFiles, Set<CompactableFile> selectedFiles, PluginConfig cfg) {
 +
 +    CompactionConfigurer configurer = newInstance(context.getTableConfiguration(extent.tableId()),
 +        cfg.getClassName(), CompactionConfigurer.class);
 +
 +    final ServiceEnvironment senv = new ServiceEnvironmentImpl(context);
 +
 +    configurer.init(new CompactionConfigurer.InitParameters() {
 +      @Override
 +      public Map<String,String> getOptions() {
 +        return cfg.getOptions();
 +      }
 +
 +      @Override
 +      public PluginEnvironment getEnvironment() {
 +        return senv;
 +      }
 +
 +      @Override
 +      public TableId getTableId() {
 +        return extent.tableId();
 +      }
 +    });
 +
 +    var overrides = configurer.override(new CompactionConfigurer.InputParameters() {
 +      @Override
 +      public Collection<CompactableFile> getInputFiles() {
-         return files;
++        return inputFiles;
++      }
++
++      @Override
++      public Set<CompactableFile> getSelectedFiles() {
++        return selectedFiles;
 +      }
 +
 +      @Override
 +      public PluginEnvironment getEnvironment() {
 +        return senv;
 +      }
 +
 +      @Override
 +      public TableId getTableId() {
 +        return extent.tableId();
 +      }
 +
 +      @Override
 +      public TabletId getTabletId() {
 +        return new TabletIdImpl(extent);
 +      }
 +    });
 +
 +    if (overrides.getOverrides().isEmpty()) {
 +      return null;
 +    }
 +
 +    return overrides.getOverrides();
 +  }
 +
 +  static CompactionDispatcher createDispatcher(ServiceEnvironment env, TableId tableId) {
 +
 +    var conf = env.getConfiguration(tableId);
 +
 +    var className = conf.get(Property.TABLE_COMPACTION_DISPATCHER.getKey());
 +
 +    Map<String,String> opts = new HashMap<>();
 +
 +    conf.getWithPrefix(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey()).forEach((k, v) -> {
 +      opts.put(k.substring(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey().length()), v);
 +    });
 +
 +    var finalOpts = Collections.unmodifiableMap(opts);
 +
 +    CompactionDispatcher.InitParameters initParameters = new CompactionDispatcher.InitParameters() {
 +      @Override
 +      public Map<String,String> getOptions() {
 +        return finalOpts;
 +      }
 +
 +      @Override
 +      public TableId getTableId() {
 +        return tableId;
 +      }
 +
 +      @Override
 +      public ServiceEnvironment getServiceEnv() {
 +        return env;
 +      }
 +    };
 +
 +    CompactionDispatcher dispatcher = null;
 +    try {
 +      dispatcher = env.instantiate(tableId, className, CompactionDispatcher.class);
 +    } catch (ReflectiveOperationException e) {
 +      throw new RuntimeException(e);
 +    }
 +
 +    dispatcher.init(initParameters);
 +
 +    return dispatcher;
 +  }
 +}
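The getSelectedFiles() method added to InputParameters above lets a CompactionConfigurer tell an
intermediate compaction (one that compacts only part of the files selected for a user compaction)
from the final one; for system compactions the coordinator passes an empty set. Below is a minimal
sketch of a configurer using it, based only on the SPI visible in this diff and the existing
Overrides(Map) constructor (the class name, property, and codec are illustrative, not part of the
change):

    import java.util.Map;

    import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;

    public class IntermediateAwareConfigurer implements CompactionConfigurer {

      @Override
      public void init(InitParameters iparams) {}

      @Override
      public Overrides override(InputParameters params) {
        // For user compactions the coordinator passes the full set of selected files, and the
        // job's input files are a subset of them, so compacting fewer files than were selected
        // means this job is an intermediate compaction.
        boolean intermediate = !params.getSelectedFiles().isEmpty()
            && params.getInputFiles().size() < params.getSelectedFiles().size();
        if (intermediate) {
          // Illustrative override: use a cheaper codec until the final compaction is written.
          return new Overrides(Map.of("table.file.compress.type", "snappy"));
        }
        return new Overrides(Map.of());
      }
    }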
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 6d2a51c46f,0000000000..c03cb3241f
mode 100644,000000..100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@@ -1,1388 -1,0 +1,1399 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.compaction.coordinator;
 +
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS;
 +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED;
 +
 +import java.io.FileNotFoundException;
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.EnumMap;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ScheduledFuture;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.Consumer;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.CompactionConfig;
++import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
 +import org.apache.accumulo.core.clientImpl.thrift.TInfo;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionState;
 +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
 +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.NamespaceId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 +import org.apache.accumulo.core.fate.FateTxId;
 +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
 +import org.apache.accumulo.core.metadata.AbstractTabletFile;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.Ample.Refreshes.RefreshEntry;
 +import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
 +import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.metrics.MetricsProducer;
 +import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 +import org.apache.accumulo.core.spi.compaction.CompactionJob;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.core.tabletserver.thrift.InputFile;
 +import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
 +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
 +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService;
 +import org.apache.accumulo.core.util.Retry;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.util.cache.Caches.CacheName;
 +import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl;
 +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 +import org.apache.accumulo.core.util.compaction.RunningCompaction;
 +import org.apache.accumulo.core.util.threads.ThreadPools;
 +import org.apache.accumulo.core.util.threads.Threads;
 +import org.apache.accumulo.core.volume.Volume;
 +import org.apache.accumulo.manager.EventCoordinator;
 +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
 +import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 +import org.apache.accumulo.server.compaction.CompactionPluginUtils;
 +import org.apache.accumulo.server.manager.LiveTServerSet;
 +import org.apache.accumulo.server.security.SecurityOperation;
 +import org.apache.accumulo.server.tablets.TabletNameGenerator;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.thrift.TException;
 +import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.github.benmanes.caffeine.cache.Cache;
 +import com.github.benmanes.caffeine.cache.CacheLoader;
 +import com.github.benmanes.caffeine.cache.LoadingCache;
 +import com.github.benmanes.caffeine.cache.Weigher;
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Sets;
 +import com.google.common.net.HostAndPort;
 +import com.google.common.util.concurrent.MoreExecutors;
 +
 +import io.micrometer.core.instrument.Gauge;
 +import io.micrometer.core.instrument.MeterRegistry;
 +
 +public class CompactionCoordinator
 +    implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer {
 +
 +  private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
 +  private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);
 +
 +  /*
 +   * Map of compactionId to RunningCompactions. This is an informational cache of what external
 +   * compactions may be running. It may contain external compactions that are not actually
 +   * running, and it may not contain compactions that are actually running. The metadata table
 +   * is the most authoritative source of what external compactions are currently running, but it
 +   * does not have the stats that this map has.
 +   */
 +  protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE =
 +      new ConcurrentHashMap<>();
 +
 +  /*
 +   * When the manager starts up, any refreshes that were in progress when the last manager process
 +   * died must be completed before new refresh entries are written. This map of countdown latches
 +   * helps achieve that goal.
 +   */
 +  private final Map<Ample.DataLevel,CountDownLatch> refreshLatches;
 +
 +  /* Map of group name to the last time a compactor checked in to get a compaction job */
 +  // ELASTICITY_TODO need to clean out groups that are no longer configured..
 +  private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>();
 +
 +  private final ServerContext ctx;
 +  private final LiveTServerSet tserverSet;
 +  private final SecurityOperation security;
 +  private final CompactionJobQueues jobQueues;
 +  private final EventCoordinator eventCoordinator;
 +  // Exposed for tests
 +  protected volatile Boolean shutdown = false;
 +
 +  private final ScheduledThreadPoolExecutor schedExecutor;
 +
 +  private final Cache<ExternalCompactionId,RunningCompaction> completed;
 +  private LoadingCache<Long,CompactionConfig> compactionConfigCache;
 +  private final Cache<Path,Integer> checked_tablet_dir_cache;
 +  private final DeadCompactionDetector deadCompactionDetector;
 +
 +  private final QueueMetrics queueMetrics;
 +
 +  public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers,
 +      SecurityOperation security, EventCoordinator eventCoordinator) {
 +    this.ctx = ctx;
 +    this.tserverSet = tservers;
 +    this.schedExecutor = this.ctx.getScheduledExecutor();
 +    this.security = security;
 +    this.eventCoordinator = eventCoordinator;
 +
 +    this.jobQueues = new CompactionJobQueues(
 +        ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
 +
 +    this.queueMetrics = new QueueMetrics(jobQueues);
 +
 +    var refreshLatches = new EnumMap<Ample.DataLevel,CountDownLatch>(Ample.DataLevel.class);
 +    refreshLatches.put(Ample.DataLevel.ROOT, new CountDownLatch(1));
 +    refreshLatches.put(Ample.DataLevel.METADATA, new CountDownLatch(1));
 +    refreshLatches.put(Ample.DataLevel.USER, new CountDownLatch(1));
 +    this.refreshLatches = Collections.unmodifiableMap(refreshLatches);
 +
 +    completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true)
 +        .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build();
 +
 +    CacheLoader<Long,CompactionConfig> loader =
 +        txid -> CompactionConfigStorage.getConfig(ctx, txid);
 +
 +    // Keep a small, short-lived cache of compaction config. Compaction config never changes;
 +    // however, when a compaction is canceled its config is deleted, which is why there is a time
 +    // limit. It does not hurt to let a job that was canceled start, as it will be canceled later.
 +    // Caching this immutable config helps avoid reading the same data over and over.
 +    compactionConfigCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_CONFIGS, true)
 +        .expireAfterWrite(30, SECONDS).maximumSize(100).build(loader);
 +
 +    Weigher<Path,Integer> weigher = (path, count) -> {
 +      return path.toUri().toString().length();
 +    };
 +
 +    checked_tablet_dir_cache =
 +        ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true)
 +            .maximumWeight(10485760L).weigher(weigher).build();
 +
 +    deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, schedExecutor);
 +    // At this point the manager does not have its lock so no actions should be taken yet
 +  }
 +
 +  private volatile Thread serviceThread = null;
 +
 +  public void start() {
 +    serviceThread = Threads.createThread("CompactionCoordinator Thread", this);
 +    serviceThread.start();
 +  }
 +
 +  public void shutdown() {
 +    shutdown = true;
 +    var localThread = serviceThread;
 +    if (localThread != null) {
 +      try {
 +        localThread.join();
 +      } catch (InterruptedException e) {
 +        LOG.error("Exception stopping compaction coordinator thread", e);
 +      }
 +    }
 +  }
 +
 +  protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) {
 +    ScheduledFuture<?> future =
 +        schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES);
 +    ThreadPools.watchNonCriticalScheduledTask(future);
 +  }
 +
 +  private void processRefreshes(Ample.DataLevel dataLevel) {
 +    try (var refreshStream = ctx.getAmple().refreshes(dataLevel).stream()) {
 +      // process batches of refresh entries to avoid reading all into memory at once
 +      Iterators.partition(refreshStream.iterator(), 10000).forEachRemaining(refreshEntries -> {
 +        LOG.info("Processing {} tablet refreshes for {}", refreshEntries.size(), dataLevel);
 +
 +        var extents =
 +            refreshEntries.stream().map(RefreshEntry::getExtent).collect(Collectors.toList());
 +        var tabletsMeta = new HashMap<KeyExtent,TabletMetadata>();
 +        try (var tablets = ctx.getAmple().readTablets().forTablets(extents, Optional.empty())
 +            .fetch(PREV_ROW, LOCATION, SCANS).build()) {
 +          tablets.stream().forEach(tm -> tabletsMeta.put(tm.getExtent(), tm));
 +        }
 +
 +        var tserverRefreshes = new HashMap<TabletMetadata.Location,List<TKeyExtent>>();
 +
 +        refreshEntries.forEach(refreshEntry -> {
 +          var tm = tabletsMeta.get(refreshEntry.getExtent());
 +
 +          // only need to refresh if the tablet is still on the same tserver instance
 +          if (tm != null && tm.getLocation() != null
 +              && tm.getLocation().getServerInstance().equals(refreshEntry.getTserver())) {
 +            KeyExtent extent = tm.getExtent();
 +            Collection<StoredTabletFile> scanfiles = tm.getScans();
 +            var ttr = extent.toThrift();
 +            tserverRefreshes.computeIfAbsent(tm.getLocation(), k -> new ArrayList<>()).add(ttr);
 +          }
 +        });
 +
 +        String logId = "Coordinator:" + dataLevel;
 +        ThreadPoolExecutor threadPool =
 +            ctx.threadPools().createFixedThreadPool(10, "Tablet refresh " + logId, false);
 +        try {
 +          TabletRefresher.refreshTablets(threadPool, logId, ctx, tserverSet::getCurrentServers,
 +              tserverRefreshes);
 +        } finally {
 +          threadPool.shutdownNow();
 +        }
 +
 +        ctx.getAmple().refreshes(dataLevel).delete(refreshEntries);
 +      });
 +    }
 +    // allow new refreshes to be written now that all preexisting ones are processed
 +    refreshLatches.get(dataLevel).countDown();
 +  }
 +
 +  @Override
 +  public void run() {
 +
 +    processRefreshes(Ample.DataLevel.ROOT);
 +    processRefreshes(Ample.DataLevel.METADATA);
 +    processRefreshes(Ample.DataLevel.USER);
 +
 +    startCompactionCleaner(schedExecutor);
 +    startRunningCleaner(schedExecutor);
 +
 +    // On a re-start of the coordinator it's possible that external compactions are in-progress.
 +    // Attempt to get the running compactions on the compactors and then resolve which tserver
 +    // the external compaction came from to re-populate the RUNNING collection.
 +    LOG.info("Checking for running external compactions");
 +    // On re-start contact the running Compactors to try and seed the list of running compactions
 +    List<RunningCompaction> running = getCompactionsRunningOnCompactors();
 +    if (running.isEmpty()) {
 +      LOG.info("No running external compactions found");
 +    } else {
 +      LOG.info("Found {} running external compactions", running.size());
 +      running.forEach(rc -> {
 +        TCompactionStatusUpdate update = new TCompactionStatusUpdate();
 +        update.setState(TCompactionState.IN_PROGRESS);
 +        update.setMessage("Coordinator restarted, compaction found in progress");
 +        rc.addUpdate(System.currentTimeMillis(), update);
 +        RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc);
 +      });
 +    }
 +
 +    startDeadCompactionDetector();
 +
 +    // ELASTICITY_TODO the main function of the following loop was getting group summaries from
 +    // tservers. It's no longer doing that. It may be best to remove the loop and make the remaining
 +    // task a scheduled one.
 +
 +    LOG.info("Starting loop to check tservers for compaction summaries");
 +    while (!shutdown) {
 +      long start = System.currentTimeMillis();
 +
 +      long now = System.currentTimeMillis();
 +      TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> {
 +        if ((now - v) > getMissingCompactorWarningTime()) {
 +          // ELASTICITY_TODO may want to consider if the group has any jobs queued OR if the group
 +          // still exists in configuration
 +          LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", k,
 +              getMissingCompactorWarningTime());
 +        }
 +      });
 +
 +      long checkInterval = getTServerCheckInterval();
 +      long duration = (System.currentTimeMillis() - start);
 +      if (checkInterval - duration > 0) {
 +        LOG.debug("Waiting {}ms for next group check", (checkInterval - duration));
 +        UtilWaitThread.sleep(checkInterval - duration);
 +      }
 +    }
 +
 +    LOG.info("Shutting down");
 +  }
 +
 +  protected void startDeadCompactionDetector() {
 +    deadCompactionDetector.start();
 +  }
 +
 +  protected long getMissingCompactorWarningTime() {
 +    return FIFTEEN_MINUTES;
 +  }
 +
 +  protected long getTServerCheckInterval() {
 +    return this.ctx.getConfiguration()
 +        .getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL);
 +  }
 +
 +  public long getNumRunningCompactions() {
 +    return RUNNING_CACHE.size();
 +  }
 +
 +  /**
 +   * Return the next compaction job from the queue to a Compactor
 +   *
 +   * @param groupName group
 +   * @param compactorAddress compactor address
 +   * @throws ThriftSecurityException when permission error
 +   * @return compaction job
 +   */
 +  @Override
 +  public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials,
 +      String groupName, String compactorAddress, String externalCompactionId)
 +      throws ThriftSecurityException {
 +
 +    // do not expect users to call this directly, expect compactors to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    final String group = groupName.intern();
 +    LOG.trace("getCompactionJob called for group {} by compactor {}", group, compactorAddress);
 +    TIME_COMPACTOR_LAST_CHECKED.put(group, System.currentTimeMillis());
 +
 +    TExternalCompactionJob result = null;
 +
 +    CompactionJobQueues.MetaJob metaJob =
 +        jobQueues.poll(CompactionExecutorIdImpl.externalId(groupName));
 +
 +    while (metaJob != null) {
 +
 +      Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob);
 +
 +      // this method may reread the metadata, do not use the metadata in metaJob for anything after
 +      // this method
 +      ExternalCompactionMetadata ecm = null;
 +
 +      var kind = metaJob.getJob().getKind();
 +
 +      // Only reserve user compactions when the config is present. When compactions are canceled the
 +      // config is deleted.
 +      if (kind == CompactionKind.SYSTEM
 +          || (kind == CompactionKind.USER && compactionConfig.isPresent())) {
 +        ecm = reserveCompaction(metaJob, compactorAddress,
 +            ExternalCompactionId.from(externalCompactionId));
 +      }
 +
 +      if (ecm != null) {
 +        result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig);
 +        // It is possible that by the time this is added, the compactor that made this request
 +        // is dead. In this case the compaction is not actually running.
 +        RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
 +            new RunningCompaction(result, compactorAddress, group));
 +        LOG.debug("Returning external job {} to {} with {} files", result.externalCompactionId,
 +            compactorAddress, ecm.getJobFiles().size());
 +        break;
 +      } else {
 +        LOG.debug("Unable to reserve compaction job for {}, pulling another off the queue ",
 +            metaJob.getTabletMetadata().getExtent());
 +        metaJob = jobQueues.poll(CompactionExecutorIdImpl.externalId(groupName));
 +      }
 +    }
 +
 +    if (metaJob == null) {
 +      LOG.debug("No jobs found in group {} ", group);
 +    }
 +
 +    if (result == null) {
 +      LOG.trace("No jobs found for group {}, returning empty job to compactor {}", group,
 +          compactorAddress);
 +      result = new TExternalCompactionJob();
 +    }
 +
 +    return result;
 +
 +  }
 +
 +  // ELASTICITY_TODO unit test this code
 +  private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob job,
 +      Set<StoredTabletFile> jobFiles) {
 +
 +    if (tablet == null) {
 +      // the tablet no longer exists
 +      return false;
 +    }
 +
 +    if (tablet.getOperationId() != null) {
 +      return false;
 +    }
 +
 +    if (!tablet.getFiles().containsAll(jobFiles)) {
 +      return false;
 +    }
 +
 +    var currentlyCompactingFiles = tablet.getExternalCompactions().values().stream()
 +        .flatMap(ecm -> ecm.getJobFiles().stream()).collect(Collectors.toSet());
 +
 +    if (!Collections.disjoint(jobFiles, currentlyCompactingFiles)) {
 +      return false;
 +    }
 +
 +    switch (job.getKind()) {
 +      case SYSTEM:
 +        if (tablet.getSelectedFiles() != null
 +            && !Collections.disjoint(jobFiles, tablet.getSelectedFiles().getFiles())) {
 +          return false;
 +        }
 +        break;
 +      case USER:
 +      case SELECTOR:
 +        if (tablet.getSelectedFiles() == null
 +            || !tablet.getSelectedFiles().getFiles().containsAll(jobFiles)) {
 +          return false;
 +        }
 +        break;
 +      default:
 +        throw new UnsupportedOperationException("Not currently handling " + job.getKind());
 +    }
 +
 +    return true;
 +  }
 +
 +  private void checkTabletDir(KeyExtent extent, Path path) {
 +    try {
 +      if (checked_tablet_dir_cache.getIfPresent(path) == null) {
 +        FileStatus[] files = null;
 +        try {
 +          files = ctx.getVolumeManager().listStatus(path);
 +        } catch (FileNotFoundException ex) {
 +          // ignored
 +        }
 +
 +        if (files == null) {
 +          LOG.debug("Tablet {} had no dir, creating {}", extent, path);
 +
 +          ctx.getVolumeManager().mkdirs(path);
 +        }
 +        checked_tablet_dir_cache.put(path, 1);
 +      }
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    }
 +  }
 +
 +  private ExternalCompactionMetadata createExternalCompactionMetadata(CompactionJob job,
 +      Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress,
 +      ExternalCompactionId externalCompactionId) {
 +    boolean propDels;
 +
 +    Long fateTxId = null;
 +
 +    switch (job.getKind()) {
 +      case SYSTEM: {
 +        boolean compactingAll = tablet.getFiles().equals(jobFiles);
 +        propDels = !compactingAll;
 +      }
 +        break;
 +      case SELECTOR:
 +      case USER: {
 +        boolean compactingAll = tablet.getSelectedFiles().initiallySelectedAll()
 +            && tablet.getSelectedFiles().getFiles().equals(jobFiles);
 +        propDels = !compactingAll;
 +        fateTxId = tablet.getSelectedFiles().getFateTxId();
 +      }
 +        break;
 +      default:
 +        throw new IllegalArgumentException();
 +    }
 +
 +    Consumer<String> directoryCreator = dir -> checkTabletDir(tablet.getExtent(), new Path(dir));
 +    ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx,
 +        tablet, directoryCreator, externalCompactionId);
 +
 +    return new ExternalCompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(),
 +        job.getPriority(), job.getExecutor(), propDels, fateTxId);
 +
 +  }
 +
 +  private ExternalCompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
 +      String compactorAddress, ExternalCompactionId externalCompactionId) {
 +
 +    Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM
 +        || metaJob.getJob().getKind() == CompactionKind.USER);
 +
 +    var tabletMetadata = metaJob.getTabletMetadata();
 +
 +    var jobFiles = metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
 +        .collect(Collectors.toSet());
 +
 +    Retry retry =
 +        Retry.builder().maxRetries(5).retryAfter(100, MILLISECONDS).incrementBy(100, MILLISECONDS)
 +            .maxWait(10, SECONDS).backOffFactor(1.5).logInterval(3, MINUTES).createRetry();
 +
 +    while (retry.canRetry()) {
 +      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +        var extent = metaJob.getTabletMetadata().getExtent();
 +
 +        if (!canReserveCompaction(tabletMetadata, metaJob.getJob(), jobFiles)) {
 +          return null;
 +        }
 +
 +        var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata,
 +            compactorAddress, externalCompactionId);
 +
 +        // any data that is read from the tablet to make a decision about whether it can compact or not
 +        // must be included in the requireSame call
 +        var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
 +            .requireSame(tabletMetadata, FILES, SELECTED, ECOMP);
 +
 +        tabletMutator.putExternalCompaction(externalCompactionId, ecm);
 +        tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId));
 +
 +        var result = tabletsMutator.process().get(extent);
 +
 +        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
 +          return ecm;
 +        } else {
 +          tabletMetadata = result.readMetadata();
 +        }
 +      }
 +
 +      retry.useRetry();
 +      try {
 +        retry.waitForNextAttempt(LOG,
 +            "Reserved compaction for " + metaJob.getTabletMetadata().getExtent());
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    return null;
 +  }
 +
 +  TExternalCompactionJob createThriftJob(String externalCompactionId,
 +      ExternalCompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob,
 +      Optional<CompactionConfig> compactionConfig) {
 +
++    Set<CompactableFile> selectedFiles;
++    if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
++      selectedFiles = Set.of();
++    } else {
++      selectedFiles = metaJob.getTabletMetadata().getSelectedFiles().getFiles().stream()
++          .map(file -> new CompactableFileImpl(file,
++              metaJob.getTabletMetadata().getFilesMap().get(file)))
++          .collect(Collectors.toUnmodifiableSet());
++    }
++
 +    Map<String,String> overrides = CompactionPluginUtils.computeOverrides(compactionConfig, ctx,
-         metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles());
++        metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles(), selectedFiles);
 +
 +    IteratorConfig iteratorSettings = SystemIteratorUtil
 +        .toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of()));
 +
 +    var files = ecm.getJobFiles().stream().map(storedTabletFile -> {
 +      var dfv = metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile);
 +      return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(), dfv.getNumEntries(),
 +          dfv.getTime());
 +    }).collect(Collectors.toList());
 +
 +    long fateTxid = 0;
 +    if (metaJob.getJob().getKind() == CompactionKind.USER) {
 +      fateTxid = metaJob.getTabletMetadata().getSelectedFiles().getFateTxId();
 +    }
 +
 +    return new TExternalCompactionJob(externalCompactionId,
 +        metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings,
 +        ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(),
 +        TCompactionKind.valueOf(ecm.getKind().name()), fateTxid, overrides);
 +  }
 +
 +  @Override
 +  public void registerMetrics(MeterRegistry registry) {
 +    Gauge.builder(METRICS_MAJC_QUEUED, jobQueues, CompactionJobQueues::getQueuedJobCount)
 +        .description("Number of queued major compactions").register(registry);
 +    Gauge.builder(METRICS_MAJC_RUNNING, this, CompactionCoordinator::getNumRunningCompactions)
 +        .description("Number of running major compactions").register(registry);
 +
 +    queueMetrics.registerMetrics(registry);
 +  }
 +
 +  public void addJobs(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) {
 +    jobQueues.add(tabletMetadata, jobs);
 +  }
 +
 +  public CompactionCoordinatorService.Iface getThriftService() {
 +    return this;
 +  }
 +
 +  class RefreshWriter {
 +
 +    private final ExternalCompactionId ecid;
 +    private final KeyExtent extent;
 +
 +    private RefreshEntry writtenEntry;
 +
 +    RefreshWriter(ExternalCompactionId ecid, KeyExtent extent) {
 +      this.ecid = ecid;
 +      this.extent = extent;
 +
 +      var dataLevel = Ample.DataLevel.of(extent.tableId());
 +      try {
 +        // Wait for any refresh entries from the previous manager process to be processed before
 +        // writing new ones.
 +        refreshLatches.get(dataLevel).await();
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    public void writeRefresh(TabletMetadata.Location location) {
 +      Objects.requireNonNull(location);
 +
 +      if (writtenEntry != null) {
 +        if (location.getServerInstance().equals(writtenEntry.getTserver())) {
 +          // the location was already written so nothing to do
 +          return;
 +        } else {
 +          deleteRefresh();
 +        }
 +      }
 +
 +      var entry = new RefreshEntry(ecid, extent, location.getServerInstance());
 +
 +      ctx.getAmple().refreshes(Ample.DataLevel.of(extent.tableId())).add(List.of(entry));
 +
 +      LOG.debug("wrote refresh entry for {}", ecid);
 +
 +      writtenEntry = entry;
 +    }
 +
 +    public void deleteRefresh() {
 +      if (writtenEntry != null) {
 +        ctx.getAmple().refreshes(Ample.DataLevel.of(extent.tableId()))
 +            .delete(List.of(writtenEntry));
 +        LOG.debug("deleted refresh entry for {}", ecid);
 +        writtenEntry = null;
 +      }
 +    }
 +  }
 +
 +  private Optional<CompactionConfig> getCompactionConfig(CompactionJobQueues.MetaJob metaJob) {
 +    if (metaJob.getJob().getKind() == CompactionKind.USER
 +        && metaJob.getTabletMetadata().getSelectedFiles() != null) {
 +      var cconf =
 +          compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateTxId());
 +      return Optional.ofNullable(cconf);
 +    }
 +    return Optional.empty();
 +  }
 +
 +  /**
 +   * Compactors call this method when they have finished a compaction. This method does the
 +   * following.
 +   *
 +   * <ol>
 +   * <li>Reads the tablet's metadata and determines if the compaction can commit. It's possible that
 +   * things changed while the compaction was running and it can no longer commit.</li>
 +   * <li>If the compaction can commit then a ~refresh entry may be written to the metadata table.
 +   * This is done before attempting to commit to cover the case of process failure after commit. If
 +   * the manager dies after commit then when it restarts it will see the ~refresh entry and refresh
 +   * that tablet. The ~refresh entry is only written when it's a system compaction on a tablet with a
 +   * location.</li>
 +   * <li>Commit the compaction using a conditional mutation. If the tablet's files or location
 +   * changed since reading the tablet's metadata, then the conditional mutation will fail. When this
 +   * happens it will reread the metadata and go back to step 1 conceptually. When committing a
 +   * compaction the compacted files are removed and scan entries are added to the tablet in case the
 +   * files are in use, this prevents GC from deleting the files between updating tablet metadata and
 +   * refreshing the tablet. The scan entries are only added when a tablet has a location.</li>
 +   * <li>After successful commit a refresh request is sent to the tablet if it has a location. This
 +   * will cause the tablet to start using the newly compacted files for future scans. Also, the
 +   * tablet can delete the scan entries if there are no active scans using them.</li>
 +   * <li>If a ~refresh entry was written, delete it since the refresh was successful.</li>
 +   * </ol>
 +   *
 +   * <p>
 +   * User compactions will be refreshed as part of the fate operation. The user compaction fate
 +   * operation will see the compaction was committed after this code updates the tablet metadata;
 +   * however, if it were to rely on this code to do the refresh it would not be able to know when the
 +   * refresh was actually done. Therefore, user compactions will refresh as part of the fate
 +   * operation so that it's known to be done before the fate operation returns. Since the fate
 +   * operation will do it, there is no need to do it here for user compactions.
 +   * </p>
 +   *
 +   * <p>
 +   * The ~refresh entries serve a similar purpose to FATE operations: they ensure that code executes
 +   * even when a process dies. FATE was intentionally not used for compaction commit because FATE
 +   * stores its data in ZooKeeper. The refresh entry is stored in the metadata table, which is much
 +   * more scalable than ZooKeeper. The number of system compactions of small files could be large,
 +   * and this would be a large number of writes to ZooKeeper. ZooKeeper scales somewhat with reads,
 +   * but not with writes.
 +   * </p>
 +   *
 +   * <p>
 +   * Issue #3559 was opened to explore the possibility of making compaction commit a fate operation
 +   * which would remove the need for the ~refresh section.
 +   * </p>
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @param externalCompactionId compaction id
 +   * @param textent tablet extent
 +   * @param stats compaction stats
 +   * @throws ThriftSecurityException when permission error
 +   */
 +  @Override
 +  public void compactionCompleted(TInfo tinfo, TCredentials credentials,
 +      String externalCompactionId, TKeyExtent textent, TCompactionStats stats)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    var extent = KeyExtent.fromThrift(textent);
 +    LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats,
 +        extent);
 +    final var ecid = ExternalCompactionId.of(externalCompactionId);
 +
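 +    // fetch only the columns needed to decide whether this compaction can commit and to commit it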
 +    var tabletMeta =
 +        ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID);
 +
 +    if (!canCommitCompaction(ecid, tabletMeta)) {
 +      return;
 +    }
 +
 +    ExternalCompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid);
 +
 +    // ELASTICITY_TODO this code does not handle race conditions or faults. Need to ensure refresh
 +    // happens in the case of manager process death between commit and refresh.
 +    ReferencedTabletFile newDatafile =
 +        TabletNameGenerator.computeCompactionFileDest(ecm.getCompactTmpName());
 +
 +    Optional<ReferencedTabletFile> optionalNewFile;
 +    try {
 +      optionalNewFile = renameOrDeleteFile(stats, ecm, newDatafile);
 +    } catch (IOException e) {
 +      LOG.warn("Can not commit complete compaction {} because unable to delete or rename {} ", ecid,
 +          ecm.getCompactTmpName(), e);
 +      compactionFailed(Map.of(ecid, extent));
 +      return;
 +    }
 +
 +    RefreshWriter refreshWriter = new RefreshWriter(ecid, extent);
 +
 +    try {
 +      tabletMeta = commitCompaction(stats, ecid, tabletMeta, optionalNewFile, refreshWriter);
 +    } catch (RuntimeException e) {
 +      LOG.warn("Failed to commit complete compaction {} {}", ecid, extent, e);
 +      compactionFailed(Map.of(ecid, extent));
 +    }
 +
 +    if (ecm.getKind() != CompactionKind.USER) {
 +      refreshTablet(tabletMeta);
 +    }
 +
 +    // if a refresh entry was written, it can be removed after the tablet has been refreshed
 +    refreshWriter.deleteRefresh();
 +
 +    // It's possible that RUNNING might not have an entry for this ecid in the case
 +    // of a coordinator restart when the Coordinator can't find the TServer for the
 +    // corresponding external compaction.
 +    recordCompletion(ecid);
 +
 +    // This will cause the tablet to be reexamined to see if it needs any more compactions.
 +    eventCoordinator.event(extent, "Compaction completed %s", extent);
 +  }
 +
 +  private Optional<ReferencedTabletFile> renameOrDeleteFile(TCompactionStats stats,
 +      ExternalCompactionMetadata ecm, ReferencedTabletFile newDatafile) throws IOException {
 +    if (stats.getEntriesWritten() == 0) {
 +      // the compaction produced no output, so there is no need to rename or add a file to the
 +      // metadata table; only the input files need to be deleted.
 +      if (!ctx.getVolumeManager().delete(ecm.getCompactTmpName().getPath())) {
 +        throw new IOException("delete returned false");
 +      }
 +
 +      return Optional.empty();
 +    } else {
 +      if (!ctx.getVolumeManager().rename(ecm.getCompactTmpName().getPath(),
 +          newDatafile.getPath())) {
 +        throw new IOException("rename returned false");
 +      }
 +
 +      return Optional.of(newDatafile);
 +    }
 +  }
 +
 +  private void refreshTablet(TabletMetadata metadata) {
 +    var location = metadata.getLocation();
 +    if (location != null) {
 +      KeyExtent extent = metadata.getExtent();
 +
 +      // there is a single tserver and a single tablet, so a thread pool is not needed. The direct
 +      // executor will run everything in the current thread
 +      ExecutorService executorService = MoreExecutors.newDirectExecutorService();
 +      try {
 +        TabletRefresher.refreshTablets(executorService,
 +            "compaction:" + metadata.getExtent().toString(), ctx, tserverSet::getCurrentServers,
 +            Map.of(metadata.getLocation(), List.of(extent.toThrift())));
 +      } finally {
 +        executorService.shutdownNow();
 +      }
 +    }
 +  }
 +
 +  // ELASTICITY_TODO unit test this method
 +  private boolean canCommitCompaction(ExternalCompactionId ecid, TabletMetadata tabletMetadata) {
 +
 +    if (tabletMetadata == null) {
 +      LOG.debug("Received completion notification for nonexistent tablet {}", ecid);
 +      return false;
 +    }
 +
 +    var extent = tabletMetadata.getExtent();
 +
 +    if (tabletMetadata.getOperationId() != null) {
 +      // split, merge, and delete tablet should delete the compaction entry in the tablet
 +      LOG.debug("Received completion notification for tablet with active operation {} {} {}", ecid,
 +          extent, tabletMetadata.getOperationId());
 +      return false;
 +    }
 +
 +    ExternalCompactionMetadata ecm = tabletMetadata.getExternalCompactions().get(ecid);
 +
 +    if (ecm == null) {
 +      LOG.debug("Received completion notification for unknown compaction {} {}", ecid, extent);
 +      return false;
 +    }
 +
 +    if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) {
 +      if (tabletMetadata.getSelectedFiles() == null) {
 +        // when the compaction is canceled, selected files are deleted
 +        LOG.debug(
 +            "Received completion notification for user compaction and tablet has no selected files {} {}",
 +            ecid, extent);
 +        return false;
 +      }
 +
 +      if (ecm.getFateTxId() != tabletMetadata.getSelectedFiles().getFateTxId()) {
 +        // maybe the compaction was canceled and another user compaction was started on the tablet
 +        LOG.debug(
 +            "Received completion notification for user compaction where its fate txid did not match the tablets {} {} {} {}",
 +            ecid, extent, FateTxId.formatTid(ecm.getFateTxId()),
 +            FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId()));
 +        return false;
 +      }
 +
 +      if (!tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) {
 +        // this is not expected to happen
 +        LOG.error("User compaction contained files not in the selected set {} {} {} {} {}",
 +            tabletMetadata.getExtent(), ecid, ecm.getKind(),
 +            Optional.ofNullable(tabletMetadata.getSelectedFiles()).map(SelectedFiles::getFiles),
 +            ecm.getJobFiles());
 +        return false;
 +      }
 +    }
 +
 +    if (!tabletMetadata.getFiles().containsAll(ecm.getJobFiles())) {
 +      // this is not expected to happen
 +      LOG.error("Compaction contained files not in the tablet files set {} {} {} {}",
 +          tabletMetadata.getExtent(), ecid, tabletMetadata.getFiles(), ecm.getJobFiles());
 +      return false;
 +    }
 +
 +    return true;
 +  }
 +
 +  private TabletMetadata commitCompaction(TCompactionStats stats, ExternalCompactionId ecid,
 +      TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile,
 +      RefreshWriter refreshWriter) {
 +
 +    KeyExtent extent = tablet.getExtent();
 +
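 +    // retry forever, backing off from 100ms up to a max wait of 10s and logging at most every 3
 +    // minutes; the loop below exits once the commit is accepted or the compaction can no longer
 +    // commit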
 +    Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
 +        .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.5)
 +        .logInterval(3, MINUTES).createRetry();
 +
 +    while (canCommitCompaction(ecid, tablet)) {
 +      ExternalCompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);
 +
 +      // the compacted files should not already exist in the tablet
 +      var tablet2 = tablet;
 +      newDatafile.ifPresent(
 +          newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()),
 +              "File already exists in tablet %s %s", newFile, tablet2.getFiles()));
 +
 +      if (tablet.getLocation() != null
 +          && tablet.getExternalCompactions().get(ecid).getKind() != CompactionKind.USER) {
 +        // Write the refresh entry before attempting to update tablet metadata; this ensures that
 +        // the refresh will happen even if this process dies. In the case where this process does
 +        // not die, the refresh will happen after commit. User compactions will make refresh calls
 +        // in their fate operation, so it does not need to be done here.
 +        refreshWriter.writeRefresh(tablet.getLocation());
 +      }
 +
 +      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +        var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
 +            .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION);
 +
 +        if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) {
 +          tabletMutator.requireSame(tablet, SELECTED, COMPACTED);
 +        }
 +
 +        // make the needed updates to the tablet
 +        updateTabletForCompaction(stats, ecid, tablet, newDatafile, extent, ecm, tabletMutator);
 +
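 +        // if the conditional mutation is rejected, treat it as satisfied when the compaction entry
 +        // is already gone from the tablet metadata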
 +        tabletMutator
 +            .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid));
 +
 +        // TODO expensive logging
 +        LOG.debug("Compaction completed {} added {} removed {}", tablet.getExtent(), newDatafile,
 +            ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName)
 +                .collect(Collectors.toList()));
 +
 +        // ELASTICITY_TODO check return value and retry, could fail because of race conditions
 +        var result = tabletsMutator.process().get(extent);
 +        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
 +          // compaction was committed, mark the compaction input files for deletion
 +          //
 +          // ELASTICITY_TODO in the case of process death the GC candidates would never be added
 +          // like #3811. If compaction commit were moved to FATE per #3559 then this would not
 +          // be an issue. If compaction commit is never moved to FATE, then this addition could be
 +          // moved to the compaction refresh process. The compaction refresh process will go away
 +          // if compaction commit is moved to FATE, so should only do this if not moving to FATE.
 +          ctx.getAmple().putGcCandidates(extent.tableId(), ecm.getJobFiles());
 +          break;
 +        } else {
 +          // compaction failed to commit; maybe something changed on the tablet, so let's reread
 +          // the metadata and try again
 +          tablet = result.readMetadata();
 +        }
 +
 +        retry.waitForNextAttempt(LOG, "Failed to commit " + ecid + " for tablet " + extent);
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    return tablet;
 +  }
 +
 +  private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid,
 +      TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, KeyExtent extent,
 +      ExternalCompactionMetadata ecm, Ample.ConditionalTabletMutator tabletMutator) {
 +    // ELASTICITY_TODO improve logging; adapt to use existing tablet files logging
 +    if (ecm.getKind() == CompactionKind.USER) {
 +      if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) {
 +        // all files selected for the user compaction are finished, so the tablet is finished and
 +        // its compaction id needs to be updated.
 +
 +        long fateTxId = tablet.getSelectedFiles().getFateTxId();
 +
 +        Preconditions.checkArgument(!tablet.getCompacted().contains(fateTxId),
 +            "Tablet %s unexpected has selected files and compacted columns for %s",
 +            tablet.getExtent(), fateTxId);
 +
 +        // TODO set to trace
 +        LOG.debug("All selected files compcated for {} setting compacted for {}",
 +            tablet.getExtent(), FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId()));
 +
 +        tabletMutator.deleteSelectedFiles();
 +        tabletMutator.putCompacted(fateTxId);
 +
 +      } else {
 +        // not all of the selected files were finished, so any new file needs to be added to the
 +        // selected set
 +
 +        Set<StoredTabletFile> newSelectedFileSet =
 +            new HashSet<>(tablet.getSelectedFiles().getFiles());
 +        newSelectedFileSet.removeAll(ecm.getJobFiles());
 +
 +        if (newDatafile.isPresent()) {
 +          // TODO set to trace
 +          LOG.debug(
 +              "Not all selected files for {} are done, adding new selected file {} from compaction",
 +              tablet.getExtent(), newDatafile.orElseThrow().getPath().getName());
 +          newSelectedFileSet.add(newDatafile.orElseThrow().insert());
 +        } else {
 +          // TODO set to trace
 +          LOG.debug(
 +              "Not all selected files for {} are done, compaction produced no output so not adding to selected set.",
 +              tablet.getExtent());
 +        }
 +
 +        tabletMutator.putSelectedFiles(
 +            new SelectedFiles(newSelectedFileSet, tablet.getSelectedFiles().initiallySelectedAll(),
 +                tablet.getSelectedFiles().getFateTxId()));
 +      }
 +    }
 +
 +    if (tablet.getLocation() != null) {
 +      // add scan entries to prevent GC in case the hosted tablet is currently using the files for
 +      // scans
 +      ecm.getJobFiles().forEach(tabletMutator::putScan);
 +    }
 +    ecm.getJobFiles().forEach(tabletMutator::deleteFile);
 +    tabletMutator.deleteExternalCompaction(ecid);
 +
 +    if (newDatafile.isPresent()) {
 +      tabletMutator.putFile(newDatafile.orElseThrow(),
 +          new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()));
 +    }
 +  }
 +
 +  @Override
 +  public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId,
 +      TKeyExtent extent) throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    LOG.info("Compaction failed, id: {}", externalCompactionId);
 +    final var ecid = ExternalCompactionId.of(externalCompactionId);
 +    compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
 +  }
 +
 +  void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
 +
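 +    // delete the external compaction entry from each tablet, clean up any compaction tmp files for
 +    // entries that were removed, and record the compactions as completed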
 +    try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
 +      compactions.forEach((ecid, extent) -> {
 +        try {
 +          ctx.requireNotDeleted(extent.tableId());
 +          tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
 +              .deleteExternalCompaction(ecid).submit(new RejectionHandler() {
 +
 +                @Override
 +                public boolean callWhenTabletDoesNotExists() {
 +                  return true;
 +                }
 +
 +                @Override
 +                public boolean test(TabletMetadata tabletMetadata) {
 +                  return tabletMetadata == null
 +                      || !tabletMetadata.getExternalCompactions().containsKey(ecid);
 +                }
 +
 +              });
 +        } catch (TableDeletedException e) {
 +          LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.",
 +              extent.tableId());
 +        }
 +      });
 +
 +      final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>();
 +      tabletsMutator.process().forEach((extent, result) -> {
 +        if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
 +
 +          // this should be retried later when the dead compaction detector runs; let's log it in
 +          // case it's a persistent problem
 +          if (LOG.isDebugEnabled()) {
 +            var ecid =
 +                compactions.entrySet().stream().filter(entry -> entry.getValue().equals(extent))
 +                    .findFirst().map(Map.Entry::getKey).orElse(null);
 +            LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
 +          }
 +        } else {
 +          // compactionFailed is called from the Compactor when either a compaction fails or
 +          // is cancelled and it's called from the DeadCompactionDetector. This block is
 +          // entered when the conditional mutator above successfully deletes an ecid from
 +          // the tablet metadata. Remove compaction tmp files from the tablet directory
 +          // that have a corresponding ecid in the name.
 +
 +          ecidsForTablet.clear();
 +          compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0)
 +              .map(Entry::getKey).forEach(ecidsForTablet::add);
 +
 +          if (!ecidsForTablet.isEmpty()) {
 +            final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR);
 +            if (tm != null) {
 +              final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
 +              for (Volume vol : vols) {
 +                try {
 +                  final String volPath =
 +                      vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
 +                          + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName();
 +                  final FileSystem fs = vol.getFileSystem();
 +                  for (ExternalCompactionId ecid : ecidsForTablet) {
 +                    final String fileSuffix = "_tmp_" + ecid.canonical();
 +                    FileStatus[] files = fs.listStatus(new Path(volPath), (path) -> {
 +                      return path.getName().endsWith(fileSuffix);
 +                    });
 +                    if (files.length > 0) {
 +                      for (FileStatus file : files) {
 +                        if (!fs.delete(file.getPath(), false)) {
 +                          LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath());
 +                        } else {
 +                          LOG.debug("Deleted ecid tmp file: {}", file.getPath());
 +                        }
 +                      }
 +                    }
 +                  }
 +                } catch (IOException e) {
 +                  LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e);
 +                }
 +              }
 +            } else {
 +              // TabletMetadata does not exist for the extent. This could be due to a merge or
 +              // split operation. Use the utility to find tmp files at the table level
 +              deadCompactionDetector.addTableId(extent.tableId());
 +            }
 +          }
 +        }
 +      });
 +    }
 +
 +    compactions.forEach((k, v) -> recordCompletion(k));
 +  }
 +
 +  /**
 +   * Called by compactors to update the status of an assigned compaction
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @param externalCompactionId compaction id
 +   * @param update compaction status update
 +   * @param timestamp timestamp of the message
 +   * @throws ThriftSecurityException when permission error
 +   */
 +  @Override
 +  public void updateCompactionStatus(TInfo tinfo, TCredentials credentials,
 +      String externalCompactionId, TCompactionStatusUpdate update, long timestamp)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", externalCompactionId,
 +        timestamp, update);
 +    final RunningCompaction rc = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
 +    if (null != rc) {
 +      rc.addUpdate(timestamp, update);
 +    }
 +  }
 +
 +  private void recordCompletion(ExternalCompactionId ecid) {
 +    var rc = RUNNING_CACHE.remove(ecid);
 +    if (rc != null) {
 +      completed.put(ecid, rc);
 +    }
 +  }
 +
 +  protected Set<ExternalCompactionId> readExternalCompactionIds() {
 +    return this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER).fetch(ECOMP).build()
 +        .stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream())
 +        .collect(Collectors.toSet());
 +  }
 +
 +  /**
 +   * The RUNNING_CACHE set may contain external compactions that are not actually running. This
 +   * method periodically cleans those up.
 +   */
 +  protected void cleanUpRunning() {
 +
 +    // grab a snapshot of the ids in the set before reading the metadata table. This is done to
 +    // avoid removing things that are added while reading the metadata.
 +    Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
 +
 +    // grab the ids that are listed as running in the metadata table. It is important that this is
 +    // done after getting the snapshot.
 +    Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds();
 +
 +    var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata);
 +
 +    // remove ids that are in the running set but not in the metadata table
 +    idsToRemove.forEach(this::recordCompletion);
 +
 +    if (idsToRemove.size() > 0) {
 +      LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove);
 +    }
 +  }
 +
 +  /**
 +   * Return information about running compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +
 +    final TExternalCompactionList result = new TExternalCompactionList();
 +    RUNNING_CACHE.forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroupName());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setUpdates(rc.getUpdates());
 +      trc.setJob(rc.getJob());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  /**
 +   * Return information about recently completed compactions
 +   *
 +   * @param tinfo trace info
 +   * @param credentials tcredentials object
 +   * @return map of ECID to TExternalCompaction objects
 +   * @throws ThriftSecurityException permission error
 +   */
 +  @Override
 +  public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials)
 +      throws ThriftSecurityException {
 +    // do not expect users to call this directly, expect other tservers to call this method
 +    if (!security.canPerformSystemActions(credentials)) {
 +      throw new AccumuloSecurityException(credentials.getPrincipal(),
 +          SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +    }
 +    final TExternalCompactionList result = new TExternalCompactionList();
 +    completed.asMap().forEach((ecid, rc) -> {
 +      TExternalCompaction trc = new TExternalCompaction();
 +      trc.setGroupName(rc.getGroupName());
 +      trc.setCompactor(rc.getCompactorAddress());
 +      trc.setJob(rc.getJob());
 +      trc.setUpdates(rc.getUpdates());
 +      result.putToCompactions(ecid.canonical(), trc);
 +    });
 +    return result;
 +  }
 +
 +  @Override
 +  public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId)
 +      throws TException {
 +    var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId));
 +    var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent());
 +    try {
 +      NamespaceId nsId = this.ctx.getNamespaceId(extent.tableId());
 +      if (!security.canCompact(credentials, extent.tableId(), nsId)) {
 +        throw new AccumuloSecurityException(credentials.getPrincipal(),
 +            SecurityErrorCode.PERMISSION_DENIED).asThriftException();
 +      }
 +    } catch (TableNotFoundException e) {
 +      throw new ThriftTableOperationException(extent.tableId().canonical(), null,
 +          TableOperation.COMPACT_CANCEL, TableOperationExceptionType.NOTFOUND, e.getMessage());
 +    }
 +
 +    cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId);
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected String getTServerAddressString(HostAndPort tserverAddress) {
 +    return ExternalCompactionUtil.getHostPortString(tserverAddress);
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected List<RunningCompaction> getCompactionsRunningOnCompactors() {
 +    return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx);
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected void cancelCompactionOnCompactor(String address, String externalCompactionId) {
 +    HostAndPort hostPort = HostAndPort.fromString(address);
 +    ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, externalCompactionId);
 +  }
 +
 +  /* Method exists to be overridden in test to hide static method */
 +  protected void returnTServerClient(TabletServerClientService.Client client) {
 +    ThriftUtil.returnClient(client, this.ctx);
 +  }
 +
 +  private void deleteEmpty(ZooReaderWriter zoorw, String path)
 +      throws KeeperException, InterruptedException {
 +    try {
 +      LOG.debug("Deleting empty ZK node {}", path);
 +      zoorw.delete(path);
 +    } catch (KeeperException.NotEmptyException e) {
 +      LOG.debug("Failed to delete {} its not empty, likely an expected race condition.", path);
 +    }
 +  }
 +
 +  private void cleanUpCompactors() {
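 +    // prune empty compactor group and compactor nodes from ZooKeeper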
 +    final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS;
 +
 +    var zoorw = this.ctx.getZooReaderWriter();
 +
 +    try {
 +      var groups = zoorw.getChildren(compactorQueuesPath);
 +
 +      for (String group : groups) {
 +        String qpath = compactorQueuesPath + "/" + group;
 +
 +        var compactors = zoorw.getChildren(qpath);
 +
 +        if (compactors.isEmpty()) {
 +          deleteEmpty(zoorw, qpath);
 +        }
 +
 +        for (String compactor : compactors) {
 +          String cpath = compactorQueuesPath + "/" + group + "/" + compactor;
 +          var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + group + "/" + compactor);
 +          if (lockNodes.isEmpty()) {
 +            deleteEmpty(zoorw, cpath);
 +          }
 +        }
 +      }
 +
 +    } catch (KeeperException | RuntimeException e) {
 +      LOG.warn("Failed to clean up compactors", e);
 +    } catch (InterruptedException e) {
 +      Thread.currentThread().interrupt();
 +      throw new IllegalStateException(e);
 +    }
 +  }
 +
 +}
diff --cc test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 7928b511d5,c6b194e8c7..76e9487708
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@@ -19,15 -19,26 +19,24 @@@
  package org.apache.accumulo.test.functional;
  
  import static java.util.concurrent.TimeUnit.SECONDS;
 -import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 +import static java.util.stream.Collectors.toList;
 +import static java.util.stream.Collectors.toSet;
  import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertFalse;
+ import static org.junit.jupiter.api.Assertions.assertNotNull;
 -import static org.junit.jupiter.api.Assertions.assertThrows;
  import static org.junit.jupiter.api.Assertions.assertTrue;
 -import static org.junit.jupiter.api.Assertions.fail;
  
  import java.io.IOException;
+ import java.io.UncheckedIOException;
+ import java.nio.file.FileVisitResult;
+ import java.nio.file.Files;
+ import java.nio.file.Paths;
+ import java.nio.file.SimpleFileVisitor;
+ import java.nio.file.attribute.BasicFileAttributes;
  import java.time.Duration;
 -import java.util.ArrayList;
+ import java.util.Arrays;
  import java.util.EnumSet;
+ import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
@@@ -49,14 -62,19 +58,16 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.admin.CompactionConfig;
  import org.apache.accumulo.core.client.admin.NewTableConfiguration;
  import org.apache.accumulo.core.client.admin.PluginConfig;
 -import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+ import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
  import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
 -import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer;
  import org.apache.accumulo.core.clientImpl.ClientContext;
 -import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
+ import org.apache.accumulo.core.file.rfile.bcfile.PrintBCInfo;
 -import org.apache.accumulo.core.iterators.DevNull;
  import org.apache.accumulo.core.iterators.Filter;
  import org.apache.accumulo.core.iterators.IteratorEnvironment;
  import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;


(accumulo) 02/02: removes code added for debugging merge

Posted by kt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 0d5508631663be0b4b44a5103ddddcd67606a8bd
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jan 4 21:47:57 2024 -0500

    removes code added for debugging merge
---
 .../apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java   | 2 +-
 .../accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java      | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index dcbb78dfcd..6960042e92 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@ -526,7 +526,7 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
       var filesToCompact =
           findDataFilesToCompact(candidates, ratioToCheck, maxFilesToCompact, maxSizeToCompact);
 
-      log.info("Tried ratio {} and found {} {} {} {}", ratioToCheck, filesToCompact,
+      log.trace("Tried ratio {} and found {} {} {} {}", ratioToCheck, filesToCompact,
           filesToCompact.size() >= goalCompactionSize, goalCompactionSize, maxFilesToCompact);
 
       if (filesToCompact.isEmpty() || filesToCompact.size() < goalCompactionSize) {
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index 7886a5f0c6..ecc95d6e72 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@ -702,8 +702,6 @@ public class DefaultCompactionPlannerTest {
     var params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf);
     var plan = planner.makePlan(params);
 
-    System.out.println(plan.getJobs());
-
     assertTrue(plan.getJobs().isEmpty());
 
     // ensure when a compaction is running and we are over files max but below the compaction ratio