You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by si...@apache.org on 2023/02/03 01:51:16 UTC
[ozone] branch HDDS-6517-Snapshot updated: HDDS-7873. [Snapshot] Prune backup SST files that can be expanded periodically (#4235)
This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
new 506a0d42d1 HDDS-7873. [Snapshot] Prune backup SST files that can be expanded periodically (#4235)
506a0d42d1 is described below
commit 506a0d42d1b608767867ed3f8146785e9b0de5d0
Author: Hemant Kumar <he...@gmail.com>
AuthorDate: Thu Feb 2 17:51:11 2023 -0800
HDDS-7873. [Snapshot] Prune backup SST files that can be expanded periodically (#4235)
---
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 40 ++++++-
.../rocksdiff/TestRocksDBCheckpointDiffer.java | 130 ++++++++++++++++++++-
2 files changed, 160 insertions(+), 10 deletions(-)
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index bcdbc28bc4..47fef56405 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -72,8 +72,8 @@ import static java.util.Arrays.asList;
// rocksDB checkpoints.
// - This bootstrapping should also receive the compaction-DAG information
// 9. Handle rebuilding the DAG for a lagging follower. There are two cases
-// - recieve RATIS transactions to replay. Nothing needs to be done in
-// thise case.
+// - receive RATIS transactions to replay. Nothing needs to be done in
+// this case.
// - Getting the DB sync. This case needs to handle getting the
// compaction-DAG information as well.
@@ -145,7 +145,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
* Used to trim the file extension when writing compaction entries to the log
* to save space.
*/
- private static final String SST_FILE_EXTENSION = ".sst";
+ static final String SST_FILE_EXTENSION = ".sst";
private static final int SST_FILE_EXTENSION_LENGTH =
SST_FILE_EXTENSION.length();
@@ -202,6 +202,13 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
pruneCompactionDagDaemonRunIntervalInMs,
pruneCompactionDagDaemonRunIntervalInMs,
TimeUnit.MILLISECONDS);
+
+ this.executor.scheduleWithFixedDelay(
+ this::pruneSstFiles,
+ pruneCompactionDagDaemonRunIntervalInMs,
+ pruneCompactionDagDaemonRunIntervalInMs,
+ TimeUnit.MILLISECONDS
+ );
}
}
@@ -1058,7 +1065,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
*/
private void removeSstFile(Set<String> sstFileNodes) {
for (String sstFileNode: sstFileNodes) {
- File file = new File(sstBackupDir + sstFileNode + SST_FILE_EXTENSION);
+ File file =
+ new File(sstBackupDir + "/" + sstFileNode + SST_FILE_EXTENSION);
try {
Files.deleteIfExists(file.toPath());
} catch (IOException exception) {
@@ -1328,6 +1336,30 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
}
}
+ /**
+ * Defines the task that removes SST files from the backup directory which
+ * are not needed to generate a snapshot diff using the compaction DAG, in
+ * order to free up disk space.
+ * We can't simply delete input files in the compaction completed listener
+ * because it is not known which of the input files are from a previous
+ * compaction and which were created after the compaction.
+ * We can remove SST files which were created from the compaction because
+ * those are not needed to generate snapshot diff. These files are basically
+ * non-leaf nodes of the DAG.
+ */
+ public void pruneSstFiles() {
+ Set<String> nonLeafSstFiles;
+
+ synchronized (compactionListenerWriteLock) { // lock covers only the DAG read
+ nonLeafSstFiles = forwardCompactionDAG.nodes().stream()
+ .filter(node -> !forwardCompactionDAG.successors(node).isEmpty()) // keep nodes that have successors
+ .map(node -> node.getFileName())
+ .collect(Collectors.toSet());
+ }
+
+ removeSstFile(nonLeafSstFiles); // file deletion runs after the lock is released
+ }
+
@VisibleForTesting
public boolean debugEnabled(Integer level) {
return DEBUG_LEVEL.contains(level);
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index af5aec638a..871b27446e 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -22,10 +22,12 @@ import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MINUTES;
import com.google.common.graph.GraphBuilder;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -68,6 +70,7 @@ import org.slf4j.event.Level;
import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COMPACTION_LOG_FILE_NAME_SUFFIX;
import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -456,10 +459,10 @@ public class TestRocksDBCheckpointDiffer {
return rocksDB;
}
- static boolean deleteDirectory(java.io.File directoryToBeDeleted) {
+ private boolean deleteDirectory(File directoryToBeDeleted) {
File[] allContents = directoryToBeDeleted.listFiles();
if (allContents != null) {
- for (java.io.File file : allContents) {
+ for (File file : allContents) {
if (!deleteDirectory(file)) {
return false;
}
@@ -470,8 +473,10 @@ public class TestRocksDBCheckpointDiffer {
// Read from a given RocksDB instance and optionally write all the
// keys to a given file.
- void readRocksDBInstance(String dbPathArg, RocksDB rocksDB, FileWriter file,
- RocksDBCheckpointDiffer differ) {
+ private void readRocksDBInstance(String dbPathArg,
+ RocksDB rocksDB,
+ FileWriter file,
+ RocksDBCheckpointDiffer differ) {
LOG.debug("Reading RocksDB: " + dbPathArg);
boolean createdDB = false;
@@ -1080,11 +1085,20 @@ public class TestRocksDBCheckpointDiffer {
differ.pruneOlderSnapshotsWithCompactionHistory();
- Set<String> actualNodes = differ.getForwardCompactionDAG().nodes().stream()
+ Set<String> actualNodesInForwardDAG = differ.getForwardCompactionDAG()
+ .nodes()
+ .stream()
.map(CompactionNode::getFileName)
.collect(Collectors.toSet());
- assertEquals(expectedNodes, actualNodes);
+ Set<String> actualNodesBackwardDAG = differ.getBackwardCompactionDAG()
+ .nodes()
+ .stream()
+ .map(CompactionNode::getFileName)
+ .collect(Collectors.toSet());
+
+ assertEquals(expectedNodes, actualNodesInForwardDAG);
+ assertEquals(expectedNodes, actualNodesBackwardDAG);
for (int i = 0; i < expectedNumberOfLogFilesDeleted; i++) {
File compactionFile = filesCreated.get(i);
@@ -1100,4 +1114,108 @@ public class TestRocksDBCheckpointDiffer {
deleteDirectory(compactionLogDir);
deleteDirectory(sstBackUpDir);
}
+
+ private static Stream<Arguments> sstFilePruningScenarios() {
+ return Stream.of( // each case: description, compaction log, initial files, expected files
+ Arguments.of("Case 1: No compaction.",
+ "", // empty compaction log
+ Arrays.asList("000015", "000013", "000011", "000009"),
+ Arrays.asList("000015", "000013", "000011", "000009") // nothing is pruned
+ ),
+ Arguments.of("Case 2: One level compaction.",
+ "C 000015,000013,000011,000009:000018,000016,000017\n", // presumably "C <inputs>:<outputs>" - confirm
+ Arrays.asList("000015", "000013", "000011", "000009", "000018",
+ "000016", "000017", "000026", "000024", "000022", "000020"),
+ Arrays.asList("000015", "000013", "000011", "000009", "000026",
+ "000024", "000022", "000020") // 000018, 000016, 000017 are pruned
+ ),
+ Arguments.of("Case 3: Multi-level compaction.",
+ "C 000015,000013,000011,000009:000018,000016,000017\n" +
+ "C 000018,000016,000017,000026,000024,000022,000020:000027," +
+ "000030,000028,000031,000029\n" +
+ "C 000027,000030,000028,000031,000029,000039,000037,000035," +
+ "000033:000040,000044,000042,000043,000046,000041,000045\n" +
+ "C 000040,000044,000042,000043,000046,000041,000045,000054," +
+ "000052,000050,000048:000059,000055,000056,000060,000057," +
+ "000058\n",
+ Arrays.asList("000015", "000013", "000011", "000009", "000018",
+ "000016", "000017", "000026", "000024", "000022", "000020",
+ "000027", "000030", "000028", "000031", "000029", "000039",
+ "000037", "000035", "000033", "000040", "000044", "000042",
+ "000043", "000046", "000041", "000045", "000054", "000052",
+ "000050", "000048", "000059", "000055", "000056", "000060",
+ "000057", "000058"),
+ Arrays.asList("000013", "000024", "000035", "000011", "000022",
+ "000033", "000039", "000015", "000026", "000037", "000048",
+ "000009", "000050", "000054", "000020", "000052") // only names never listed after a ':' remain
+ )
+ );
+ }
+
+ /**
+ * End-to-end test for SST file pruning.
+ */
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("sstFilePruningScenarios")
+ public void testSstFilePruning(
+ String description,
+ String compactionLog,
+ List<String> initialFiles,
+ List<String> expectedFiles
+ ) throws IOException {
+
+ String sstBackUpDirName = "./test-compaction-sst-backup";
+ File sstBackUpDir = new File(sstBackUpDirName);
+ if (!sstBackUpDir.exists() && !sstBackUpDir.mkdirs()) { // fails only if absent and not creatable
+ fail("Error creating SST backup directory: " + sstBackUpDirName);
+ }
+
+ String compactionLogDirName = "./test-compaction-log";
+ File compactionLogDir = new File(compactionLogDirName);
+ if (!compactionLogDir.exists() && !compactionLogDir.mkdirs()) {
+ fail("Error creating compaction log directory: " + compactionLogDirName);
+ }
+
+ createFileWithContext(compactionLogDirName + "/compaction_log" + // write the scenario's compaction log
+ COMPACTION_LOG_FILE_NAME_SUFFIX,
+ compactionLog);
+
+ for (String fileName : initialFiles) { // one backup SST file per name
+ createFileWithContext(sstBackUpDir + "/" + fileName + SST_FILE_EXTENSION,
+ fileName); // file content is just its own name
+ }
+
+ RocksDBCheckpointDiffer differ =
+ new RocksDBCheckpointDiffer(sstBackUpDirName,
+ compactionLogDirName,
+ null,
+ MINUTES.toMillis(10));
+
+ differ.loadAllCompactionLogs(); // presumably rebuilds the DAG from the log written above - confirm
+ differ.pruneSstFiles(); // method under test
+
+ Set<String> actualFileSetAfterPruning;
+ try (Stream<Path> pathStream = Files.list(Paths.get(sstBackUpDirName))
+ .filter(e -> e.toString().toLowerCase() // NOTE(review): default-locale lowercasing
+ .endsWith(SST_FILE_EXTENSION))
+ .sorted()) { // NOTE(review): ordering is lost once collected to a Set
+ actualFileSetAfterPruning =
+ pathStream.map(path -> path.getFileName().toString())
+ .map(name -> name.substring(0, // strip the SST_FILE_EXTENSION suffix
+ name.length() - SST_FILE_EXTENSION.length()))
+ .collect(Collectors.toSet());
+ }
+
+ Set<String> expectedFileSet = new HashSet<>(expectedFiles);
+ assertEquals(expectedFileSet, actualFileSetAfterPruning);
+ deleteDirectory(compactionLogDir); // NOTE(review): cleanup is not reached if the assertion fails
+ deleteDirectory(sstBackUpDir);
+ }
+
+ private void createFileWithContext(String fileName, String context)
+ throws IOException { // writes `context` as the content of the file at `fileName`
+ try (FileOutputStream fileOutputStream = new FileOutputStream(fileName)) { // stream is closed on exit
+ fileOutputStream.write(context.getBytes(UTF_8)); // UTF_8: presumably StandardCharsets.UTF_8 (import not shown)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org