Posted to commits@ozone.apache.org by si...@apache.org on 2023/02/03 01:51:16 UTC

[ozone] branch HDDS-6517-Snapshot updated: HDDS-7873. [Snapshot] Prune backup SST files that can be expanded periodically (#4235)

This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
     new 506a0d42d1 HDDS-7873. [Snapshot] Prune backup SST files that can be expanded periodically (#4235)
506a0d42d1 is described below

commit 506a0d42d1b608767867ed3f8146785e9b0de5d0
Author: Hemant Kumar <he...@gmail.com>
AuthorDate: Thu Feb 2 17:51:11 2023 -0800

    HDDS-7873. [Snapshot] Prune backup SST files that can be expanded periodically (#4235)
---
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   |  40 ++++++-
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     | 130 ++++++++++++++++++++-
 2 files changed, 160 insertions(+), 10 deletions(-)

diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index bcdbc28bc4..47fef56405 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -72,8 +72,8 @@ import static java.util.Arrays.asList;
 //      rocksDB checkpoints.
 //      - This bootstrapping should also receive the compaction-DAG information
 //  9. Handle rebuilding the DAG for a lagging follower. There are two cases
-//      - recieve RATIS transactions to replay. Nothing needs to be done in
-//      thise case.
+//      - receive RATIS transactions to replay. Nothing needs to be done in
+//      these cases.
 //      - Getting the DB sync. This case needs to handle getting the
 //      compaction-DAG information as well.
 
@@ -145,7 +145,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
    * Used to trim the file extension when writing compaction entries to the log
    * to save space.
    */
-  private static final String SST_FILE_EXTENSION = ".sst";
+  static final String SST_FILE_EXTENSION = ".sst";
   private static final int SST_FILE_EXTENSION_LENGTH =
       SST_FILE_EXTENSION.length();
 
@@ -202,6 +202,13 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
           pruneCompactionDagDaemonRunIntervalInMs,
           pruneCompactionDagDaemonRunIntervalInMs,
           TimeUnit.MILLISECONDS);
+
+      this.executor.scheduleWithFixedDelay(
+          this::pruneSstFiles,
+          pruneCompactionDagDaemonRunIntervalInMs,
+          pruneCompactionDagDaemonRunIntervalInMs,
+          TimeUnit.MILLISECONDS
+      );
     }
   }
 
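A note on the scheduling pattern above: scheduleWithFixedDelay measures the
delay from the completion of one run to the start of the next, so a slow
pruning pass never overlaps with itself (unlike scheduleAtFixedRate, which
measures from run start). A minimal standalone sketch of the same pattern;
the task body and interval are hypothetical stand-ins, not Ozone code:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class FixedDelayDemo {
      public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
        // Stands in for pruneCompactionDagDaemonRunIntervalInMs.
        long intervalMs = 1000L;
        // Each run starts intervalMs after the previous one finishes.
        executor.scheduleWithFixedDelay(
            () -> System.out.println("pruning pass"),
            intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        Thread.sleep(5000L);  // let a few passes run, then stop
        executor.shutdown();
      }
    }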
@@ -1058,7 +1065,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
    */
   private void removeSstFile(Set<String> sstFileNodes) {
     for (String sstFileNode: sstFileNodes) {
-      File file = new File(sstBackupDir + sstFileNode + SST_FILE_EXTENSION);
+      File file =
+          new File(sstBackupDir + "/" + sstFileNode + SST_FILE_EXTENSION);
       try {
         Files.deleteIfExists(file.toPath());
       } catch (IOException exception) {
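
The one-line change above fixes a missing path separator: plain string
concatenation produced a sibling path such as "<sstBackupDir>000015.sst"
instead of a file inside the backup directory. A tiny sketch of the failure
mode and of the two-argument File constructor that sidesteps it entirely
(directory and file names are hypothetical):

    import java.io.File;

    public class PathJoinDemo {
      public static void main(String[] args) {
        String sstBackupDir = "/tmp/sst-backup";  // hypothetical
        String sstFileNode = "000015";
        // Before the fix: "/tmp/sst-backup000015.sst" -- wrong file.
        System.out.println(new File(sstBackupDir + sstFileNode + ".sst"));
        // new File(parent, child) inserts the separator itself.
        System.out.println(new File(sstBackupDir, sstFileNode + ".sst"));
      }
    }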
@@ -1328,6 +1336,30 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
     }
   }
 
+  /**
+   * Defines the task that removes SST files from backup directory which are
+   * not needed to generate snapshot diff using compaction DAG to clean
+   * the disk space.
+   * We can’t simply delete input files in the compaction completed listener
+   * because it is not known which of input files are from previous compaction
+   * and which were created after the compaction.
+   * We can remove SST files which were created from the compaction because
+   * those are not needed to generate snapshot diff. These files are basically
+   * non-leaf nodes of the DAG.
+   */
+  public void pruneSstFiles() {
+    Set<String> nonLeafSstFiles;
+
+    synchronized (compactionListenerWriteLock) {
+      nonLeafSstFiles = forwardCompactionDAG.nodes().stream()
+          .filter(node -> !forwardCompactionDAG.successors(node).isEmpty())
+          .map(node -> node.getFileName())
+          .collect(Collectors.toSet());
+    }
+
+    removeSstFile(nonLeafSstFiles);
+  }
+
   @VisibleForTesting
   public boolean debugEnabled(Integer level) {
     return DEBUG_LEVEL.contains(level);
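
pruneSstFiles deletes exactly the non-leaf nodes of the forward compaction
DAG: a node with successors is a compaction output, which the Javadoc above
notes is not needed for snapshot diff generation. A toy illustration of the
same filter over a Guava graph, assuming (as the filter implies) that
forward edges point from a compaction's outputs to its inputs; the file
numbers are made up:

    import com.google.common.graph.GraphBuilder;
    import com.google.common.graph.MutableGraph;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class NonLeafDemo {
      public static void main(String[] args) {
        // One recorded compaction: inputs 000009, 000015 -> output 000016.
        MutableGraph<String> dag = GraphBuilder.directed().build();
        dag.putEdge("000016", "000009");  // output -> input
        dag.putEdge("000016", "000015");
        Set<String> nonLeaf = dag.nodes().stream()
            .filter(n -> !dag.successors(n).isEmpty())
            .collect(Collectors.toSet());
        System.out.println(nonLeaf);  // [000016] -- the prunable output
      }
    }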
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index af5aec638a..871b27446e 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -22,10 +22,12 @@ import static java.util.Arrays.asList;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import com.google.common.graph.GraphBuilder;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -68,6 +70,7 @@ import org.slf4j.event.Level;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COMPACTION_LOG_FILE_NAME_SUFFIX;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -456,10 +459,10 @@ public class TestRocksDBCheckpointDiffer {
     return rocksDB;
   }
 
-  static boolean deleteDirectory(java.io.File directoryToBeDeleted) {
+  private boolean deleteDirectory(File directoryToBeDeleted) {
     File[] allContents = directoryToBeDeleted.listFiles();
     if (allContents != null) {
-      for (java.io.File file : allContents) {
+      for (File file : allContents) {
         if (!deleteDirectory(file)) {
           return false;
         }
@@ -470,8 +473,10 @@ public class TestRocksDBCheckpointDiffer {
 
   // Read from a given RocksDB instance and optionally write all the
   // keys to a given file.
-  void readRocksDBInstance(String dbPathArg, RocksDB rocksDB, FileWriter file,
-      RocksDBCheckpointDiffer differ) {
+  private void readRocksDBInstance(String dbPathArg,
+                                   RocksDB rocksDB,
+                                   FileWriter file,
+                                   RocksDBCheckpointDiffer differ) {
 
     LOG.debug("Reading RocksDB: " + dbPathArg);
     boolean createdDB = false;
@@ -1080,11 +1085,20 @@ public class TestRocksDBCheckpointDiffer {
 
     differ.pruneOlderSnapshotsWithCompactionHistory();
 
-    Set<String> actualNodes = differ.getForwardCompactionDAG().nodes().stream()
+    Set<String> actualNodesInForwardDAG = differ.getForwardCompactionDAG()
+        .nodes()
+        .stream()
         .map(CompactionNode::getFileName)
         .collect(Collectors.toSet());
 
-    assertEquals(expectedNodes, actualNodes);
+    Set<String> actualNodesBackwardDAG = differ.getBackwardCompactionDAG()
+        .nodes()
+        .stream()
+        .map(CompactionNode::getFileName)
+        .collect(Collectors.toSet());
+
+    assertEquals(expectedNodes, actualNodesInForwardDAG);
+    assertEquals(expectedNodes, actualNodesBackwardDAG);
 
     for (int i = 0; i < expectedNumberOfLogFilesDeleted; i++) {
       File compactionFile = filesCreated.get(i);
@@ -1100,4 +1114,108 @@ public class TestRocksDBCheckpointDiffer {
     deleteDirectory(compactionLogDir);
     deleteDirectory(sstBackUpDir);
   }
+
+  private static Stream<Arguments> sstFilePruningScenarios() {
+    return Stream.of(
+        Arguments.of("Case 1: No compaction.",
+            "",
+            Arrays.asList("000015", "000013", "000011", "000009"),
+            Arrays.asList("000015", "000013", "000011", "000009")
+        ),
+        Arguments.of("Case 2: One level compaction.",
+            "C 000015,000013,000011,000009:000018,000016,000017\n",
+            Arrays.asList("000015", "000013", "000011", "000009", "000018",
+                "000016", "000017", "000026", "000024", "000022", "000020"),
+            Arrays.asList("000015", "000013", "000011", "000009", "000026",
+                "000024", "000022", "000020")
+        ),
+        Arguments.of("Case 3: Multi-level compaction.",
+            "C 000015,000013,000011,000009:000018,000016,000017\n" +
+                "C 000018,000016,000017,000026,000024,000022,000020:000027," +
+                "000030,000028,000031,000029\n" +
+                "C 000027,000030,000028,000031,000029,000039,000037,000035," +
+                "000033:000040,000044,000042,000043,000046,000041,000045\n" +
+                "C 000040,000044,000042,000043,000046,000041,000045,000054," +
+                "000052,000050,000048:000059,000055,000056,000060,000057," +
+                "000058\n",
+            Arrays.asList("000015", "000013", "000011", "000009", "000018",
+                "000016", "000017", "000026", "000024", "000022", "000020",
+                "000027", "000030", "000028", "000031", "000029", "000039",
+                "000037", "000035", "000033", "000040", "000044", "000042",
+                "000043", "000046", "000041", "000045", "000054", "000052",
+                "000050", "000048", "000059", "000055", "000056", "000060",
+                "000057", "000058"),
+            Arrays.asList("000013", "000024", "000035", "000011", "000022",
+                "000033", "000039", "000015", "000026", "000037", "000048",
+                "000009", "000050", "000054", "000020", "000052")
+        )
+    );
+  }
+
+  /**
+   * End-to-end test for SST file pruning.
+   */
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("sstFilePruningScenarios")
+  public void testSstFilePruning(
+      String description,
+      String compactionLog,
+      List<String> initialFiles,
+      List<String> expectedFiles
+  ) throws IOException {
+
+    String sstBackUpDirName = "./test-compaction-sst-backup";
+    File sstBackUpDir = new File(sstBackUpDirName);
+    if (!sstBackUpDir.exists() && !sstBackUpDir.mkdirs()) {
+      fail("Error creating SST backup directory: " + sstBackUpDirName);
+    }
+
+    String compactionLogDirName = "./test-compaction-log";
+    File compactionLogDir = new File(compactionLogDirName);
+    if (!compactionLogDir.exists() && !compactionLogDir.mkdirs()) {
+      fail("Error creating compaction log directory: " + compactionLogDirName);
+    }
+
+    createFileWithContext(compactionLogDirName + "/compaction_log" +
+            COMPACTION_LOG_FILE_NAME_SUFFIX,
+        compactionLog);
+
+    for (String fileName : initialFiles) {
+      createFileWithContext(sstBackUpDir + "/" + fileName + SST_FILE_EXTENSION,
+          fileName);
+    }
+
+    RocksDBCheckpointDiffer differ =
+        new RocksDBCheckpointDiffer(sstBackUpDirName,
+            compactionLogDirName,
+            null,
+            MINUTES.toMillis(10));
+
+    differ.loadAllCompactionLogs();
+    differ.pruneSstFiles();
+
+    Set<String> actualFileSetAfterPruning;
+    try (Stream<Path> pathStream = Files.list(Paths.get(sstBackUpDirName))
+        .filter(e -> e.toString().toLowerCase()
+            .endsWith(SST_FILE_EXTENSION))
+        .sorted()) {
+      actualFileSetAfterPruning =
+          pathStream.map(path -> path.getFileName().toString())
+              .map(name -> name.substring(0,
+                  name.length() - SST_FILE_EXTENSION.length()))
+              .collect(Collectors.toSet());
+    }
+
+    Set<String> expectedFileSet = new HashSet<>(expectedFiles);
+    assertEquals(expectedFileSet, actualFileSetAfterPruning);
+    deleteDirectory(compactionLogDir);
+    deleteDirectory(sstBackUpDir);
+  }
+
+  private void createFileWithContext(String fileName, String context)
+      throws IOException {
+    try (FileOutputStream fileOutputStream = new FileOutputStream(fileName)) {
+      fileOutputStream.write(context.getBytes(UTF_8));
+    }
+  }
 }
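
For reference, the compaction log lines fed to the scenarios above use the
format "C <inputs>:<outputs>", where both sides are comma-separated SST file
numbers with the ".sst" extension trimmed (matching the SST_FILE_EXTENSION
handling in the differ). A throwaway parser, for illustration only -- the
real parsing lives in loadAllCompactionLogs:

    import java.util.Arrays;
    import java.util.List;

    public class CompactionLogLineDemo {
      public static void main(String[] args) {
        // A line from Case 2 above: inputs before ':', outputs after.
        String line = "C 000015,000013,000011,000009:000018,000016,000017";
        String[] halves = line.substring(2).split(":");
        List<String> inputs = Arrays.asList(halves[0].split(","));
        List<String> outputs = Arrays.asList(halves[1].split(","));
        System.out.println("inputs:  " + inputs);   // [000015, 000013, 000011, 000009]
        System.out.println("outputs: " + outputs);  // [000018, 000016, 000017]
      }
    }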


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org