You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by jm...@apache.org on 2021/07/29 12:08:06 UTC
[accumulo] branch main updated: Remove continue point from Garbage
Collector (#2214)
This is an automated email from the ASF dual-hosted git repository.
jmark99 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 485114a Remove continue point from Garbage Collector (#2214)
485114a is described below
commit 485114a4760028e70d2eb4ed23b795ed386fbaa1
Author: Mark Owens <jm...@apache.org>
AuthorDate: Thu Jul 29 08:07:56 2021 -0400
Remove continue point from Garbage Collector (#2214)
Updated Garbage Collection code to no longer use a continue point when processing deletion candidates. The GC now uses an iterator that lasts during the lifetime of a GC cycle.
The GarbageCollectionTest was updated to work with the update, as was the GC integration test.
Closes #1351
---
.../accumulo/core/metadata/schema/Ample.java | 2 +-
.../accumulo/server/metadata/ServerAmpleImpl.java | 14 +--
.../accumulo/server/util/ListVolumesUsed.java | 2 +-
.../accumulo/gc/GarbageCollectionAlgorithm.java | 41 +++---
.../accumulo/gc/GarbageCollectionEnvironment.java | 25 ++--
.../apache/accumulo/gc/SimpleGarbageCollector.java | 22 ++--
.../apache/accumulo/gc/GarbageCollectionTest.java | 139 ++++++++++++++++++++-
.../test/functional/GarbageCollectorIT.java | 3 +
8 files changed, 185 insertions(+), 63 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index fd5634e..18bc12a 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -198,7 +198,7 @@ public interface Ample {
throw new UnsupportedOperationException();
}
- default Iterator<String> getGcCandidates(DataLevel level, String continuePoint) {
+ default Iterator<String> getGcCandidates(DataLevel level) {
throw new UnsupportedOperationException();
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 1f7eefe..5ef7237 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -32,9 +32,7 @@ import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -163,7 +161,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
}
@Override
- public Iterator<String> getGcCandidates(DataLevel level, String continuePoint) {
+ public Iterator<String> getGcCandidates(DataLevel level) {
if (level == DataLevel.ROOT) {
var zooReader = new ZooReader(context.getZooKeepers(), context.getZooKeepersSessionTimeOut());
byte[] json;
@@ -173,19 +171,9 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
throw new RuntimeException(e);
}
Stream<String> candidates = RootGcCandidates.fromJson(json).stream().sorted();
-
- if (continuePoint != null && !continuePoint.isEmpty()) {
- candidates = candidates.dropWhile(candidate -> candidate.compareTo(continuePoint) <= 0);
- }
-
return candidates.iterator();
} else if (level == DataLevel.METADATA || level == DataLevel.USER) {
Range range = DeletesSection.getRange();
- if (continuePoint != null && !continuePoint.isEmpty()) {
- String continueRow = DeletesSection.encodeRow(continuePoint);
- range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true,
- range.getEndKey(), range.isEndKeyInclusive());
- }
Scanner scanner;
try {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index 3eb8c80..212822f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -78,7 +78,7 @@ public class ListVolumesUsed {
+ " deletes section (volume replacement occurs at deletion time)");
volumes.clear();
- Iterator<String> delPaths = context.getAmple().getGcCandidates(level, "");
+ Iterator<String> delPaths = context.getAmple().getGcCandidates(level);
while (delPaths.hasNext()) {
volumes.add(getTableURI(delPaths.next()));
}
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
index 595af24..643f46c 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionAlgorithm.java
@@ -267,10 +267,10 @@ public class GarbageCollectionAlgorithm {
}
- private boolean getCandidates(GarbageCollectionEnvironment gce, String lastCandidate,
- List<String> candidates) throws TableNotFoundException {
+ private Iterator<String> getCandidates(GarbageCollectionEnvironment gce)
+ throws TableNotFoundException, IOException {
try (TraceScope candidatesSpan = Trace.startSpan("getCandidates")) {
- return gce.getCandidates(lastCandidate, candidates);
+ return gce.getCandidates();
}
}
@@ -292,28 +292,29 @@ public class GarbageCollectionAlgorithm {
public void collect(GarbageCollectionEnvironment gce) throws TableNotFoundException, IOException {
- String lastCandidate = "";
+ Iterator<String> candidatesIter = getCandidates(gce);
- boolean outOfMemory = true;
- while (outOfMemory) {
- List<String> candidates = new ArrayList<>();
-
- outOfMemory = getCandidates(gce, lastCandidate, candidates);
+ while (candidatesIter.hasNext()) {
+ List<String> batchOfCandidates = gce.readCandidatesThatFitInMemory(candidatesIter);
+ deleteBatch(gce, batchOfCandidates);
+ }
+ }
- if (candidates.isEmpty())
- break;
- else
- lastCandidate = candidates.get(candidates.size() - 1);
+ /**
+ * Given a sub-list of possible deletion candidates, process and remove valid deletion candidates.
+ */
+ private void deleteBatch(GarbageCollectionEnvironment gce, List<String> currentBatch)
+ throws TableNotFoundException, IOException {
- long origSize = candidates.size();
- gce.incrementCandidatesStat(origSize);
+ long origSize = currentBatch.size();
+ gce.incrementCandidatesStat(origSize);
- SortedMap<String,String> candidateMap = makeRelative(candidates);
+ SortedMap<String,String> candidateMap = makeRelative(currentBatch);
- confirmDeletesTrace(gce, candidateMap);
- gce.incrementInUseStat(origSize - candidateMap.size());
+ confirmDeletesTrace(gce, candidateMap);
+ gce.incrementInUseStat(origSize - candidateMap.size());
- deleteConfirmed(gce, candidateMap);
- }
+ deleteConfirmed(gce, candidateMap);
}
+
}
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
index ae8ffbb..add7c7f 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectionEnvironment.java
@@ -40,19 +40,22 @@ import org.apache.accumulo.server.replication.proto.Replication.Status;
public interface GarbageCollectionEnvironment {
/**
- * Return a list of paths to files and dirs which are candidates for deletion from a given table,
- * {@link RootTable#NAME} or {@link MetadataTable#NAME}
+ * Return an iterator which points to a list of paths to files and dirs which are candidates for
+ * deletion from a given table, {@link RootTable#NAME} or {@link MetadataTable#NAME}
+ *
+ * @return an iterator referencing a List containing deletion candidates
+ */
+ Iterator<String> getCandidates() throws TableNotFoundException;
+
+ /**
+ * Given an iterator to a deletion candidate list, return a sub-list of candidates which fit
+ * within provided memory constraints.
*
- * @param continuePoint
- * A row to resume from if a previous invocation was stopped due to finding an extremely
- * large number of candidates to remove which would have exceeded memory limitations
- * @param candidates
- * A collection of candidates files for deletion, may not be the complete collection of
- * files for deletion at this point in time
- * @return true if the results are short due to insufficient memory, otherwise false
+ * @param candidatesIter
+ * iterator referencing a List of possible deletion candidates
+ * @return a List of possible deletion candidates
*/
- boolean getCandidates(String continuePoint, List<String> candidates)
- throws TableNotFoundException;
+ List<String> readCandidatesThatFitInMemory(Iterator<String> candidatesIter);
/**
* Fetch a list of paths for all bulk loads in progress (blip) from a given table,
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 2223294..10bed91 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -209,29 +209,29 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface {
}
@Override
- public boolean getCandidates(String continuePoint, List<String> result)
- throws TableNotFoundException {
+ public Iterator<String> getCandidates() throws TableNotFoundException {
+ return getContext().getAmple().getGcCandidates(level);
+ }
- Iterator<String> candidates = getContext().getAmple().getGcCandidates(level, continuePoint);
+ @Override
+ public List<String> readCandidatesThatFitInMemory(Iterator<String> candidates) {
long candidateLength = 0;
// Converting the bytes to approximate number of characters for batch size.
long candidateBatchSize = getCandidateBatchSize() / 2;
- result.clear();
+ List<String> candidatesBatch = new ArrayList<>();
while (candidates.hasNext()) {
String candidate = candidates.next();
candidateLength += candidate.length();
- result.add(candidate);
+ candidatesBatch.add(candidate);
if (candidateLength > candidateBatchSize) {
- log.info(
- "Candidate batch of size {} has exceeded the"
- + " threshold. Attempting to delete what has been gathered so far.",
- candidateLength);
- return true;
+ log.info("Candidate batch of size {} has exceeded the threshold. Attempting to delete "
+ + "what has been gathered so far.", candidateLength);
+ return candidatesBatch;
}
}
- return false;
+ return candidatesBatch;
}
@Override
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
index cd557ae..c6736ed 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
@@ -34,12 +34,18 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Stream;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class GarbageCollectionTest {
+
+ private static final Logger log = LoggerFactory.getLogger(GarbageCollectionTest.class);
+
static class TestGCE implements GarbageCollectionEnvironment {
TreeSet<String> candidates = new TreeSet<>();
ArrayList<String> blips = new ArrayList<>();
@@ -51,13 +57,17 @@ public class GarbageCollectionTest {
TreeMap<String,Status> filesToReplicate = new TreeMap<>();
@Override
- public boolean getCandidates(String continuePoint, List<String> ret) {
- Iterator<String> iter = candidates.tailSet(continuePoint, false).iterator();
- while (iter.hasNext() && ret.size() < 3) {
- ret.add(iter.next());
- }
+ public Iterator<String> getCandidates() throws TableNotFoundException {
+ return List.copyOf(candidates).iterator();
+ }
- return ret.size() == 3;
+ @Override
+ public List<String> readCandidatesThatFitInMemory(Iterator<String> candidatesIter) {
+ List<String> candidatesBatch = new ArrayList<>();
+ while (candidatesIter.hasNext() && candidatesBatch.size() < 3) {
+ candidatesBatch.add(candidatesIter.next());
+ }
+ return candidatesBatch;
}
@Override
@@ -123,6 +133,28 @@ public class GarbageCollectionTest {
assertEquals(0, gce.deletes.size());
}
+ // This test was created to help track down a ConcurrentModificationException error that was
+ // occurring with the unit tests once the GC was refactored to use a single iterator for the
+ // collect process. This was a minimal test case that would cause the exception to occur.
+ @Test
+ public void minimalDelete() throws Exception {
+ TestGCE gce = new TestGCE();
+
+ gce.candidates.add("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t0/F006.rf");
+
+ gce.addFileReference("4", null, "hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
+ gce.addFileReference("4", null, "hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+ gce.addFileReference("6", null, "hdfs://foo.com:6000/accumulo/tables/6/t0/F006.rf");
+
+ GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
+ gca.collect(gce);
+
+ assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+ }
+
@Test
public void testBasic() throws Exception {
TestGCE gce = new TestGCE();
@@ -166,6 +198,101 @@ public class GarbageCollectionTest {
}
+ /*
+ * Additional test with more candidates. Unlike the test above, the candidate count is not a
+ * multiple of 3. Since the unit tests always return 3 candidates per batch, some edge cases
+ * could be missed if the count were always an exact multiple of the batch size.
+ */
+ @Test
+ public void testBasic2() throws Exception {
+ TestGCE gce = new TestGCE();
+
+ gce.candidates.add("hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+
+ gce.candidates.add("hdfs://foo:6000/accumulo/tables/5/t0/F000.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/5/t0/F001.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t1/F005.rf");
+
+ gce.candidates.add("hdfs://foo:6000/accumulo/tables/6/t0/F000.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/6/t0/F001.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/7/t0/F005.rf");
+
+ gce.candidates.add("hdfs://foo:6000/accumulo/tables/7/t0/F000.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/7/t0/F001.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/8/t0/F005.rf");
+
+ gce.candidates.add("hdfs://foo:6000/accumulo/tables/8/t0/F000.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/8/t0/F001.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/9/t0/F005.rf");
+
+ gce.candidates.add("hdfs://foo:6000/accumulo/tables/9/t0/F000.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/9/t0/F001.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/10/t0/F005.rf");
+
+ gce.candidates.add("hdfs://foo:6000/accumulo/tables/10/t0/F000.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/10/t0/F001.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/11/t0/F005.rf");
+
+ gce.candidates.add("hdfs://foo:6000/accumulo/tables/11/t0/F000.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/11/t0/F001.rf");
+
+ gce.addFileReference("4", null, "hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
+ gce.addFileReference("4", null, "hdfs://foo:6000/accumulo/tables/4/t0/F001.rf");
+ gce.addFileReference("4", null, "hdfs://foo.com:6000/accumulo/tables/4/t0//F002.rf");
+ gce.addFileReference("5", null, "hdfs://foo.com:6000/accumulo/tables/5/t0/F005.rf");
+
+ GarbageCollectionAlgorithm gca = new GarbageCollectionAlgorithm();
+
+ gca.collect(gce);
+ // items to be removed from candidates
+ String[] toBeRemoved = {"hdfs://foo.com:6000/accumulo/tables/5/t0/F001.rf",
+ "hdfs://foo:6000/accumulo/tables/5/t0/F000.rf",
+ "hdfs://foo.com:6000/accumulo/tables/6/t1/F005.rf",
+ "hdfs://foo:6000/accumulo/tables/6/t0/F000.rf",
+ "hdfs://foo.com:6000/accumulo/tables/6/t0/F001.rf",
+ "hdfs://foo.com:6000/accumulo/tables/7/t0/F005.rf",
+ "hdfs://foo:6000/accumulo/tables/7/t0/F000.rf",
+ "hdfs://foo.com:6000/accumulo/tables/7/t0/F001.rf",
+ "hdfs://foo.com:6000/accumulo/tables/8/t0/F005.rf",
+ "hdfs://foo:6000/accumulo/tables/8/t0/F000.rf",
+ "hdfs://foo.com:6000/accumulo/tables/8/t0/F001.rf",
+ "hdfs://foo.com:6000/accumulo/tables/9/t0/F005.rf",
+ "hdfs://foo:6000/accumulo/tables/9/t0/F000.rf",
+ "hdfs://foo.com:6000/accumulo/tables/9/t0/F001.rf",
+ "hdfs://foo.com:6000/accumulo/tables/10/t0/F005.rf",
+ "hdfs://foo:6000/accumulo/tables/10/t0/F000.rf",
+ "hdfs://foo.com:6000/accumulo/tables/10/t0/F001.rf",
+ "hdfs://foo.com:6000/accumulo/tables/11/t0/F005.rf",
+ "hdfs://foo:6000/accumulo/tables/11/t0/F000.rf",
+ "hdfs://foo.com:6000/accumulo/tables/11/t0/F001.rf"};
+ assertRemoved(gce, toBeRemoved);
+
+ // Remove the reference to this flush file, run the GC which should not trim it from the
+ // candidates, and assert that it's gone
+ gce.removeFileReference("4", null, "hdfs://foo.com:6000/accumulo/tables/4/t0/F000.rf");
+ gca.collect(gce);
+ assertRemoved(gce, "hdfs://foo:6000/accumulo/tables/4/t0/F000.rf");
+
+ // Removing a reference to a file that wasn't in the candidates should do nothing
+ gce.removeFileReference("4", null, "hdfs://foo.com:6000/accumulo/tables/4/t0/F002.rf");
+ gca.collect(gce);
+ assertRemoved(gce);
+
+ // Remove the reference to a file in the candidates should cause it to be removed
+ gce.removeFileReference("4", null, "hdfs://foo:6000/accumulo/tables/4/t0/F001.rf");
+ gca.collect(gce);
+ assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F001.rf");
+
+ // Adding more candidates which do not have references; they should be removed
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf");
+ gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf");
+ gca.collect(gce);
+ assertRemoved(gce, "hdfs://foo.com:6000/accumulo/tables/4/t0/F003.rf",
+ "hdfs://foo.com:6000/accumulo/tables/4/t0/F004.rf");
+ }
+
@Test
public void testRelative() throws Exception {
TestGCE gce = new TestGCE();
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index f0ac9c1..6452f20 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -90,6 +90,9 @@ public class GarbageCollectorIT extends ConfigurableMacBase {
cfg.setProperty(Property.GC_PORT, "0");
cfg.setProperty(Property.TSERV_MAXMEM, "5K");
cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+ // reduce the batch size significantly in order to cause the integration tests to have
+ // to process many batches of deletion candidates.
+ cfg.setProperty(Property.GC_CANDIDATE_BATCH_SIZE, "256K");
// use raw local file system so walogs sync and flush will work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());