Posted to commits@accumulo.apache.org by kt...@apache.org on 2023/07/27 20:45:14 UTC

[accumulo] branch elasticity updated: Makes DefaultCompactionPlanner emit multiple jobs (#3662)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 97b08d0b32 Makes DefaultCompactionPlanner emit multiple jobs (#3662)
97b08d0b32 is described below

commit 97b08d0b32f0d173e27dcb1dd46dc6080bd2cdef
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jul 27 16:45:08 2023 -0400

    Makes DefaultCompactionPlanner emit multiple jobs (#3662)
    
    The DefaultCompactionPlanner would only emit one compaction job per
    tablet.  This commit changes it to emit multiple compaction jobs
    for a single tablet as long as doing so is optimal w.r.t. the
    compaction ratio.
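    
    Illustrative sketch (not the actual Accumulo API): the record F, the simplified
    ratio check in findFilesToCompact, and planJobs below are hypothetical stand-ins
    for CompactableFile, DefaultCompactionPlanner.findDataFilesToCompact, and the new
    planning loop in makePlan. The real planner also accounts for running compactions,
    a maximum compaction size, and the USER/SELECTOR look-ahead, which this sketch omits.
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    
    public class MultiJobPlanningSketch {
    
      // Hypothetical stand-in for CompactableFile; only a name and an estimated size.
      record F(String name, long size) {}
    
      // Simplified ratio check: take the smallest files (capped at maxFiles) and return the
      // largest prefix whose biggest file meets the compaction ratio w.r.t. the prefix's
      // total size, or an empty list if no prefix of at least two files qualifies.
      static List<F> findFilesToCompact(Collection<F> files, double ratio, int maxFiles) {
        List<F> sorted = new ArrayList<>(files);
        sorted.sort(Comparator.comparingLong(F::size));
        for (int k = Math.min(sorted.size(), maxFiles); k >= 2; k--) {
          List<F> prefix = sorted.subList(0, k);
          long total = prefix.stream().mapToLong(F::size).sum();
          if (prefix.get(k - 1).size() * ratio <= total) {
            return new ArrayList<>(prefix);
          }
        }
        return List.of();
      }
    
      // The essence of the new loop: emit a job, replace its files with a synthetic expected
      // output file, and keep looking for more jobs until the ratio is no longer met or the
      // optimal set would include the output of a job already planned.
      static List<List<F>> planJobs(Collection<F> candidates, double ratio, int maxFilesToCompact) {
        Set<F> remaining = new HashSet<>(candidates);
        Set<F> expectedOutputs = new HashSet<>();
        List<List<F>> jobs = new ArrayList<>();
        int fake = 0;
    
        while (true) {
          List<F> group = findFilesToCompact(remaining, ratio, maxFilesToCompact);
          if (group.isEmpty() || !Collections.disjoint(group, expectedOutputs)) {
            // Nothing meets the ratio, or the optimal set needs the output of an already
            // planned job, so stop and let that job run first.
            break;
          }
          remaining.removeAll(group);
          F expected = new F("FAKE-" + fake++, group.stream().mapToLong(F::size).sum());
          expectedOutputs.add(expected);
          remaining.add(expected);
          jobs.add(group);
          if (group.size() < maxFilesToCompact) {
            // Fewer files than the max means the ratio was the limiting factor, so the
            // smallest optimal set was already found; stop looking for more jobs.
            break;
          }
        }
        return jobs;
      }
    
      public static void main(String[] args) {
        // Mirrors the new testMultipleCompactions case: 990 small files plus 10 large ones
        // with ratio 2 and maxFilesToCompact 15 should plan 66 jobs of 15 small files each.
        List<F> all = new ArrayList<>();
        for (int i = 0; i < 990; i++) all.add(new F("F" + i, 1000));
        for (int i = 0; i < 10; i++) all.add(new F("C" + i, 20000));
        System.out.println(planJobs(all, 2, 15).size()); // 66
      }
    }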
---
 .../spi/compaction/DefaultCompactionPlanner.java   | 188 ++++++++++++---------
 .../compaction/DefaultCompactionPlannerTest.java   | 167 +++++++++++++++++-
 .../accumulo/manager/TabletGroupWatcher.java       |   2 +-
 .../coordinator/CompactionCoordinator.java         |  18 --
 .../accumulo/test/functional/CompactionIT.java     |  13 +-
 5 files changed, 276 insertions(+), 112 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index a3c76ed4ce..e3f5f6535a 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@ -180,6 +180,22 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
     }
   }
 
+  private static class FakeFileGenerator {
+
+    private int count = 0;
+
+    public CompactableFile create(long size) {
+      try {
+        count++;
+        return CompactableFile.create(
+            new URI("hdfs://fake/accumulo/tables/adef/t-zzFAKEzz/FAKE-0000" + count + ".rf"), size,
+            0);
+      } catch (URISyntaxException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+
   private List<Executor> executors;
   private int maxFilesToCompact;
 
@@ -258,76 +274,93 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
 
     Set<CompactableFile> filesCopy = new HashSet<>(params.getCandidates());
 
+    FakeFileGenerator fakeFileGenerator = new FakeFileGenerator();
+
     long maxSizeToCompact = getMaxSizeToCompact(params.getKind());
 
-    Collection<CompactableFile> group;
-    if (params.getRunningCompactions().isEmpty()) {
-      group =
-          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
+    // This set represents future files that will be produced by running compactions. If the optimal
+    // set of files to compact is computed and contains one of these files, then it is optimal to
+    // wait for that running compaction to finish.
+    Set<CompactableFile> expectedFiles = new HashSet<>();
+    params.getRunningCompactions().stream().filter(job -> job.getKind() == params.getKind())
+        .map(job -> getExpected(job.getFiles(), fakeFileGenerator))
+        .forEach(compactableFile -> Preconditions.checkState(expectedFiles.add(compactableFile)));
+    Preconditions.checkState(Collections.disjoint(expectedFiles, filesCopy));
+    filesCopy.addAll(expectedFiles);
 
-      if (!group.isEmpty() && group.size() < params.getCandidates().size()
-          && params.getCandidates().size() <= maxFilesToCompact
-          && (params.getKind() == CompactionKind.USER
-              || params.getKind() == CompactionKind.SELECTOR)) {
-        // USER and SELECTOR compactions must eventually compact all files. When a subset of files
-        // that meets the compaction ratio is selected, look ahead and see if the next compaction
-        // would also meet the compaction ratio. If not then compact everything to avoid doing
-        // more than logarithmic work across multiple comapctions.
-
-        filesCopy.removeAll(group);
-        filesCopy.add(getExpected(group, 0));
-
-        if (findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact,
-            maxSizeToCompact).isEmpty()) {
-          // The next possible compaction does not meet the compaction ratio, so compact
-          // everything.
-          group = Set.copyOf(params.getCandidates());
-        }
+    List<Collection<CompactableFile>> compactionJobs = new ArrayList<>();
+
+    while (true) {
+      var filesToCompact =
+          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
+      if (!Collections.disjoint(filesToCompact, expectedFiles)) {
+        // The optimal set of files to compact includes the output of a running compaction, so wait
+        // for that running compaction to finish.
+        break;
+      }
 
+      if (filesToCompact.isEmpty()) {
+        break;
       }
 
-    } else if (params.getKind() == CompactionKind.SYSTEM) {
-      // This code determines if once the files compacting finish would they be included in a
-      // compaction with the files smaller than them? If so, then wait for the running compaction
-      // to complete.
+      filesCopy.removeAll(filesToCompact);
+
+      // A compaction job will be created for these files, so add an expected file for that planned
+      // compaction job. If a future iteration of this loop selects that expected file, it will stop
+      // planning further jobs and wait instead.
+      var expectedFile = getExpected(filesToCompact, fakeFileGenerator);
+      Preconditions.checkState(expectedFiles.add(expectedFile));
+      Preconditions.checkState(filesCopy.add(expectedFile));
 
-      // The set of files running compactions may produce
-      var expectedFiles = getExpected(params.getRunningCompactions());
+      compactionJobs.add(filesToCompact);
 
-      if (!Collections.disjoint(filesCopy, expectedFiles)) {
-        throw new AssertionError();
+      if (filesToCompact.size() < maxFilesToCompact) {
+        // Only continue looking for more compaction jobs when the set of files found equals
+        // maxFilesToCompact in size. When fewer files than the max are found, it is an
+        // indication that the compaction ratio was no longer met and therefore it would be
+        // suboptimal to look for more jobs because the smallest optimal set was found.
+        break;
       }
+    }
 
-      filesCopy.addAll(expectedFiles);
+    if (compactionJobs.size() == 1
+        && (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR)
+        && compactionJobs.get(0).size() < params.getCandidates().size()
+        && compactionJobs.get(0).size() <= maxFilesToCompact) {
+      // USER and SELECTOR compactions must eventually compact all files. When a subset of files
+      // that meets the compaction ratio is selected, look ahead and see if the next compaction
+      // would also meet the compaction ratio. If not then compact everything to avoid doing
+      // more than logarithmic work across multiple compactions.
 
-      group =
-          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
+      var group = compactionJobs.get(0);
+      var candidatesCopy = new HashSet<>(params.getCandidates());
+
+      candidatesCopy.removeAll(group);
+      Preconditions.checkState(candidatesCopy.add(getExpected(group, fakeFileGenerator)));
 
-      if (!Collections.disjoint(group, expectedFiles)) {
-        // file produced by running compaction will eventually compact with existing files, so
-        // wait.
-        group = Set.of();
+      if (findDataFilesToCompact(candidatesCopy, params.getRatio(), maxFilesToCompact,
+          maxSizeToCompact).isEmpty()) {
+        // The next possible compaction does not meet the compaction ratio, so compact
+        // everything.
+        compactionJobs.set(0, Set.copyOf(params.getCandidates()));
       }
-    } else {
-      group = Set.of();
     }
 
-    if (group.isEmpty()
+    if (compactionJobs.isEmpty()
         && (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR
             || params.getKind() == CompactionKind.CHOP)
         && params.getRunningCompactions().stream()
             .noneMatch(job -> job.getKind() == params.getKind())) {
-      group = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact);
+      // These kinds of compactions require files to be compacted even if none of the files meet
+      // the compaction ratio. No files were found using the compaction ratio and no compactions
+      // are running, so force a compaction.
+      compactionJobs = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact);
     }
 
-    if (group.isEmpty()) {
-      return params.createPlanBuilder().build();
-    } else {
-      // determine which executor to use based on the size of the files
-      var ceid = getExecutor(group);
-
-      return params.createPlanBuilder().addJob(createPriority(params, group), ceid, group).build();
-    }
+    var builder = params.createPlanBuilder();
+    compactionJobs.forEach(jobFiles -> builder.addJob(createPriority(params, jobFiles),
+        getExecutor(jobFiles), jobFiles));
+    return builder.build();
   }
 
   private static short createPriority(PlanningParameters params,
@@ -346,51 +379,42 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
     return Long.MAX_VALUE;
   }
 
-  private CompactableFile getExpected(Collection<CompactableFile> files, int count) {
+  private CompactableFile getExpected(Collection<CompactableFile> files,
+      FakeFileGenerator fakeFileGenerator) {
     long size = files.stream().mapToLong(CompactableFile::getEstimatedSize).sum();
-    try {
-      return CompactableFile.create(
-          new URI("hdfs://fake/accumulo/tables/adef/t-zzFAKEzz/FAKE-0000" + count + ".rf"), size,
-          0);
-    } catch (URISyntaxException e) {
-      throw new IllegalStateException(e);
-    }
+    return fakeFileGenerator.create(size);
   }
 
-  /**
-   * @return the expected files sizes for sets of compacting files.
-   */
-  private Set<CompactableFile> getExpected(Collection<CompactionJob> compacting) {
-
-    Set<CompactableFile> expected = new HashSet<>();
-
-    int count = 0;
-
-    for (CompactionJob job : compacting) {
-      count++;
-      expected.add(getExpected(job.getFiles(), count));
-    }
-
-    return expected;
-  }
-
-  private static Collection<CompactableFile>
+  private static List<Collection<CompactableFile>>
       findMaximalRequiredSetToCompact(Collection<CompactableFile> files, int maxFilesToCompact) {
 
     if (files.size() <= maxFilesToCompact) {
-      return files;
+      return List.of(files);
     }
 
     List<CompactableFile> sortedFiles = sortByFileSize(files);
 
-    int numToCompact = maxFilesToCompact;
+    // Compute the number of compaction jobs that could run with a full set of maxFilesToCompact
+    // files and then subtract 1. The 1 is subtracted because the last job is a special case.
+    int batches = sortedFiles.size() / maxFilesToCompact - 1;
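+    // For example, with 35 sorted files and maxFilesToCompact = 15, batches = 35 / 15 - 1 = 1, so
+    // one full job of the 15 smallest files is planned now and the rest is left for later passes.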
 
-    if (sortedFiles.size() > maxFilesToCompact && sortedFiles.size() < 2 * maxFilesToCompact) {
-      // on the second to last compaction pass, compact the minimum amount of files possible
-      numToCompact = sortedFiles.size() - maxFilesToCompact + 1;
-    }
+    if (batches > 0) {
+      ArrayList<Collection<CompactableFile>> jobs = new ArrayList<>();
+      for (int i = 0; i < batches; i++) {
+        jobs.add(sortedFiles.subList(i * maxFilesToCompact, (i + 1) * maxFilesToCompact));
+      }
+      return jobs;
+    } else {
+      int numToCompact = maxFilesToCompact;
+
+      if (sortedFiles.size() > maxFilesToCompact && sortedFiles.size() < 2 * maxFilesToCompact) {
+        // On the second to last compaction pass, compact the minimum number of files possible. This
+        // is done to avoid unnecessarily compacting the largest files more than once.
+        numToCompact = sortedFiles.size() - maxFilesToCompact + 1;
+      }
 
-    return sortedFiles.subList(0, numToCompact);
+      return List.of(sortedFiles.subList(0, numToCompact));
+    }
   }
 
   static Collection<CompactableFile> findDataFilesToCompact(Set<CompactableFile> files,
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index 4fcd6e5714..76563555f6 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.core.spi.compaction;
 
 import static com.google.common.collect.MoreCollectors.onlyElement;
+import static java.util.stream.Collectors.toSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,10 +28,12 @@ import static org.junit.jupiter.api.Assertions.fail;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
@@ -239,6 +242,149 @@ public class DefaultCompactionPlannerTest {
     assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor());
   }
 
+  @Test
+  public void testMultipleCompactions() {
+    // This test validates that when a tablet has many files, multiple compaction jobs can be
+    // issued at the same time.
+    for (var kind : List.of(CompactionKind.USER, CompactionKind.SYSTEM)) {
+      var planner = createPlanner(false);
+      var all = IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
+      // Simulate 10 larger files; these should not compact at the same time as the smaller files.
+      // It is more optimal to wait for all of the smaller files to compact and then compact the
+      // output of compacting the smaller files with the larger files.
+      IntStream.range(990, 1000).mapToObj(i -> createCF("C" + i, 20000)).forEach(all::add);
+      var params = createPlanningParams(all, all, Set.of(), 2, kind);
+      var plan = planner.makePlan(params);
+
+      // There are 990 smaller files to compact. Should produce 66 jobs of 15 smaller files each.
+      assertEquals(66, plan.getJobs().size());
+      Set<CompactableFile> filesSeen = new HashSet<>();
+      plan.getJobs().forEach(job -> {
+        assertEquals(15, job.getFiles().size());
+        assertEquals(kind, job.getKind());
+        assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor());
+        // ensure the files across all of the jobs are disjoint
+        job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
+      });
+
+      // Ensure all of the smaller files are scheduled for compaction. Should not see any of the
+      // larger files.
+      assertEquals(IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()),
+          filesSeen);
+    }
+  }
+
+  @Test
+  public void testMultipleCompactionsAndLargeCompactionRatio() {
+    var planner = createPlanner(false);
+    var all = IntStream.range(0, 65).mapToObj(i -> createCF("F" + i, i + 1)).collect(toSet());
+    // This compaction ratio would not cause a system compaction, however a user compaction must
+    // compact all of the files so it should generate some compactions.
+    var params = createPlanningParams(all, all, Set.of(), 100, CompactionKind.USER);
+    var plan = planner.makePlan(params);
+
+    assertEquals(3, plan.getJobs().size());
+
+    var iterator = plan.getJobs().iterator();
+    var job1 = iterator.next();
+    var job2 = iterator.next();
+    var job3 = iterator.next();
+    assertTrue(Collections.disjoint(job1.getFiles(), job2.getFiles()));
+    assertTrue(Collections.disjoint(job1.getFiles(), job3.getFiles()));
+    assertTrue(Collections.disjoint(job2.getFiles(), job3.getFiles()));
+
+    for (var job : plan.getJobs()) {
+      assertEquals(15, job.getFiles().size());
+      assertEquals(CompactionKind.USER, job.getKind());
+      assertTrue(all.containsAll(job.getFiles()));
+      // Should select three sets of files that are from the smallest 45 files.
+      assertTrue(job.getFiles().stream().mapToLong(CompactableFile::getEstimatedSize).sum()
+          <= IntStream.range(1, 46).sum());
+    }
+  }
+
+  @Test
+  public void testMultipleCompactionsAndRunningCompactions() {
+    // This test validates that when a tablet has many files, multiple compaction jobs can be
+    // issued at the same time even if there are running compactions, as long as everything meets
+    // the compaction ratio.
+    for (var kind : List.of(CompactionKind.USER, CompactionKind.SYSTEM)) {
+      var planner = createPlanner(false);
+      var all = IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
+      // Simulate 10 larger files; these should not compact at the same time as the smaller files.
+      // It is more optimal to wait for all of the smaller files to compact and then compact the
+      // output of compacting the smaller files with the larger files.
+      IntStream.range(990, 1000).mapToObj(i -> createCF("C" + i, 20000)).forEach(all::add);
+      // 30 files are compacting, so they will not be in the candidate set.
+      var candidates =
+          IntStream.range(30, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
+      // create two jobs covering the first 30 files
+      var job1 = createJob(kind, all,
+          IntStream.range(0, 15).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()));
+      var job2 = createJob(kind, all,
+          IntStream.range(15, 30).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()));
+      var params = createPlanningParams(all, candidates, Set.of(job1, job2), 2, kind);
+      var plan = planner.makePlan(params);
+
+      // There are 960 smaller candidate files to compact. Should produce 64 jobs of 15 files each.
+      assertEquals(64, plan.getJobs().size());
+      Set<CompactableFile> filesSeen = new HashSet<>();
+      plan.getJobs().forEach(job -> {
+        assertEquals(15, job.getFiles().size());
+        assertEquals(kind, job.getKind());
+        assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor());
+        // ensure the files across all of the jobs are disjoint
+        job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
+      });
+
+      // Ensure all of the smaller files are scheduled for compaction. Should not see any of the
+      // larger files.
+      assertEquals(IntStream.range(30, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()),
+          filesSeen);
+    }
+  }
+
+  @Test
+  public void testUserCompactionDoesNotWaitOnSystemCompaction() {
+    // This test ensures user compactions do not wait on system compactions to complete.
+    var planner = createPlanner(true);
+    var all = createCFs("F1", "1M", "F2", "1M", "F3", "1M", "F4", "3M", "F5", "3M", "F6", "3M",
+        "F7", "20M");
+    var candidates = createCFs("F4", "3M", "F5", "3M", "F6", "3M", "F7", "20M");
+    var compacting = Set
+        .of(createJob(CompactionKind.SYSTEM, all, createCFs("F1", "1M", "F2", "1M", "F3", "1M")));
+    var params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.SYSTEM);
+    var plan = planner.makePlan(params);
+    // The planning of the system compaction should find it is most optimal to wait on the running
+    // system compaction and emit zero jobs.
+    assertEquals(0, plan.getJobs().size());
+
+    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.USER);
+    plan = planner.makePlan(params);
+    // The planning of the user compaction should not take the running system compaction into
+    // consideration and should create a compaction job.
+    assertEquals(1, plan.getJobs().size());
+    assertEquals(createCFs("F4", "3M", "F5", "3M", "F6", "3M", "F7", "20M"),
+        getOnlyElement(plan.getJobs()).getFiles());
+
+    // Reverse the situation and turn the running compaction into a user compaction
+    compacting =
+        Set.of(createJob(CompactionKind.USER, all, createCFs("F1", "1M", "F2", "1M", "F3", "1M")));
+    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.SYSTEM);
+    plan = planner.makePlan(params);
+    // The planning of a system compaction should not take the running user compaction into account
+    // and should emit a job.
+    assertEquals(1, plan.getJobs().size());
+    assertEquals(createCFs("F4", "3M", "F5", "3M", "F6", "3M"),
+        getOnlyElement(plan.getJobs()).getFiles());
+
+    params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.USER);
+    plan = planner.makePlan(params);
+    // The planning of the user compaction should decide that the most optimal thing to do is to
+    // wait on the running user compaction and should not emit any jobs.
+    assertEquals(0, plan.getJobs().size());
+  }
+
   /**
    * Tests internal type executor with no numThreads set throws error
    */
@@ -393,15 +539,22 @@ public class DefaultCompactionPlannerTest {
         .getJobs().iterator().next();
   }
 
-  private static Set<CompactableFile> createCFs(String... namesSizePairs)
-      throws URISyntaxException {
+  private static CompactableFile createCF(String name, long size) {
+    try {
+      return CompactableFile
+          .create(new URI("hdfs://fake/accumulo/tables/1/t-0000000z/" + name + ".rf"), size, 0);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static Set<CompactableFile> createCFs(String... namesSizePairs) {
     Set<CompactableFile> files = new HashSet<>();
 
     for (int i = 0; i < namesSizePairs.length; i += 2) {
       String name = namesSizePairs[i];
       long size = ConfigurationTypeHelper.getFixedMemoryAsBytes(namesSizePairs[i + 1]);
-      files.add(CompactableFile
-          .create(new URI("hdfs://fake/accumulo/tables/1/t-0000000z/" + name + ".rf"), size, 0));
+      files.add(createCF(name, size));
     }
 
     return files;
@@ -425,9 +578,9 @@ public class DefaultCompactionPlannerTest {
       double ratio, int maxFiles, long maxSize) {
     var result = DefaultCompactionPlanner.findDataFilesToCompact(files, ratio, maxFiles, maxSize);
     var expectedNames = expected.stream().map(CompactableFile::getUri).map(URI::getPath)
-        .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(Collectors.toSet());
+        .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(toSet());
     var resultNames = result.stream().map(CompactableFile::getUri).map(URI::getPath)
-        .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(Collectors.toSet());
+        .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(toSet());
     assertEquals(expectedNames, resultNames);
   }
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 025d3ccf34..ffc88ccf6c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -391,7 +391,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
           if (actions.contains(ManagementAction.NEEDS_COMPACTING)) {
             var jobs = compactionGenerator.generateJobs(tm,
                 TabletManagementIterator.determineCompactionKinds(actions));
-            LOG.debug("{} may need compacting.", tm.getExtent());
+            LOG.debug("{} may need compacting, adding {} jobs", tm.getExtent(), jobs.size());
             manager.getCompactionQueues().add(tm, jobs);
           }
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 49a27ef7db..636156a624 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -90,7 +90,6 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
 import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.rpc.ThriftUtil;
-import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
@@ -119,8 +118,6 @@ import org.apache.accumulo.server.tablets.TabletNameGenerator;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -434,21 +431,6 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
 
   }
 
-  /**
-   * Return the Thrift client for the TServer
-   *
-   * @param tserver tserver instance
-   * @return thrift client
-   * @throws TTransportException thrift error
-   */
-  protected TabletServerClientService.Client getTabletServerConnection(TServerInstance tserver)
-      throws TTransportException {
-    LiveTServerSet.TServerConnection connection = tserverSet.getConnection(tserver);
-    TTransport transport =
-        this.ctx.getTransportPool().getTransport(connection.getAddress(), 0, this.ctx);
-    return ThriftUtil.createClient(ThriftClientTypes.TABLET_SERVER, transport);
-  }
-
   // ELASTICITY_TODO unit test this code
   private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob job,
       Set<StoredTabletFile> jobFiles) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 53ded2699c..037488c46f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -182,8 +182,9 @@ public class CompactionIT extends AccumuloClusterHarness {
   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
     cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
-    cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5ms");
-    cfg.setProperty(Property.COMPACTOR_MIN_JOB_WAIT_TIME, "10ms");
+    cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "1s");
+    cfg.setProperty(Property.COMPACTOR_MIN_JOB_WAIT_TIME, "100ms");
+    cfg.setProperty(Property.COMPACTOR_MAX_JOB_WAIT_TIME, "1s");
     // use raw local file system so walogs sync and flush will work
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
@@ -313,6 +314,9 @@ public class CompactionIT extends AccumuloClusterHarness {
       client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "1001");
       TableId tid = TableId.of(client.tableOperations().tableIdMap().get(table1));
 
+      // In addition to testing errors in compactions, this test also exercises creating lots of
+      // files to compact. The following will create 1000 files to compact. When changing this test,
+      // try to keep both behaviors or create a new test for lots of files to compact.
       ReadWriteIT.ingest(client, MAX_DATA, 1, 1, 0, "colf", table1, 1);
 
       Ample ample = ((ClientContext) client).getAmple();
@@ -325,12 +329,13 @@ public class CompactionIT extends AccumuloClusterHarness {
       client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc));
       client.tableOperations().compact(table1, new CompactionConfig().setWait(true));
 
-      tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
+      tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES, ColumnType.ECOMP).build();
       tm = tms.iterator().next();
       assertEquals(1, tm.getFiles().size());
+      // ensure the failed compactions did not leave anything in the metadata table
+      assertEquals(0, tm.getExternalCompactions().size());
 
       ReadWriteIT.verify(client, MAX_DATA, 1, 1, 0, table1);
-
     }
   }