Posted to commits@accumulo.apache.org by kt...@apache.org on 2023/07/27 20:45:14 UTC
[accumulo] branch elasticity updated: Makes DefaultCompactionPlanner emit multiple jobs (#3662)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 97b08d0b32 Makes DefaultCompactionPlanner emit multiple jobs (#3662)
97b08d0b32 is described below
commit 97b08d0b32f0d173e27dcb1dd46dc6080bd2cdef
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jul 27 16:45:08 2023 -0400
Makes DefaultCompactionPlanner emit multiple jobs (#3662)
The DefaultCompactionPlanner would only emit one compaction job per
tablet. This commit changes it to be able to emit multiple compactions
for a single tablet, as long as doing so is optimal w.r.t. the
compaction ratio.
---
.../spi/compaction/DefaultCompactionPlanner.java | 188 ++++++++++++---------
.../compaction/DefaultCompactionPlannerTest.java | 167 +++++++++++++++++-
.../accumulo/manager/TabletGroupWatcher.java | 2 +-
.../coordinator/CompactionCoordinator.java | 18 --
.../accumulo/test/functional/CompactionIT.java | 13 +-
5 files changed, 276 insertions(+), 112 deletions(-)
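In outline, the new planning loop works like this (a minimal sketch; the names mirror the real code in the diff below, but setup, parameters, and error handling are elided):

    // Sketch of the multi-job planning loop added by this commit (illustrative,
    // not the exact implementation).
    Set<CompactableFile> filesCopy = new HashSet<>(params.getCandidates());
    // Expected output files of running (and newly planned) compactions. If the
    // optimal set to compact contains one of these, it is better to wait.
    Set<CompactableFile> expectedFiles = new HashSet<>();
    List<Collection<CompactableFile>> compactionJobs = new ArrayList<>();
    while (true) {
      var filesToCompact =
          findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
      if (filesToCompact.isEmpty() || !Collections.disjoint(filesToCompact, expectedFiles)) {
        break; // nothing meets the ratio, or the optimal set needs a running job's output
      }
      filesCopy.removeAll(filesToCompact);
      // Add a fake file standing in for the planned job's output so later
      // iterations can detect overlap with it.
      var expectedFile = getExpected(filesToCompact, fakeFileGenerator);
      expectedFiles.add(expectedFile);
      filesCopy.add(expectedFile);
      compactionJobs.add(filesToCompact);
      if (filesToCompact.size() < maxFilesToCompact) {
        break; // less than a full batch means the ratio is no longer met; stop looking
      }
    }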
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
index a3c76ed4ce..e3f5f6535a 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java
@@ -180,6 +180,22 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
}
}
+ private static class FakeFileGenerator {
+
+ private int count = 0;
+
+ public CompactableFile create(long size) {
+ try {
+ count++;
+ return CompactableFile.create(
+ new URI("hdfs://fake/accumulo/tables/adef/t-zzFAKEzz/FAKE-0000" + count + ".rf"), size,
+ 0);
+ } catch (URISyntaxException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
private List<Executor> executors;
private int maxFilesToCompact;
@@ -258,76 +274,93 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
Set<CompactableFile> filesCopy = new HashSet<>(params.getCandidates());
+ FakeFileGenerator fakeFileGenerator = new FakeFileGenerator();
+
long maxSizeToCompact = getMaxSizeToCompact(params.getKind());
- Collection<CompactableFile> group;
- if (params.getRunningCompactions().isEmpty()) {
- group =
- findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
+ // This set represents future files that will be produced by running compactions. If the optimal
+ // set of files to compact is computed and contains one of these files, then it's optimal to wait
+ // for this compaction to finish.
+ Set<CompactableFile> expectedFiles = new HashSet<>();
+ params.getRunningCompactions().stream().filter(job -> job.getKind() == params.getKind())
+ .map(job -> getExpected(job.getFiles(), fakeFileGenerator))
+ .forEach(compactableFile -> Preconditions.checkState(expectedFiles.add(compactableFile)));
+ Preconditions.checkState(Collections.disjoint(expectedFiles, filesCopy));
+ filesCopy.addAll(expectedFiles);
- if (!group.isEmpty() && group.size() < params.getCandidates().size()
- && params.getCandidates().size() <= maxFilesToCompact
- && (params.getKind() == CompactionKind.USER
- || params.getKind() == CompactionKind.SELECTOR)) {
- // USER and SELECTOR compactions must eventually compact all files. When a subset of files
- // that meets the compaction ratio is selected, look ahead and see if the next compaction
- // would also meet the compaction ratio. If not then compact everything to avoid doing
- // more than logarithmic work across multiple comapctions.
-
- filesCopy.removeAll(group);
- filesCopy.add(getExpected(group, 0));
-
- if (findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact,
- maxSizeToCompact).isEmpty()) {
- // The next possible compaction does not meet the compaction ratio, so compact
- // everything.
- group = Set.copyOf(params.getCandidates());
- }
+ List<Collection<CompactableFile>> compactionJobs = new ArrayList<>();
+
+ while (true) {
+ var filesToCompact =
+ findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
+ if (!Collections.disjoint(filesToCompact, expectedFiles)) {
+ // the optimal set of files to compact includes the output of a running compaction, so let's
+ // wait for that running compaction to finish.
+ break;
+ }
+ if (filesToCompact.isEmpty()) {
+ break;
}
- } else if (params.getKind() == CompactionKind.SYSTEM) {
- // This code determines if once the files compacting finish would they be included in a
- // compaction with the files smaller than them? If so, then wait for the running compaction
- // to complete.
+ filesCopy.removeAll(filesToCompact);
+
+ // A compaction job will be created for these files, so let's add an expected file for
+ // that planned compaction job. Then, if a future iteration of this loop selects that
+ // file, planning will stop and wait instead of compacting it.
+ var expectedFile = getExpected(filesToCompact, fakeFileGenerator);
+ Preconditions.checkState(expectedFiles.add(expectedFile));
+ Preconditions.checkState(filesCopy.add(expectedFile));
- // The set of files running compactions may produce
- var expectedFiles = getExpected(params.getRunningCompactions());
+ compactionJobs.add(filesToCompact);
- if (!Collections.disjoint(filesCopy, expectedFiles)) {
- throw new AssertionError();
+ if (filesToCompact.size() < maxFilesToCompact) {
+ // Only continue looking for more compaction jobs when the set of files found equals
+ // maxFilesToCompact in size. When fewer files are found, it's an indication that the
+ // compaction ratio is no longer met, so it would be suboptimal to look for more jobs
+ // because the smallest optimal set was already found.
+ break;
}
+ }
- filesCopy.addAll(expectedFiles);
+ if (compactionJobs.size() == 1
+ && (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR)
+ && compactionJobs.get(0).size() < params.getCandidates().size()
+ && compactionJobs.get(0).size() <= maxFilesToCompact) {
+ // USER and SELECTOR compactions must eventually compact all files. When a subset of files
+ // that meets the compaction ratio is selected, look ahead and see if the next compaction
+ // would also meet the compaction ratio. If not then compact everything to avoid doing
+ // more than logarithmic work across multiple compactions.
- group =
- findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact);
+ var group = compactionJobs.get(0);
+ var candidatesCopy = new HashSet<>(params.getCandidates());
+
+ candidatesCopy.removeAll(group);
+ Preconditions.checkState(candidatesCopy.add(getExpected(group, fakeFileGenerator)));
- if (!Collections.disjoint(group, expectedFiles)) {
- // file produced by running compaction will eventually compact with existing files, so
- // wait.
- group = Set.of();
+ if (findDataFilesToCompact(candidatesCopy, params.getRatio(), maxFilesToCompact,
+ maxSizeToCompact).isEmpty()) {
+ // The next possible compaction does not meet the compaction ratio, so compact
+ // everything.
+ compactionJobs.set(0, Set.copyOf(params.getCandidates()));
}
- } else {
- group = Set.of();
}
- if (group.isEmpty()
+ if (compactionJobs.isEmpty()
&& (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR
|| params.getKind() == CompactionKind.CHOP)
&& params.getRunningCompactions().stream()
.noneMatch(job -> job.getKind() == params.getKind())) {
- group = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact);
+ // These kinds of compactions require compacting files even if none of the files meet the
+ // compaction ratio. No files were found using the compaction ratio and no compactions are
+ // running, so force a compaction.
+ compactionJobs = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact);
}
- if (group.isEmpty()) {
- return params.createPlanBuilder().build();
- } else {
- // determine which executor to use based on the size of the files
- var ceid = getExecutor(group);
-
- return params.createPlanBuilder().addJob(createPriority(params, group), ceid, group).build();
- }
+ var builder = params.createPlanBuilder();
+ compactionJobs.forEach(jobFiles -> builder.addJob(createPriority(params, jobFiles),
+ getExecutor(jobFiles), jobFiles));
+ return builder.build();
}
private static short createPriority(PlanningParameters params,
@@ -346,51 +379,42 @@ public class DefaultCompactionPlanner implements CompactionPlanner {
return Long.MAX_VALUE;
}
- private CompactableFile getExpected(Collection<CompactableFile> files, int count) {
+ private CompactableFile getExpected(Collection<CompactableFile> files,
+ FakeFileGenerator fakeFileGenerator) {
long size = files.stream().mapToLong(CompactableFile::getEstimatedSize).sum();
- try {
- return CompactableFile.create(
- new URI("hdfs://fake/accumulo/tables/adef/t-zzFAKEzz/FAKE-0000" + count + ".rf"), size,
- 0);
- } catch (URISyntaxException e) {
- throw new IllegalStateException(e);
- }
+ return fakeFileGenerator.create(size);
}
- /**
- * @return the expected files sizes for sets of compacting files.
- */
- private Set<CompactableFile> getExpected(Collection<CompactionJob> compacting) {
-
- Set<CompactableFile> expected = new HashSet<>();
-
- int count = 0;
-
- for (CompactionJob job : compacting) {
- count++;
- expected.add(getExpected(job.getFiles(), count));
- }
-
- return expected;
- }
-
- private static Collection<CompactableFile>
+ private static List<Collection<CompactableFile>>
findMaximalRequiredSetToCompact(Collection<CompactableFile> files, int maxFilesToCompact) {
if (files.size() <= maxFilesToCompact) {
- return files;
+ return List.of(files);
}
List<CompactableFile> sortedFiles = sortByFileSize(files);
- int numToCompact = maxFilesToCompact;
+ // Compute the number of full compaction jobs, each with maxFilesToCompact files, that
+ // could run, then subtract 1 because the last job is a special case.
+ int batches = sortedFiles.size() / maxFilesToCompact - 1;
- if (sortedFiles.size() > maxFilesToCompact && sortedFiles.size() < 2 * maxFilesToCompact) {
- // on the second to last compaction pass, compact the minimum amount of files possible
- numToCompact = sortedFiles.size() - maxFilesToCompact + 1;
- }
+ if (batches > 0) {
+ ArrayList<Collection<CompactableFile>> jobs = new ArrayList<>();
+ for (int i = 0; i < batches; i++) {
+ jobs.add(sortedFiles.subList(i * maxFilesToCompact, (i + 1) * maxFilesToCompact));
+ }
+ return jobs;
+ } else {
+ int numToCompact = maxFilesToCompact;
+
+ if (sortedFiles.size() > maxFilesToCompact && sortedFiles.size() < 2 * maxFilesToCompact) {
+ // On the second-to-last compaction pass, compact the minimum number of files possible. This
+ // is done to avoid unnecessarily compacting the largest files more than once.
+ numToCompact = sortedFiles.size() - maxFilesToCompact + 1;
+ }
- return sortedFiles.subList(0, numToCompact);
+ return List.of(sortedFiles.subList(0, numToCompact));
+ }
}
static Collection<CompactableFile> findDataFilesToCompact(Set<CompactableFile> files,
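A worked example of the batching in findMaximalRequiredSetToCompact above, with hypothetical numbers (35 files, maxFilesToCompact = 15), since the subtract-1 logic is easy to misread:

    // Hypothetical walk-through, not code from the commit.
    int max = 15;
    int batches = 35 / max - 1;       // = 1: emit one full job over the 15 smallest files
    int afterJob = 35 - max + 1;      // = 21 files once that job's output replaces its inputs
    // 15 < 21 < 30, so a later planning pass compacts the minimum possible:
    int numToCompact = afterJob - max + 1;        // = 7 smallest files
    int finalPass = afterJob - numToCompact + 1;  // = 15, one last full compaction

So the largest files are only rewritten in the final pass.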
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
index 4fcd6e5714..76563555f6 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.core.spi.compaction;
import static com.google.common.collect.MoreCollectors.onlyElement;
+import static java.util.stream.Collectors.toSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,10 +28,12 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
@@ -239,6 +242,149 @@ public class DefaultCompactionPlannerTest {
assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor());
}
+ @Test
+ public void testMultipleCompactions() {
+ // This test validates that when a tablet has many files, multiple compaction jobs can be
+ // issued at the same time.
+ for (var kind : List.of(CompactionKind.USER, CompactionKind.SYSTEM)) {
+ var planner = createPlanner(false);
+ var all = IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
+ // Simulate 10 larger files; these should not compact at the same time as the smaller files.
+ // It's more optimal to wait for all of the smaller files to compact and then compact the
+ // output of compacting the smaller files with the larger files.
+ IntStream.range(990, 1000).mapToObj(i -> createCF("C" + i, 20000)).forEach(all::add);
+ var params = createPlanningParams(all, all, Set.of(), 2, kind);
+ var plan = planner.makePlan(params);
+
+ // There are 990 smaller files to compact. Should produce 66 jobs of 15 smaller files each.
+ assertEquals(66, plan.getJobs().size());
+ Set<CompactableFile> filesSeen = new HashSet<>();
+ plan.getJobs().forEach(job -> {
+ assertEquals(15, job.getFiles().size());
+ assertEquals(kind, job.getKind());
+ assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor());
+ // ensure the files across all of the jobs are disjoint
+ job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
+ });
+
+ // Ensure all of the smaller files are scheduled for compaction. Should not see any of the
+ // larger files.
+ assertEquals(IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()),
+ filesSeen);
+ }
+ }
+
+ @Test
+ public void testMultipleCompactionsAndLargeCompactionRatio() {
+ var planner = createPlanner(false);
+ var all = IntStream.range(0, 65).mapToObj(i -> createCF("F" + i, i + 1)).collect(toSet());
+ // This compaction ratio would not cause a system compaction; however, a user compaction must
+ // compact all of the files, so it should generate some compactions.
+ var params = createPlanningParams(all, all, Set.of(), 100, CompactionKind.USER);
+ var plan = planner.makePlan(params);
+
+ assertEquals(3, plan.getJobs().size());
+
+ var iterator = plan.getJobs().iterator();
+ var job1 = iterator.next();
+ var job2 = iterator.next();
+ var job3 = iterator.next();
+ assertTrue(Collections.disjoint(job1.getFiles(), job2.getFiles()));
+ assertTrue(Collections.disjoint(job1.getFiles(), job3.getFiles()));
+ assertTrue(Collections.disjoint(job2.getFiles(), job3.getFiles()));
+
+ for (var job : plan.getJobs()) {
+ assertEquals(15, job.getFiles().size());
+ assertEquals(CompactionKind.USER, job.getKind());
+ assertTrue(all.containsAll(job.getFiles()));
+ // Should select three sets of files that are from the smallest 45 files.
+ assertTrue(job.getFiles().stream().mapToLong(CompactableFile::getEstimatedSize).sum()
+ <= IntStream.range(1, 46).sum());
+ }
+ }
+
+ @Test
+ public void testMultipleCompactionsAndRunningCompactions() {
+ // This test validates that when a tablet has many files, multiple compaction jobs can be
+ // issued at the same time, even if there are running compactions, as long as everything meets the
+ // compaction ratio.
+ for (var kind : List.of(CompactionKind.USER, CompactionKind.SYSTEM)) {
+ var planner = createPlanner(false);
+ var all = IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
+ // Simulate 10 larger files; these should not compact at the same time as the smaller files.
+ // It's more optimal to wait for all of the smaller files to compact and then compact the
+ // output of compacting the smaller files with the larger files.
+ IntStream.range(990, 1000).mapToObj(i -> createCF("C" + i, 20000)).forEach(all::add);
+ // 30 files are compacting, so they will not be in the candidate set.
+ var candidates =
+ IntStream.range(30, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
+ // create two jobs covering the first 30 files
+ var job1 = createJob(kind, all,
+ IntStream.range(0, 15).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()));
+ var job2 = createJob(kind, all,
+ IntStream.range(15, 30).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()));
+ var params = createPlanningParams(all, candidates, Set.of(job1, job2), 2, kind);
+ var plan = planner.makePlan(params);
+
+ // 30 of the 990 smaller files are already compacting, leaving 960 candidates. Should
+ // produce 64 jobs of 15 smaller files each.
+ assertEquals(64, plan.getJobs().size());
+ Set<CompactableFile> filesSeen = new HashSet<>();
+ plan.getJobs().forEach(job -> {
+ assertEquals(15, job.getFiles().size());
+ assertEquals(kind, job.getKind());
+ assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor());
+ // ensure the files across all of the jobs are disjoint
+ job.getFiles().forEach(cf -> assertTrue(filesSeen.add(cf)));
+ });
+
+ // Ensure all of the smaller files are scheduled for compaction. Should not see any of the
+ // larger files.
+ assertEquals(IntStream.range(30, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet()),
+ filesSeen);
+ }
+ }
+
+ @Test
+ public void testUserCompactionDoesNotWaitOnSystemCompaction() {
+ // This test ensures user compactions do not wait on system compactions to complete
+ var planner = createPlanner(true);
+ var all = createCFs("F1", "1M", "F2", "1M", "F3", "1M", "F4", "3M", "F5", "3M", "F6", "3M",
+ "F7", "20M");
+ var candidates = createCFs("F4", "3M", "F5", "3M", "F6", "3M", "F7", "20M");
+ var compacting = Set
+ .of(createJob(CompactionKind.SYSTEM, all, createCFs("F1", "1M", "F2", "1M", "F3", "1M")));
+ var params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.SYSTEM);
+ var plan = planner.makePlan(params);
+ // The planning of the system compaction should find it most optimal to wait on the running
+ // system compaction and emit zero jobs.
+ assertEquals(0, plan.getJobs().size());
+
+ params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.USER);
+ plan = planner.makePlan(params);
+ // The planning of a user compaction should not take the running system compaction into
+ // consideration and should create a compaction job.
+ assertEquals(1, plan.getJobs().size());
+ assertEquals(createCFs("F4", "3M", "F5", "3M", "F6", "3M", "F7", "20M"),
+ getOnlyElement(plan.getJobs()).getFiles());
+
+ // Reverse the situation and turn the running compaction into a user compaction
+ compacting =
+ Set.of(createJob(CompactionKind.USER, all, createCFs("F1", "1M", "F2", "1M", "F3", "1M")));
+ params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.SYSTEM);
+ plan = planner.makePlan(params);
+ // The planning of a system compaction should not take the running user compaction into account
+ // and should emit a job
+ assertEquals(1, plan.getJobs().size());
+ assertEquals(createCFs("F4", "3M", "F5", "3M", "F6", "3M"),
+ getOnlyElement(plan.getJobs()).getFiles());
+
+ params = createPlanningParams(all, candidates, compacting, 2, CompactionKind.USER);
+ plan = planner.makePlan(params);
+ // The planning of the user compaction should decide that the most optimal thing to do is
+ // to wait on the running user compaction and should not emit any jobs.
+ assertEquals(0, plan.getJobs().size());
+ }
+
/**
* Tests internal type executor with no numThreads set throws error
*/
@@ -393,15 +539,22 @@ public class DefaultCompactionPlannerTest {
.getJobs().iterator().next();
}
- private static Set<CompactableFile> createCFs(String... namesSizePairs)
- throws URISyntaxException {
+ private static CompactableFile createCF(String name, long size) {
+ try {
+ return CompactableFile
+ .create(new URI("hdfs://fake/accumulo/tables/1/t-0000000z/" + name + ".rf"), size, 0);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static Set<CompactableFile> createCFs(String... namesSizePairs) {
Set<CompactableFile> files = new HashSet<>();
for (int i = 0; i < namesSizePairs.length; i += 2) {
String name = namesSizePairs[i];
long size = ConfigurationTypeHelper.getFixedMemoryAsBytes(namesSizePairs[i + 1]);
- files.add(CompactableFile
- .create(new URI("hdfs://fake/accumulo/tables/1/t-0000000z/" + name + ".rf"), size, 0));
+ files.add(createCF(name, size));
}
return files;
@@ -425,9 +578,9 @@ public class DefaultCompactionPlannerTest {
double ratio, int maxFiles, long maxSize) {
var result = DefaultCompactionPlanner.findDataFilesToCompact(files, ratio, maxFiles, maxSize);
var expectedNames = expected.stream().map(CompactableFile::getUri).map(URI::getPath)
- .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(Collectors.toSet());
+ .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(toSet());
var resultNames = result.stream().map(CompactableFile::getUri).map(URI::getPath)
- .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(Collectors.toSet());
+ .map(path -> path.split("/")).map(t -> t[t.length - 1]).collect(toSet());
assertEquals(expectedNames, resultNames);
}
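The job counts asserted in the tests above fall out of the batching: 990 equally sized small candidate files at 15 files per job yield 990 / 15 = 66 disjoint jobs, and when 30 of those files are held by two running jobs only 960 candidates remain, yielding 960 / 15 = 64 jobs. A minimal sketch of driving the planner this way, reusing the test helpers shown above:

    // Sketch only: createPlanner, createCF, and createPlanningParams are the
    // test helpers from DefaultCompactionPlannerTest; 2 is the compaction ratio.
    var planner = createPlanner(false);
    var all = IntStream.range(0, 990).mapToObj(i -> createCF("F" + i, 1000)).collect(toSet());
    var plan = planner.makePlan(createPlanningParams(all, all, Set.of(), 2, CompactionKind.SYSTEM));
    assertEquals(990 / 15, plan.getJobs().size()); // 66 jobs of 15 files each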
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 025d3ccf34..ffc88ccf6c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -391,7 +391,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
if (actions.contains(ManagementAction.NEEDS_COMPACTING)) {
var jobs = compactionGenerator.generateJobs(tm,
TabletManagementIterator.determineCompactionKinds(actions));
- LOG.debug("{} may need compacting.", tm.getExtent());
+ LOG.debug("{} may need compacting, adding {} jobs", tm.getExtent(), jobs.size());
manager.getCompactionQueues().add(tm, jobs);
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 49a27ef7db..636156a624 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -90,7 +90,6 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
import org.apache.accumulo.core.metadata.schema.SelectedFiles;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.rpc.ThriftUtil;
-import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
@@ -119,8 +118,6 @@ import org.apache.accumulo.server.tablets.TabletNameGenerator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -434,21 +431,6 @@ public class CompactionCoordinator implements CompactionCoordinatorService.Iface
}
- /**
- * Return the Thrift client for the TServer
- *
- * @param tserver tserver instance
- * @return thrift client
- * @throws TTransportException thrift error
- */
- protected TabletServerClientService.Client getTabletServerConnection(TServerInstance tserver)
- throws TTransportException {
- LiveTServerSet.TServerConnection connection = tserverSet.getConnection(tserver);
- TTransport transport =
- this.ctx.getTransportPool().getTransport(connection.getAddress(), 0, this.ctx);
- return ThriftUtil.createClient(ThriftClientTypes.TABLET_SERVER, transport);
- }
-
// ELASTICITY_TODO unit test this code
private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob job,
Set<StoredTabletFile> jobFiles) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 53ded2699c..037488c46f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -182,8 +182,9 @@ public class CompactionIT extends AccumuloClusterHarness {
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
- cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5ms");
- cfg.setProperty(Property.COMPACTOR_MIN_JOB_WAIT_TIME, "10ms");
+ cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "1s");
+ cfg.setProperty(Property.COMPACTOR_MIN_JOB_WAIT_TIME, "100ms");
+ cfg.setProperty(Property.COMPACTOR_MAX_JOB_WAIT_TIME, "1s");
// use raw local file system so walogs sync and flush will work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@@ -313,6 +314,9 @@ public class CompactionIT extends AccumuloClusterHarness {
client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "1001");
TableId tid = TableId.of(client.tableOperations().tableIdMap().get(table1));
+ // In addition to testing errors in compactions, this test also exercises creating lots of
+ // files to compact. The following will create 1000 files to compact. When changing this test
+ // try to keep both behaviors, or create a new test for compacting lots of files.
ReadWriteIT.ingest(client, MAX_DATA, 1, 1, 0, "colf", table1, 1);
Ample ample = ((ClientContext) client).getAmple();
@@ -325,12 +329,13 @@ public class CompactionIT extends AccumuloClusterHarness {
client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc));
client.tableOperations().compact(table1, new CompactionConfig().setWait(true));
- tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
+ tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES, ColumnType.ECOMP).build();
tm = tms.iterator().next();
assertEquals(1, tm.getFiles().size());
+ // ensure the failed compactions did not leave anything in the metadata table
+ assertEquals(0, tm.getExternalCompactions().size());
ReadWriteIT.verify(client, MAX_DATA, 1, 1, 0, table1);
-
}
}