Posted to commits@ozone.apache.org by si...@apache.org on 2022/11/03 16:42:08 UTC

[ozone] branch HDDS-6517-Snapshot updated: HDDS-7281. [Snapshot] Handle RocksDB compaction DAG persistence and reconstruction (#3824)

This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
     new e642ddeb55 HDDS-7281. [Snapshot] Handle RocksDB compaction DAG persistence and reconstruction (#3824)
e642ddeb55 is described below

commit e642ddeb55edd2bd2580a7ce6a1d30c0e081b1a3
Author: Siyao Meng <50...@users.noreply.github.com>
AuthorDate: Thu Nov 3 09:42:02 2022 -0700

    HDDS-7281. [Snapshot] Handle RocksDB compaction DAG persistence and reconstruction (#3824)
---
 .../hadoop/hdds/utils/db/DBStoreBuilder.java       |    9 +-
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |   46 +-
 hadoop-hdds/rocksdb-checkpoint-differ/pom.xml      |   32 +-
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   | 1340 +++++++++++---------
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     |  371 ++++--
 .../src/test/resources/log4j.properties            |   21 +
 .../hadoop/ozone/om/helpers/SnapshotInfo.java      |   12 +
 .../hadoop/ozone/freon/TestOMSnapshotDAG.java      |  220 ++++
 .../src/main/proto/OmClientProtocol.proto          |   23 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |    3 +-
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |   23 +-
 .../request/snapshot/OMSnapshotCreateRequest.java  |    8 +
 12 files changed, 1397 insertions(+), 711 deletions(-)
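
At a high level, this change makes the Ozone Manager persist RocksDB compaction history to a log and rebuild the compaction DAG from it on restart, so the differ can compute which SST files differ between two snapshots. A rough usage sketch against the API introduced below (the differ instance, DB paths, snapshot IDs and generation numbers are illustrative, not taken from this commit):

    // Illustrative only: paths, IDs and generation numbers are made up.
    RocksDBCheckpointDiffer differ = rdbStore.getRocksDBCheckpointDiffer();
    RocksDBCheckpointDiffer.DifferSnapshotInfo src =
        new RocksDBCheckpointDiffer.DifferSnapshotInfo(
            "/path/to/om.db-snapshot-2", "snap-2-uuid", 2000L);
    RocksDBCheckpointDiffer.DifferSnapshotInfo dest =
        new RocksDBCheckpointDiffer.DifferSnapshotInfo(
            "/path/to/om.db-snapshot-1", "snap-1-uuid", 1000L);
    // src must be the later snapshot; the result is the list of SST files
    // (without the .sst extension) that differ between the two checkpoints.
    List<String> diffSstFiles = differ.getSSTDiffList(src, dest);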

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index 5478e8658a..9f94e1d2c5 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -92,6 +92,7 @@ public final class DBStoreBuilder {
   private boolean openReadOnly = false;
   private int maxFSSnapshots = 0;
   private final DBProfile defaultCfProfile;
+  private boolean enableCompactionLog;
 
   /**
    * Create DBStoreBuilder from a generic DBDefinition.
@@ -190,7 +191,8 @@ public final class DBStoreBuilder {
       }
 
       return new RDBStore(dbFile, rocksDBOption, writeOptions, tableConfigs,
-          registry, openReadOnly, maxFSSnapshots, dbJmxBeanNameName);
+          registry, openReadOnly, maxFSSnapshots, dbJmxBeanNameName,
+          enableCompactionLog);
     } finally {
       tableConfigs.forEach(TableConfig::close);
     }
@@ -247,6 +249,11 @@ public final class DBStoreBuilder {
     return this;
   }
 
+  public DBStoreBuilder setEnableCompactionLog(boolean enableCompactionLog) {
+    this.enableCompactionLog = enableCompactionLog;
+    return this;
+  }
+
   /**
    * Set the {@link ManagedDBOptions} and default
    * {@link ManagedColumnFamilyOptions} based on {@code prof}.
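
With the builder change above, turning compaction tracking on for the OM DB becomes a single flag. A minimal sketch, assuming the existing DBStoreBuilder entry points (newBuilder/setName/setPath); only setEnableCompactionLog(boolean) is introduced by this patch:

    // Sketch under stated assumptions: newBuilder(), setName() and setPath()
    // come from the pre-existing builder API; setEnableCompactionLog(true)
    // is the new switch that makes RDBStore create a RocksDBCheckpointDiffer.
    DBStore store = DBStoreBuilder.newBuilder(conf)
        .setName("om.db")
        .setPath(Paths.get(metadataDirPath))
        .setEnableCompactionLog(true)
        .build();
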
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 9fabf5e9df..992627c4cf 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -64,19 +64,30 @@ public class RDBStore implements DBStore {
   private final RDBMetrics rdbMetrics;
   private final RocksDBCheckpointDiffer rocksDBCheckpointDiffer;
   private final String dbJmxBeanName;
+  /**
+   * Name of the SST file backup directory placed under metadata dir.
+   * Can be made configurable later.
+   */
+  private final String dbCompactionSSTBackupDirName = "compaction-sst-backup";
+  /**
+   * Name of the compaction log directory placed under metadata dir.
+   * Can be made configurable later.
+   */
+  private final String dbCompactionLogDirName = "compaction-log";
 
   @VisibleForTesting
   public RDBStore(File dbFile, ManagedDBOptions options,
                   Set<TableConfig> families) throws IOException {
     this(dbFile, options, new ManagedWriteOptions(), families,
-        new CodecRegistry(), false, 1000, null);
+        new CodecRegistry(), false, 1000, null, false);
   }
 
   @SuppressWarnings("parameternumber")
   public RDBStore(File dbFile, ManagedDBOptions dbOptions,
                   ManagedWriteOptions writeOptions, Set<TableConfig> families,
                   CodecRegistry registry, boolean readOnly, int maxFSSnapshots,
-                  String dbJmxBeanNameName) throws IOException {
+                  String dbJmxBeanNameName, boolean enableCompactionLog)
+      throws IOException {
     Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
     Preconditions.checkNotNull(families);
     Preconditions.checkArgument(!families.isEmpty());
@@ -86,13 +97,14 @@ public class RDBStore implements DBStore {
         dbJmxBeanNameName;
 
     try {
-      rocksDBCheckpointDiffer =
-          new RocksDBCheckpointDiffer(
-              dbLocation.getAbsolutePath(), maxFSSnapshots,
-          Paths.get(dbLocation.getParent(), "db.checkpoints").toString(),
-          Paths.get(dbLocation.getParent(), "db.savedSSTFiles").toString(),
-          dbLocation.getAbsolutePath(), 0, "Snapshot_");
-      rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
+      if (enableCompactionLog) {
+        rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(
+            dbLocation.getParent(), dbCompactionSSTBackupDirName,
+            dbCompactionLogDirName, dbLocation);
+        rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
+      } else {
+        rocksDBCheckpointDiffer = null;
+      }
 
       db = RocksDatabase.open(dbFile, dbOptions, writeOptions,
           families, readOnly);
@@ -114,7 +126,7 @@ public class RDBStore implements DBStore {
 
       //create checkpoints directory if not exists.
       checkpointsParentDir =
-              Paths.get(dbLocation.getParent(), "db.checkpoints").toString();
+          Paths.get(dbLocation.getParent(), "db.checkpoints").toString();
       File checkpointsDir = new File(checkpointsParentDir);
       if (!checkpointsDir.exists()) {
         boolean success = checkpointsDir.mkdir();
@@ -138,6 +150,15 @@ public class RDBStore implements DBStore {
         }
       }
 
+      if (enableCompactionLog) {
+        // Finish the initialization of compaction DAG tracker by setting the
+        // sequence number as current compaction log filename.
+        rocksDBCheckpointDiffer.setCurrentCompactionLog(
+            db.getLatestSequenceNumber());
+        // Load all previous compaction logs
+        rocksDBCheckpointDiffer.loadAllCompactionLogs();
+      }
+
       //Initialize checkpoint manager
       checkPointManager = new RDBCheckpointManager(db, dbLocation.getName());
       rdbMetrics = RDBMetrics.create();
@@ -160,7 +181,10 @@ public class RDBStore implements DBStore {
     }
   }
 
-  @VisibleForTesting
+  public String getSnapshotsParentDir() {
+    return snapshotsParentDir;
+  }
+
   public RocksDBCheckpointDiffer getRocksDBCheckpointDiffer() {
     return rocksDBCheckpointDiffer;
   }
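
Taken together, when enableCompactionLog is set the RDBStore changes above wire the differ in a fixed order: construct it, register its compaction listeners on the DBOptions, open the DB, name the current log after the latest sequence number, then replay any earlier logs. A condensed restatement of that sequence (not verbatim code from the hunks above):

    // Condensed from the RDBStore diff above; variable names shortened.
    RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(
        dbLocation.getParent(),        // OM metadata dir
        "compaction-sst-backup",       // SST hard-link backup dir name
        "compaction-log",              // compaction log dir name
        dbLocation);                   // active DB location
    differ.setRocksDBForCompactionTracking(dbOptions);  // add listeners
    RocksDatabase db = RocksDatabase.open(dbFile, dbOptions, writeOptions,
        families, readOnly);
    differ.setCurrentCompactionLog(db.getLatestSequenceNumber());
    differ.loadAllCompactionLogs();    // rebuild in-memory DAG from old logs
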
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
index 85a56f8cf6..568a6cc6f8 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -22,6 +22,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <artifactId>hdds</artifactId>
     <version>1.3.0-SNAPSHOT</version>
   </parent>
+
   <artifactId>rocksdb-checkpoint-differ</artifactId>
   <version>1.3.0-SNAPSHOT</version>
   <description>RocksDB Checkpoint Differ</description>
@@ -29,50 +30,56 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <packaging>jar</packaging>
 
   <dependencies>
-
     <dependency>
       <groupId>org.rocksdb</groupId>
       <artifactId>rocksdbjni</artifactId>
     </dependency>
-
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
-
     <dependency>
       <groupId>com.github.vlsi.mxgraph</groupId>
       <artifactId>jgraphx</artifactId>
       <version>4.2.2</version>
     </dependency>
-
     <dependency>
       <groupId>org.jgrapht</groupId>
       <artifactId>jgrapht-core</artifactId>
       <version>1.5.0</version>
     </dependency>
-
     <dependency>
       <groupId>org.jgrapht</groupId>
       <artifactId>jgrapht-guava</artifactId>
       <version>1.5.0</version>
     </dependency>
-
     <dependency>
       <groupId>org.jgrapht</groupId>
       <artifactId>jgrapht-ext</artifactId>
       <version>1.4.0</version>
     </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
     <dependency>
       <groupId>com.github.spotbugs</groupId>
       <artifactId>spotbugs-annotations</artifactId>
       <scope>compile</scope>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-reload4j</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>hdds-test-utils</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
@@ -145,6 +152,9 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
                   <allowedImports>
                     <allowedImport>org.rocksdb.AbstractEventListener</allowedImport>
                     <allowedImport>org.rocksdb.Checkpoint</allowedImport>
+                    <allowedImport>org.rocksdb.ColumnFamilyDescriptor</allowedImport>
+                    <allowedImport>org.rocksdb.ColumnFamilyHandle</allowedImport>
+                    <allowedImport>org.rocksdb.ColumnFamilyOptions</allowedImport>
                     <allowedImport>org.rocksdb.CompactionJobInfo</allowedImport>
                     <allowedImport>org.rocksdb.CompressionType</allowedImport>
                     <allowedImport>org.rocksdb.DBOptions</allowedImport>
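
The bulk of the change is in RocksDBCheckpointDiffer itself, which persists every compaction as one line in a plain-text log: "# " marks a comment, "S <seqNum>" records the RocksDB sequence number when a snapshot is taken, and "C inputs:outputs" records a compaction (SST names comma-separated, without the .sst extension). As a reading aid, an illustrative standalone parser for that format; it mirrors, but is not, the processCompactionLogLine() code in the diff below:

    // Illustrative parser for the compaction log line format described above.
    static void parseCompactionLogLine(String line) {
      if (line.startsWith("# ")) {
        return;                                  // comment line, ignore
      } else if (line.startsWith("S ")) {
        // Snapshot generation for the compaction entries that follow
        long snapshotGen = Long.parseLong(line.substring(2).trim());
        System.out.println("snapshot generation = " + snapshotGen);
      } else if (line.startsWith("C ")) {
        String[] io = line.substring(2).split(":");
        String[] inputSsts = io[0].split(",");   // SSTs consumed by compaction
        String[] outputSsts = io[1].split(",");  // SSTs produced by compaction
        System.out.println(inputSsts.length + " in -> "
            + outputSsts.length + " out");
      }
    }
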
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 32e9e268c0..26884fb9fb 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -17,70 +17,48 @@
  */
 package org.apache.ozone.rocksdiff;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.graph.GraphBuilder;
+import com.google.common.graph.MutableGraph;
+import org.apache.commons.lang3.StringUtils;
+import org.rocksdb.AbstractEventListener;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionJobInfo;
+import org.rocksdb.DBOptions;
+import org.rocksdb.LiveFileMetaData;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileReader;
+import org.rocksdb.TableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
-
-import org.rocksdb.AbstractEventListener;
-import org.rocksdb.Checkpoint;
-import org.rocksdb.CompactionJobInfo;
-import org.rocksdb.CompressionType;
-import org.rocksdb.DBOptions;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.LiveFileMetaData;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.SstFileReader;
-import org.rocksdb.TableProperties;
-
-import com.google.common.graph.GraphBuilder;
-import com.google.common.graph.MutableGraph;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
 
 // TODO
-//  1. Create a local instance of RocksDiff-local-RocksDB. This is the
-//  rocksDB that we can use for maintaining DAG and any other state. This is
-//  a per node state so it it doesn't have to go through RATIS anyway.
-//  2. Store fwd DAG in Diff-Local-RocksDB in Compaction Listener
-//  3. Store fwd DAG in Diff-Local-RocksDB in Compaction Listener
-//  4. Store last-Snapshot-counter/Compaction-generation-counter in Diff-Local
-//  -RocksDB in Compaction Listener
-//  5. System Restart handling. Read the DAG from Disk and load it in memory.
-//  6. Take the base snapshot. All the SST file nodes in the base snapshot
-//  should be arked with that Snapshot generation. Subsequently, all SST file
-//  node should have a snapshot-generation count and Compaction-generation
-//  count.
-//  6. DAG based SST file pruning. Start from the oldest snapshot and we can
-//  unlink any SST
-//  file from the SaveCompactedFilePath directory that is reachable in the
-//  reverse DAG.
-//  7. DAG pruning : For each snapshotted bucket, We can recycle the part of
-//  the DAG that is older than the predefined policy for the efficient snapdiff.
-//  E.g. we may decide not to support efficient snapdiff from any snapshot that
-//  is older than 2 weeks.
-//  Note on 8. & 9 .
-//  ==================
-//  A simple handling is to just iterate over all keys in keyspace when the
-//  compaction DAG is lost, instead of optimizing every case. And start
-//  Compaction-DAG afresh from the latest snapshot.
-//  --
 //  8. Handle bootstrapping rocksDB for a new OM follower node
 //      - new node will receive Active object store as well as all existing
 //      rocksDB checkpoints.
@@ -90,109 +68,204 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 //      this case.
 //      - Getting the DB sync. This case needs to handle getting the
 //      compaction-DAG information as well.
-//
-//
+
 /**
- *  RocksDBCheckpointDiffer class.
+ * RocksDB checkpoint differ.
+ * <p>
+ * Implements Ozone Manager RocksDB compaction listener (compaction log
+ * persistence and SST file hard-linking), compaction DAG construction,
+ * and compaction DAG reconstruction upon OM restarts.
+ * <p>
+ * Note that the compaction log is per-DB instance, since each OM DB
+ * instance might trigger compactions at different times.
  */
-//CHECKSTYLE:OFF
-@SuppressWarnings({"NM_METHOD_NAMING_CONVENTION"})
 public class RocksDBCheckpointDiffer {
-  private final String rocksDbPath;
-  private String cpPath;
-  private String cfDBPath;
-  private String saveCompactedFilePath;
-  private int maxSnapshots;
+
   private static final Logger LOG =
       LoggerFactory.getLogger(RocksDBCheckpointDiffer.class);
 
-  // keeps track of all the snapshots created so far.
-  private int lastSnapshotCounter;
-  private String lastSnapshotPrefix;
-
-  // tracks number of compactions so far
-  private static final long UNKNOWN_COMPACTION_GEN = 0;
-  private long currentCompactionGen = 0;
-
-  // Something to track all the snapshots created so far.
-  private Snapshot[] allSnapshots;
-
-  public RocksDBCheckpointDiffer(String dbPath,
-                                 int maxSnapshots,
-                                 String checkpointPath,
-                                 String sstFileSaveDir,
-                                 String cfPath,
-                                 int initialSnapshotCounter,
-                                 String snapPrefix) {
-    this.maxSnapshots = maxSnapshots;
-    allSnapshots = new Snapshot[this.maxSnapshots];
-    cpPath = checkpointPath;
-
-    saveCompactedFilePath = sstFileSaveDir;
-    rocksDbPath = dbPath;
-    cfDBPath = cfPath;
-
-    // TODO: This module should be self sufficient in tracking the last
-    //  snapshotCounter and currentCompactionGen for a given dbPath. It needs
-    //  to be persisted.
-    lastSnapshotCounter = initialSnapshotCounter;
-    lastSnapshotPrefix = snapPrefix;
-    currentCompactionGen = lastSnapshotCounter;
-
-    // TODO: this should also independently persist every compaction e.g.
-    //  (input files) ->
-    //  {  (output files) + lastSnapshotCounter + currentCompactionGen }
-    //  mapping.
+  private final String sstBackupDir;
+  private final String activeDBLocationStr;
+
+  private String compactionLogDir = null;
+
+  /**
+   * Compaction log path for DB compaction history persistence.
+   * This is the source of truth for in-memory SST DAG reconstruction upon
+   * OM restarts.
+   * <p>
+   * Initialized with the latest sequence number on OM startup. The log also
+   * rolls over (a new log file is started) whenever an Ozone snapshot is taken.
+   */
+  private volatile String currentCompactionLogPath = null;
+
+  private static final String COMPACTION_LOG_FILENAME_SUFFIX = ".log";
+
+  /**
+   * Marks the beginning of a comment line in the compaction log.
+   */
+  private static final String COMPACTION_LOG_COMMENT_LINE_PREFIX = "# ";
+
+  /**
+   * Marks the beginning of a compaction log entry.
+   */
+  private static final String COMPACTION_LOG_ENTRY_LINE_PREFIX = "C ";
+
+  /**
+   * Prefix for the sequence number line when writing to compaction log
+   * right after taking an Ozone snapshot.
+   */
+  private static final String COMPACTION_LOG_SEQNUM_LINE_PREFIX = "S ";
+
+  /**
+   * SST file extension. Must be lower case.
+   * Used to trim the file extension when writing compaction entries to the log
+   * to save space.
+   */
+  private static final String SST_FILE_EXTENSION = ".sst";
+  private static final int SST_FILE_EXTENSION_LENGTH =
+      SST_FILE_EXTENSION.length();
+
+  private static final int LONG_MAX_STRLEN =
+      String.valueOf(Long.MAX_VALUE).length();
+
+  private long reconstructionSnapshotGeneration;
+
+  /**
+   * Dummy object that acts as a write lock in compaction listener.
+   */
+  private final Object compactionListenerWriteLock = new Object();
+
+  /**
+   * Constructor.
+   * Note that previous compaction logs are loaded by RDBStore after this
+   * object's initialization by calling loadAllCompactionLogs().
+   * @param metadataDir Ozone metadata directory.
+   * @param sstBackupDir Name of the SST backup dir under metadata dir.
+   * @param compactionLogDirName Name of the compaction log dir.
+   */
+  public RocksDBCheckpointDiffer(String metadataDir, String sstBackupDir,
+      String compactionLogDirName, File activeDBLocation) {
+
+    setCompactionLogDir(metadataDir, compactionLogDirName);
+
+    this.sstBackupDir = Paths.get(metadataDir, sstBackupDir) + "/";
+
+    // Create the directory if SST backup path does not already exist
+    File dir = new File(this.sstBackupDir);
+    if (!dir.exists() && !dir.mkdir()) {
+      final String errorMsg = "Failed to create SST file backup directory. "
+          + "Check if OM has write permission.";
+      LOG.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+
+    // Active DB location is used in getSSTFileSummary
+    this.activeDBLocationStr = activeDBLocation.toString() + "/";
   }
 
-  // Node in the DAG to represent an SST file
-  private class CompactionNode {
-    public String fileName;   // Name of the SST file
-    public String snapshotId; // The last snapshot that was created before this
-    // node came into existance;
-    public long snapshotGeneration;
-    public long totalNumberOfKeys;
-    public long cumulativeKeysReverseTraversal;
-
-    CompactionNode (String f, String sid, long numKeys, long compactionGen) {
-      fileName = f;
-      snapshotId = sid;
-      snapshotGeneration = lastSnapshotCounter;
-      totalNumberOfKeys = numKeys;
-      cumulativeKeysReverseTraversal = 0;
+  private void setCompactionLogDir(String metadataDir,
+      String compactionLogDirName) {
+
+    final File parentDir = new File(metadataDir);
+    if (!parentDir.exists()) {
+      if (!parentDir.mkdir()) {
+        LOG.error("Error creating compaction log parent dir.");
+        return;
+      }
+    }
+
+    this.compactionLogDir =
+        Paths.get(metadataDir, compactionLogDirName).toString();
+    File clDir = new File(compactionLogDir);
+    if (!clDir.exists() && !clDir.mkdir()) {
+      LOG.error("Error creating compaction log dir.");
+      return;
     }
+
+    // Create a readme file explaining what the compaction log dir is for
+    final Path readmePath = Paths.get(compactionLogDir, "_README.txt");
+    final File readmeFile = new File(readmePath.toString());
+    if (!readmeFile.exists()) {
+      try (BufferedWriter bw = Files.newBufferedWriter(
+          readmePath, StandardOpenOption.CREATE)) {
+        bw.write("This directory holds Ozone Manager RocksDB compaction logs.\n"
+            + "DO NOT add, change or delete any files in this directory unless "
+            + "you know what you are doing.\n");
+      } catch (IOException ignored) {
+      }
+    }
+
+    // Append /
+    this.compactionLogDir += "/";
   }
 
-  private static class Snapshot {
-    String dbPath;
-    String snapshotID;
-    long snapshotGeneration;
+  /**
+   * Set the current compaction log filename with a given RDB sequence number.
+   * @param latestSequenceNum latest sequence number of RDB.
+   */
+  public void setCurrentCompactionLog(long latestSequenceNum) {
+    String latestSequenceIdStr = String.valueOf(latestSequenceNum);
+
+    if (latestSequenceIdStr.length() < LONG_MAX_STRLEN) {
+      // Pad zeroes to the left for ordered file listing when sorted
+      // alphabetically.
+      latestSequenceIdStr =
+          StringUtils.leftPad(latestSequenceIdStr, LONG_MAX_STRLEN, "0");
+    }
 
-    Snapshot(String db, String id, long gen) {
-      dbPath = db;
-      snapshotID = id;
-      snapshotGeneration = gen;
+    // Local temp variable for storing the new compaction log file path
+    final String newCompactionLog =
+        compactionLogDir + latestSequenceIdStr + COMPACTION_LOG_FILENAME_SUFFIX;
+
+    File clFile = new File(newCompactionLog);
+    if (clFile.exists()) {
+      LOG.warn("Compaction log exists: {}. Will append", newCompactionLog);
     }
+
+    this.currentCompactionLogPath =
+        compactionLogDir + latestSequenceIdStr + COMPACTION_LOG_FILENAME_SUFFIX;
+
+    // Create empty file if it doesn't exist
+    appendToCurrentCompactionLog("");
   }
 
-  public enum GType {FNAME, KEYSIZE, CUMUTATIVE_SIZE};
+  // Node in the DAG to represent an SST file
+  private static class CompactionNode {
+    // Name of the SST file
+    private final String fileName;
+    // The last snapshot created before this node came into existence
+    private final String snapshotId;
+    private final long snapshotGeneration;
+    private final long totalNumberOfKeys;
+    private long cumulativeKeysReverseTraversal;
+
+    CompactionNode(String file, String ssId, long numKeys, long seqNum) {
+      fileName = file;
+      // Retained for debuggability. Unused for now.
+      snapshotId = ssId;
+      totalNumberOfKeys = numKeys;
+      snapshotGeneration = seqNum;
+      cumulativeKeysReverseTraversal = 0L;
+    }
 
+    @Override
+    public String toString() {
+      return String.format("Node{%s}", fileName);
+    }
+  }
 
-  // Hash table to track Compaction node for a given SST File.
-  private ConcurrentHashMap<String, CompactionNode> compactionNodeTable =
+  // Hash table to track CompactionNode for a given SST File.
+  private final ConcurrentHashMap<String, CompactionNode> compactionNodeMap =
       new ConcurrentHashMap<>();
 
-  // We are mainiting a two way DAG. This allows easy traversal from
+  // We are maintaining a two way DAG. This allows easy traversal from
   // source snapshot to destination snapshot as well as the other direction.
-  // TODO : Persist this information to the disk.
-  // TODO: A system crash while the edge is inserted in DAGFwd but not in
-  //  DAGReverse will compromise the two way DAG. Set of input/output files shud
-  //  be written to // disk(RocksDB) first, would avoid this problem.
 
-  private MutableGraph<CompactionNode> compactionDAGFwd =
+  private final MutableGraph<CompactionNode> forwardCompactionDAG =
       GraphBuilder.directed().build();
 
-  private MutableGraph<CompactionNode> compactionDAGReverse =
+  private final MutableGraph<CompactionNode> backwardCompactionDAG =
       GraphBuilder.directed().build();
 
   public static final Integer DEBUG_DAG_BUILD_UP = 2;
@@ -215,390 +288,511 @@ public class RocksDBCheckpointDiffer {
     DEBUG_LEVEL.add(level);
   }
 
-  // Flushes the WAL and Creates a RocksDB checkpoint
-  @SuppressWarnings({"NM_METHOD_NAMING_CONVENTION"})
-  public void createCheckPoint(String dbPathArg, String cpPathArg,
-                               RocksDB rocksDB) {
-    LOG.warn("Creating Checkpoint for RocksDB instance : " +
-        dbPathArg + "in a CheckPoint Location" + cpPathArg);
-    try {
-      rocksDB.flush(new FlushOptions());
-      Checkpoint cp = Checkpoint.create(rocksDB);
-      cp.createCheckpoint(cpPathArg);
-    } catch (RocksDBException e) {
-      throw new RuntimeException(e.getMessage());
+  /**
+   * Append (then flush) to the current compaction log file path.
+   * Note: This does NOT automatically append newline to the log.
+   */
+  private synchronized void appendToCurrentCompactionLog(String content) {
+    if (currentCompactionLogPath == null) {
+      LOG.error("Unable to append compaction log. "
+          + "Compaction log path is not set. "
+          + "Please check initialization.");
+      throw new RuntimeException("Compaction log path not set");
+    }
+    try (BufferedWriter bw = Files.newBufferedWriter(
+        Paths.get(currentCompactionLogPath),
+        StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
+      bw.write(content);
+      bw.flush();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to append compaction log to " +
+          currentCompactionLogPath, e);
     }
   }
 
-  public void setRocksDBForCompactionTracking(DBOptions rocksOptions)
-      throws RocksDBException {
-    setRocksDBForCompactionTracking(rocksOptions,
-        new ArrayList<AbstractEventListener>());
-  }
-
-  public void setRocksDBForCompactionTracking(
-      DBOptions rocksOptions, List<AbstractEventListener> list) {
-    final AbstractEventListener onCompactionCompletedListener =
-        new AbstractEventListener() {
-          @Override
-          @SuppressFBWarnings({
-              "AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
-              "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
-          public void onCompactionCompleted(
-              final RocksDB db, final CompactionJobInfo compactionJobInfo) {
-            synchronized (db) {
-              LOG.warn(compactionJobInfo.compactionReason().toString());
-              LOG.warn("List of input files:");
-              for (String file : compactionJobInfo.inputFiles()) {
-                LOG.warn(file);
-                String saveLinkFileName =
-                    saveCompactedFilePath + new File(file).getName();
-                Path link = Paths.get(saveLinkFileName);
-                Path srcFile = Paths.get(file);
-                try {
-                  Files.createLink(link, srcFile);
-                } catch (IOException e) {
-                  LOG.warn("Exception in creating hardlink");
-                  e.printStackTrace();
-                }
-              }
-              LOG.warn("List of output files:");
-              for (String file : compactionJobInfo.outputFiles()) {
-                LOG.warn(file + ",");
-              }
-              // Let us also build the graph
-              for (String outFilePath : compactionJobInfo.outputFiles()) {
-                String outfile =
-                    Paths.get(outFilePath).getFileName().toString();
-                CompactionNode outfileNode = compactionNodeTable.get(outfile);
-                if (outfileNode == null) {
-                  long numKeys = 0;
-                  try {
-                    numKeys = getSSTFileSummary(outfile);
-                  } catch (Exception e) {
-                    LOG.warn(e.getMessage());
-                  }
-                  outfileNode = new CompactionNode(outfile,
-                      lastSnapshotPrefix, numKeys,
-                      currentCompactionGen);
-                  compactionDAGFwd.addNode(outfileNode);
-                  compactionDAGReverse.addNode(outfileNode);
-                  compactionNodeTable.put(outfile, outfileNode);
-                }
-                for (String inFilePath : compactionJobInfo.inputFiles()) {
-                  String infile =
-                      Paths.get(inFilePath).getFileName().toString();
-                  CompactionNode infileNode = compactionNodeTable.get(infile);
-                  if (infileNode == null) {
-                    long numKeys = 0;
-                    try {
-                      numKeys = getSSTFileSummary(infile);
-                    } catch (Exception e) {
-                      LOG.warn(e.getMessage());
-                    }
-                    infileNode = new CompactionNode(infile,
-                        lastSnapshotPrefix,
-                        numKeys, UNKNOWN_COMPACTION_GEN);
-                    compactionDAGFwd.addNode(infileNode);
-                    compactionDAGReverse.addNode(infileNode);
-                    compactionNodeTable.put(infile, infileNode);
-                  }
-                  if (outfileNode.fileName.compareToIgnoreCase(
-                      infileNode.fileName) != 0) {
-                    compactionDAGFwd.putEdge(outfileNode, infileNode);
-                    compactionDAGReverse.putEdge(infileNode, outfileNode);
-                  }
-                }
-              }
-              if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
-                printMutableGraph(null, null, compactionDAGFwd);
-              }
-            }
-          }
-        };
+  /**
+   * Append a sequence number to the compaction log (roughly) when an Ozone
+   * snapshot (RDB checkpoint) is taken.
+   * @param sequenceNum RDB sequence number
+   */
+  public void appendSequenceNumberToCompactionLog(long sequenceNum) {
+    final String line = COMPACTION_LOG_SEQNUM_LINE_PREFIX + sequenceNum + "\n";
+    appendToCurrentCompactionLog(line);
+  }
 
-    list.add(onCompactionCompletedListener);
+  /**
+   * Takes {@link org.rocksdb.Options}.
+   */
+  public void setRocksDBForCompactionTracking(Options rocksOptions,
+      List<AbstractEventListener> list) {
+    list.add(newCompactionBeginListener());
+    list.add(newCompactionCompletedListener());
     rocksOptions.setListeners(list);
   }
 
+  public void setRocksDBForCompactionTracking(Options rocksOptions) {
+    setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>());
+  }
 
+  /**
+   * Takes {@link org.rocksdb.DBOptions}.
+   */
+  public void setRocksDBForCompactionTracking(DBOptions rocksOptions,
+      List<AbstractEventListener> list) {
+    list.add(newCompactionBeginListener());
+    list.add(newCompactionCompletedListener());
+    rocksOptions.setListeners(list);
+  }
 
-  public void setRocksDBForCompactionTracking(Options rocksOptions)
+  public void setRocksDBForCompactionTracking(DBOptions rocksOptions)
       throws RocksDBException {
-    setRocksDBForCompactionTracking(rocksOptions,
-        new ArrayList<AbstractEventListener>());
-  }
-
-  public void setRocksDBForCompactionTracking(
-      Options rocksOptions, List<AbstractEventListener> list) {
-    final AbstractEventListener onCompactionCompletedListener =
-        new AbstractEventListener() {
-          @Override
-          @SuppressFBWarnings({
-              "AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
-              "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
-          public void onCompactionCompleted(
-              final RocksDB db,final CompactionJobInfo compactionJobInfo) {
-            synchronized (db) {
-              LOG.warn(compactionJobInfo.compactionReason().toString());
-              LOG.warn("List of input files:");
-              for (String file : compactionJobInfo.inputFiles()) {
-                LOG.warn(file);
-                String saveLinkFileName =
-                    saveCompactedFilePath + new File(file).getName();
-                Path link = Paths.get(saveLinkFileName);
-                Path srcFile = Paths.get(file);
-                try {
-                  Files.createLink(link, srcFile);
-                } catch (IOException e) {
-                  LOG.warn("Exception in creating hardlink");
-                  e.printStackTrace();
-                }
-              }
-              LOG.warn("List of output files:");
-              for (String file : compactionJobInfo.outputFiles()) {
-                LOG.warn(file);
-              }
-              // Let us also build the graph
-              for (String outFilePath : compactionJobInfo.outputFiles()) {
-                String outfile =
-                    Paths.get(outFilePath).getFileName().toString();
-                CompactionNode outfileNode = compactionNodeTable.get(outfile);
-                if (outfileNode == null) {
-                  long numKeys = 0;
-                  try {
-                    numKeys = getSSTFileSummary(outfile);
-                  } catch (Exception e) {
-                    LOG.warn(e.getMessage());
-                  }
-                  outfileNode = new CompactionNode(outfile,
-                      lastSnapshotPrefix,
-                      numKeys, currentCompactionGen);
-                  compactionDAGFwd.addNode(outfileNode);
-                  compactionDAGReverse.addNode(outfileNode);
-                  compactionNodeTable.put(outfile, outfileNode);
-                }
-                for (String inFilePath : compactionJobInfo.inputFiles()) {
-                  String infile =
-                      Paths.get(inFilePath).getFileName().toString();
-                  CompactionNode infileNode = compactionNodeTable.get(infile);
-                  if (infileNode == null) {
-                    long numKeys = 0;
-                    try {
-                      numKeys = getSSTFileSummary(infile);
-                    } catch (Exception e) {
-                      LOG.warn(e.getMessage());
-                    }
-                    infileNode = new CompactionNode(infile,
-                        lastSnapshotPrefix, numKeys,
-                        UNKNOWN_COMPACTION_GEN);
-                    compactionDAGFwd.addNode(infileNode);
-                    compactionDAGReverse.addNode(infileNode);
-                    compactionNodeTable.put(infile, infileNode);
-                  }
-                  if (outfileNode.fileName.compareToIgnoreCase(
-                      infileNode.fileName) != 0) {
-                    compactionDAGFwd.putEdge(outfileNode, infileNode);
-                    compactionDAGReverse.putEdge(infileNode, outfileNode);
-                  }
-                }
-              }
-              if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
-                printMutableGraph(null, null,
-                    compactionDAGFwd);
-              }
+    setRocksDBForCompactionTracking(rocksOptions, new ArrayList<>());
+  }
+
+  private AbstractEventListener newCompactionBeginListener() {
+    return new AbstractEventListener() {
+      @Override
+      public void onCompactionBegin(RocksDB db,
+          CompactionJobInfo compactionJobInfo) {
+
+        synchronized (compactionListenerWriteLock) {
+
+          if (compactionJobInfo.inputFiles().size() == 0) {
+            LOG.error("Compaction input files list is empty");
+            return;
+          }
+
+          // Create hardlink backups for the SST files that are going
+          // to be deleted after this RDB compaction.
+          for (String file : compactionJobInfo.inputFiles()) {
+            LOG.debug("Creating hard link for '{}'", file);
+            String saveLinkFileName =
+                sstBackupDir + new File(file).getName();
+            Path link = Paths.get(saveLinkFileName);
+            Path srcFile = Paths.get(file);
+            try {
+              Files.createLink(link, srcFile);
+            } catch (IOException e) {
+              LOG.error("Exception in creating hard link for {}", file);
+              throw new RuntimeException("Failed to create hard link", e);
             }
           }
-        };
 
-    list.add(onCompactionCompletedListener);
-    rocksOptions.setListeners(list);
+        }
+      }
+    };
   }
 
-  public RocksDB getRocksDBInstanceWithCompactionTracking(String dbPath)
-      throws RocksDBException {
-    final Options opt = new Options().setCreateIfMissing(true)
-        .setCompressionType(CompressionType.NO_COMPRESSION);
-    opt.setMaxBytesForLevelMultiplier(2);
-    setRocksDBForCompactionTracking(opt);
-    return RocksDB.open(opt, dbPath);
+  private AbstractEventListener newCompactionCompletedListener() {
+    return new AbstractEventListener() {
+      @Override
+      public void onCompactionCompleted(RocksDB db,
+          CompactionJobInfo compactionJobInfo) {
+
+        synchronized (compactionListenerWriteLock) {
+
+          if (compactionJobInfo.inputFiles().isEmpty()) {
+            LOG.error("Compaction input files list is empty");
+            return;
+          }
+
+          final StringBuilder sb = new StringBuilder();
+
+          if (LOG.isDebugEnabled()) {
+            // Print compaction reason for this entry in the log file
+            // e.g. kLevelL0FilesNum / kLevelMaxLevelSize.
+            sb.append(COMPACTION_LOG_COMMENT_LINE_PREFIX)
+                .append(compactionJobInfo.compactionReason())
+                .append('\n');
+          }
+
+          // Mark the beginning of a compaction log
+          sb.append(COMPACTION_LOG_ENTRY_LINE_PREFIX);
+
+          // Trim DB path, only keep the SST file name
+          final int filenameOffset =
+              compactionJobInfo.inputFiles().get(0).lastIndexOf("/") + 1;
+
+          // Append the list of input files
+          final List<String> inputFiles = compactionJobInfo.inputFiles();
+          // Trim the file path, leave only the SST file name without extension
+          inputFiles.replaceAll(s -> s.substring(
+              filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
+          final String inputFilesJoined = String.join(",", inputFiles);
+          sb.append(inputFilesJoined);
+
+          // Insert delimiter between input files and output files
+          sb.append(':');
+
+          // Append the list of output files
+          final List<String> outputFiles = compactionJobInfo.outputFiles();
+          outputFiles.replaceAll(s -> s.substring(
+              filenameOffset, s.length() - SST_FILE_EXTENSION_LENGTH));
+          final String outputFilesJoined = String.join(",", outputFiles);
+          sb.append(outputFilesJoined);
+
+          // End of line
+          sb.append('\n');
+
+          // Write input and output file names to compaction log
+          appendToCurrentCompactionLog(sb.toString());
+
+          // Populate the DAG
+          populateCompactionDAG(inputFiles, outputFiles,
+              db.getLatestSequenceNumber());
+/*
+          if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
+            printMutableGraph(null, null, compactionDAGFwd);
+          }
+ */
+        }
+      }
+    };
   }
 
-  // Get a summary of a given SST file
-  public long getSSTFileSummary(String filename)
-      throws RocksDBException {
+  /**
+   * Get number of keys in an SST file.
+   * @param filename SST filename
+   * @return number of keys
+   */
+  private long getSSTFileSummary(String filename) throws RocksDBException {
+
+    if (!filename.endsWith(SST_FILE_EXTENSION)) {
+      filename += SST_FILE_EXTENSION;
+    }
+
     Options option = new Options();
     SstFileReader reader = new SstFileReader(option);
-    try {
-      reader.open(saveCompactedFilePath + filename);
-    } catch (RocksDBException e) {
-      reader.open(rocksDbPath + "/"+ filename);
+
+    File sstFile = new File(sstBackupDir + filename);
+    File sstFileInActiveDB = new File(activeDBLocationStr + filename);
+    if (sstFile.exists()) {
+      reader.open(sstBackupDir + filename);
+    } else if (sstFileInActiveDB.exists()) {
+      reader.open(activeDBLocationStr + filename);
+    } else {
+      throw new RuntimeException("Can't find SST file: " + filename);
     }
+
     TableProperties properties = reader.getTableProperties();
-    LOG.warn("getSSTFileSummary " + filename + ":: " +
-        properties.getNumEntries());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{} has {} keys", filename, properties.getNumEntries());
+    }
     return properties.getNumEntries();
   }
 
-  // Read the current Live manifest for a given RocksDB instance (Active or
-  // Checkpoint). Returns the list of currently active SST FileNames.
-  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+  /**
+   * Helper method to trim the filename retrieved from LiveFileMetaData.
+   */
+  private String trimSSTFilename(String filename) {
+    if (!filename.startsWith("/")) {
+      final String errorMsg = String.format(
+          "Invalid start of filename: '%s'. Expected '/'", filename);
+      LOG.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    if (!filename.endsWith(SST_FILE_EXTENSION)) {
+      final String errorMsg = String.format(
+          "Invalid extension of file: '%s'. Expected '%s'",
+          filename, SST_FILE_EXTENSION);
+      LOG.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    return filename.substring("/".length(),
+        filename.length() - SST_FILE_EXTENSION_LENGTH);
+  }
+
+  /**
+   * Get a list of relevant column family descriptors.
+   * @param cfOpts ColumnFamilyOptions
+   * @return List of ColumnFamilyDescriptor
+   */
+  @VisibleForTesting
+  static List<ColumnFamilyDescriptor> getCFDescriptorList(
+      ColumnFamilyOptions cfOpts) {
+    return asList(
+        new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
+        new ColumnFamilyDescriptor("keyTable".getBytes(UTF_8), cfOpts),
+        new ColumnFamilyDescriptor("directoryTable".getBytes(UTF_8), cfOpts),
+        new ColumnFamilyDescriptor("fileTable".getBytes(UTF_8), cfOpts)
+    );
+  }
+
+  /**
+   * Read the current Live manifest for a given RocksDB instance (Active or
+   * Checkpoint).
+   * @param dbPathArg path to a RocksDB directory
+   * @return a list of SST files (without extension) in the DB.
+   */
   public HashSet<String> readRocksDBLiveFiles(String dbPathArg) {
     RocksDB rocksDB = null;
     HashSet<String> liveFiles = new HashSet<>();
-    //
-    try (final Options options =
-             new Options().setParanoidChecks(true)
-                 .setCreateIfMissing(true)
-                 .setCompressionType(CompressionType.NO_COMPRESSION)
-                 .setForceConsistencyChecks(false)) {
-      rocksDB = RocksDB.openReadOnly(options, dbPathArg);
+
+    final ColumnFamilyOptions cfOpts = new ColumnFamilyOptions();
+    final List<ColumnFamilyDescriptor> cfDescriptors =
+        getCFDescriptorList(cfOpts);
+    final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+    try (DBOptions dbOptions = new DBOptions()
+        .setParanoidChecks(true)) {
+
+      rocksDB = RocksDB.openReadOnly(dbOptions, dbPathArg,
+          cfDescriptors, columnFamilyHandles);
       List<LiveFileMetaData> liveFileMetaDataList =
           rocksDB.getLiveFilesMetaData();
-      LOG.warn("Live File Metadata for DB: " + dbPathArg);
+      LOG.debug("SST File Metadata for DB: " + dbPathArg);
       for (LiveFileMetaData m : liveFileMetaDataList) {
-        LOG.warn("\tFile :" + m.fileName());
-        LOG.warn("\tLevel :" + m.level());
-        liveFiles.add(Paths.get(m.fileName()).getFileName().toString());
+        LOG.debug("File: {}, Level: {}", m.fileName(), m.level());
+        final String trimmedFilename = trimSSTFilename(m.fileName());
+        liveFiles.add(trimmedFilename);
       }
     } catch (RocksDBException e) {
+      LOG.error("Error during RocksDB operation: {}", e.getMessage());
       e.printStackTrace();
     } finally {
       if (rocksDB != null) {
         rocksDB.close();
       }
+      cfOpts.close();
     }
     return liveFiles;
   }
 
-  // Given the src and destination Snapshots, it prints a Diff list.
-  private synchronized void printSnapdiffSSTFiles(
-      Snapshot src, Snapshot dest) throws RocksDBException {
-    LOG.warn("Src Snapshot files :" + src.dbPath);
+  /**
+   * Process each line of compaction log text file input and populate the DAG.
+   */
+  private synchronized void processCompactionLogLine(String line) {
+
+    LOG.debug("Processing line: {}", line);
+
+    if (line.startsWith("#")) {
+      // Skip comments
+      LOG.debug("Comment line, skipped");
+    } else if (line.startsWith(COMPACTION_LOG_SEQNUM_LINE_PREFIX)) {
+      // Read sequence number
+      LOG.debug("Reading sequence number as snapshot generation");
+      final String seqNumStr =
+          line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length()).trim();
+      // This would be the snapshot generation for the nodes to come
+      reconstructionSnapshotGeneration = Long.parseLong(seqNumStr);
+    } else if (line.startsWith(COMPACTION_LOG_ENTRY_LINE_PREFIX)) {
+      // Read compaction log entry
+
+      // Trim the beginning
+      line = line.substring(COMPACTION_LOG_SEQNUM_LINE_PREFIX.length());
+      final String[] io = line.split(":");
+      if (io.length != 2) {
+        LOG.error("Invalid line in compaction log: {}", line);
+        return;
+      }
+      final String[] inputFiles = io[0].split(",");
+      final String[] outputFiles = io[1].split(",");
+      populateCompactionDAG(asList(inputFiles),
+          asList(outputFiles), reconstructionSnapshotGeneration);
+    } else {
+      LOG.error("Invalid line in compaction log: {}", line);
+    }
+  }
+
+  /**
+   * Helper to read compaction log to the internal DAG.
+   */
+  private void readCompactionLogToDAG(String currCompactionLogPath) {
+    LOG.debug("Loading compaction log: {}", currCompactionLogPath);
+    try (Stream<String> logLineStream =
+        Files.lines(Paths.get(currCompactionLogPath), UTF_8)) {
+      logLineStream.forEach(this::processCompactionLogLine);
+    } catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
+  }
+
+  /**
+   * Load existing compaction log files to the in-memory DAG.
+   * This only needs to be done once during OM startup.
+   */
+  public synchronized void loadAllCompactionLogs() {
+    if (compactionLogDir == null) {
+      throw new RuntimeException("Compaction log directory must be set first");
+    }
+    reconstructionSnapshotGeneration = 0L;
+    try {
+      try (Stream<Path> pathStream = Files.list(Paths.get(compactionLogDir))
+          .filter(e -> e.toString().toLowerCase()
+              .endsWith(COMPACTION_LOG_FILENAME_SUFFIX))
+          .sorted()) {
+        for (Path logPath : pathStream.collect(Collectors.toList())) {
+          readCompactionLogToDAG(logPath.toString());
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error listing compaction log dir " +
+          compactionLogDir, e);
+    }
+  }
+
+  /**
+   * Snapshot information node class for the differ.
+   */
+  public static class DifferSnapshotInfo {
+    private final String dbPath;
+    private final String snapshotID;
+    private final long snapshotGeneration;
+
+    public DifferSnapshotInfo(String db, String id, long gen) {
+      dbPath = db;
+      snapshotID = id;
+      snapshotGeneration = gen;
+    }
+
+    public String getDbPath() {
+      return dbPath;
+    }
+
+    public String getSnapshotID() {
+      return snapshotID;
+    }
+
+    public long getSnapshotGeneration() {
+      return snapshotGeneration;
+    }
+
+    @Override
+    public String toString() {
+      return "DifferSnapshotInfo{" + "dbPath='" + dbPath + '\''
+          + ", snapshotID='" + snapshotID + '\'' + ", snapshotGeneration="
+          + snapshotGeneration + '}';
+    }
+  }
+
+  /**
+   * Get the list of SST files that differ between src and destination snapshots.
+   * <p>
+   * Expected input: src is a snapshot taken AFTER the dest.
+   *
+   * @param src source snapshot
+   * @param dest destination snapshot
+   */
+  public synchronized List<String> getSSTDiffList(
+      DifferSnapshotInfo src, DifferSnapshotInfo dest) {
+
     HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.dbPath);
-    LOG.warn("dest Snapshot files :" + dest.dbPath);
     HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.dbPath);
 
     HashSet<String> fwdDAGSameFiles = new HashSet<>();
     HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
 
-    LOG.warn("Doing forward diff between source and destination " +
-        "Snapshots:" + src.dbPath + ", " + dest.dbPath);
-    realPrintSnapdiffSSTFiles(src, dest, srcSnapFiles, destSnapFiles,
-        compactionDAGFwd,
-        fwdDAGSameFiles,
-        fwdDAGDifferentFiles);
-
-    LOG.warn("Overall Summary \n" +
-            "Doing Overall diff between source and destination Snapshots:" +
-        src.dbPath + ", " + dest.dbPath);
-    System.out.print("fwd DAG Same files :");
-    for (String file : fwdDAGSameFiles) {
-      System.out.print(file + ", ");
-    }
-    LOG.warn("");
-    System.out.print("\nFwd DAG Different files :");
-    for (String file : fwdDAGDifferentFiles) {
-      CompactionNode n = compactionNodeTable.get(file);
-      System.out.print(file + ", ");
-    }
-    LOG.warn("");
-  }
-
-  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
-  public synchronized void realPrintSnapdiffSSTFiles(
-      Snapshot src, Snapshot dest,
-      HashSet<String> srcSnapFiles,
-      HashSet<String> destSnapFiles,
+    LOG.debug("Doing forward diff from src '{}' to dest '{}'",
+        src.dbPath, dest.dbPath);
+    internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
+        forwardCompactionDAG, fwdDAGSameFiles, fwdDAGDifferentFiles);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Result of diff from src '" + src.dbPath + "' to dest '" +
+          dest.dbPath + "':");
+      StringBuilder logSB = new StringBuilder();
+
+      logSB.append("Fwd DAG same SST files:      ");
+      for (String file : fwdDAGSameFiles) {
+        logSB.append(file).append(" ");
+      }
+      LOG.debug(logSB.toString());
+
+      logSB.setLength(0);
+      logSB.append("Fwd DAG different SST files: ");
+      for (String file : fwdDAGDifferentFiles) {
+        logSB.append(file).append(" ");
+      }
+      LOG.debug("{}", logSB);
+    }
+
+    return new ArrayList<>(fwdDAGDifferentFiles);
+  }
+
+  /**
+   * Core getSSTDiffList logic.
+   */
+  private void internalGetSSTDiffList(
+      DifferSnapshotInfo src, DifferSnapshotInfo dest,
+      HashSet<String> srcSnapFiles, HashSet<String> destSnapFiles,
       MutableGraph<CompactionNode> mutableGraph,
       HashSet<String> sameFiles, HashSet<String> differentFiles) {
 
-
     for (String fileName : srcSnapFiles) {
       if (destSnapFiles.contains(fileName)) {
-        LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
-            "Snapshot" + dest.dbPath + " Contain Same file " + fileName);
+        LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
+            src.dbPath, dest.dbPath, fileName);
         sameFiles.add(fileName);
         continue;
       }
-      CompactionNode infileNode =
-          compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
+
+      CompactionNode infileNode = compactionNodeMap.get(fileName);
       if (infileNode == null) {
-        LOG.warn("SrcSnapshot : " + src.dbPath + "File " + fileName + "was " +
-            "never compacted");
+        LOG.debug("Source '{}' SST file '{}' is never compacted",
+            src.dbPath, fileName);
         differentFiles.add(fileName);
         continue;
       }
-      System.out.print(" Expandin File:" + fileName + ":\n");
-      Set<CompactionNode> nextLevel = new HashSet<>();
-      nextLevel.add(infileNode);
+
+      LOG.debug("Expanding SST file: {}", fileName);
       Set<CompactionNode> currentLevel = new HashSet<>();
-      currentLevel.addAll(nextLevel);
-      nextLevel = new HashSet<>();
-      int i = 1;
-      while (currentLevel.size() != 0) {
-        LOG.warn("DAG Level :" + i++);
+      currentLevel.add(infileNode);
+      // Traversal level/depth indicator for debug print
+      int level = 1;
+      while (!currentLevel.isEmpty()) {
+        LOG.debug("BFS level: {}. Current level has {} nodes.",
+            level++, currentLevel.size());
+
+        final Set<CompactionNode> nextLevel = new HashSet<>();
         for (CompactionNode current : currentLevel) {
-          LOG.warn("acknowledging file " + current.fileName);
+          LOG.debug("Processing node: {}", current.fileName);
           if (current.snapshotGeneration <= dest.snapshotGeneration) {
-            LOG.warn("Reached dest generation count. SrcSnapshot : " +
-                src.dbPath + " and Dest " + "Snapshot" + dest.dbPath +
-                " Contain Diffrent file " + current.fileName);
+            LOG.debug("Current node's snapshot generation '{}' "
+                    + "reached destination snapshot's '{}'. "
+                    + "Src '{}' and dest '{}' have different SST file: '{}'",
+                current.snapshotGeneration, dest.snapshotGeneration,
+                src.dbPath, dest.dbPath, current.fileName);
             differentFiles.add(current.fileName);
             continue;
           }
+
           Set<CompactionNode> successors = mutableGraph.successors(current);
-          if (successors == null || successors.size() == 0) {
-            LOG.warn("No further compaction for the file" +
-                ".SrcSnapshot : " + src.dbPath + " and Dest " +
-                "Snapshot" + dest.dbPath + " Contain Diffrent file " +
-                current.fileName);
+          if (successors.isEmpty()) {
+            LOG.debug("No further compaction happened to the current file. " +
+                "Src '{}' and dest '{}' have different file: {}",
+                src.dbPath, dest.dbPath, current.fileName);
             differentFiles.add(current.fileName);
-          } else {
-            for (CompactionNode oneSucc : successors) {
-              if (sameFiles.contains(oneSucc.fileName) ||
-                  differentFiles.contains(oneSucc.fileName)) {
-                LOG.warn("Skipping file :" + oneSucc.fileName);
-                continue;
-              }
-              if (destSnapFiles.contains(oneSucc.fileName)) {
-                LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
-                    "Snapshot" + dest.dbPath + " Contain Same file " +
-                    oneSucc.fileName);
-                sameFiles.add(oneSucc.fileName);
-                continue;
-              } else {
-                LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
-                    "Snapshot" + dest.dbPath + " Contain Diffrent file " +
-                    oneSucc.fileName);
-                nextLevel.add(oneSucc);
-              }
+            continue;
+          }
+
+          for (CompactionNode node : successors) {
+            if (sameFiles.contains(node.fileName) ||
+                differentFiles.contains(node.fileName)) {
+              LOG.debug("Skipping known processed SST: {}", node.fileName);
+              continue;
+            }
+
+            if (destSnapFiles.contains(node.fileName)) {
+              LOG.debug("Src '{}' and dest '{}' have the same SST: {}",
+                  src.dbPath, dest.dbPath, node.fileName);
+              sameFiles.add(node.fileName);
+              continue;
             }
+
+            // Queue different SST to the next level
+            LOG.debug("Src '{}' and dest '{}' have a different SST: {}",
+                src.dbPath, dest.dbPath, node.fileName);
+            nextLevel.add(node);
           }
         }
-        currentLevel = new HashSet<>();
-        currentLevel.addAll(nextLevel);
-        nextLevel = new HashSet<>();
-        LOG.warn("");
+        currentLevel = nextLevel;
       }
     }
-    LOG.warn("Summary :");
-    for (String file : sameFiles) {
-      System.out.print("Same File : " + file);
-    }
-    LOG.warn("");
-
-    for (String file : differentFiles) {
-      System.out.print("Different File : " + file);
-    }
-    LOG.warn("");
   }
 
-  @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC")
-  class NodeComparator implements Comparator<CompactionNode>
-  {
-    public int compare(CompactionNode a, CompactionNode b)
-    {
+  static class NodeComparator
+      implements Comparator<CompactionNode>, Serializable {
+    public int compare(CompactionNode a, CompactionNode b) {
       return a.fileName.compareToIgnoreCase(b.fileName);
     }
 
@@ -608,210 +802,204 @@ public class RocksDBCheckpointDiffer {
     }
   }
 
-
-  public void dumpCompactioNodeTable() {
-    List<CompactionNode> nodeList =
-        compactionNodeTable.values().stream().collect(Collectors.toList());
-    Collections.sort(nodeList, new NodeComparator());
-    for (CompactionNode n : nodeList ) {
-      LOG.warn("File : " + n.fileName + " :: Total keys : "
-          + n.totalNumberOfKeys);
-      LOG.warn("File : " + n.fileName + " :: Cumulative keys : "  +
+  @VisibleForTesting
+  void dumpCompactionNodeTable() {
+    List<CompactionNode> nodeList = compactionNodeMap.values().stream()
+        .sorted(new NodeComparator()).collect(Collectors.toList());
+    for (CompactionNode n : nodeList) {
+      LOG.debug("File '{}' total keys: {}", n.fileName, n.totalNumberOfKeys);
+      LOG.debug("File '{}' cumulative keys: {}", n.fileName,
           n.cumulativeKeysReverseTraversal);
     }
   }
 
-  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
-  public synchronized void printMutableGraphFromAGivenNode(
-      String fileName, int level, MutableGraph<CompactionNode> mutableGraph) {
-    CompactionNode infileNode =
-        compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
+  @VisibleForTesting
+  public synchronized void printMutableGraphFromAGivenNode(String fileName,
+      int sstLevel, MutableGraph<CompactionNode> mutableGraph) {
+
+    CompactionNode infileNode = compactionNodeMap.get(fileName);
     if (infileNode == null) {
       return;
     }
-    System.out.print("\nCompaction Level : " + level + " Expandin File:" +
-        fileName + ":\n");
-    Set<CompactionNode> nextLevel = new HashSet<>();
-    nextLevel.add(infileNode);
+    LOG.debug("Expanding file: {}. SST compaction level: {}",
+        fileName, sstLevel);
     Set<CompactionNode> currentLevel = new HashSet<>();
-    currentLevel.addAll(nextLevel);
-    int i = 1;
-    while (currentLevel.size() != 0) {
-      LOG.warn("DAG Level :" + i++);
+    currentLevel.add(infileNode);
+    int levelCounter = 1;
+    while (!currentLevel.isEmpty()) {
+      LOG.debug("DAG Level: {}", levelCounter++);
+      final Set<CompactionNode> nextLevel = new HashSet<>();
+      StringBuilder sb = new StringBuilder();
       for (CompactionNode current : currentLevel) {
         Set<CompactionNode> successors = mutableGraph.successors(current);
-        for (CompactionNode oneSucc : successors) {
-          System.out.print(oneSucc.fileName + " ");
-          nextLevel.add(oneSucc);
+        for (CompactionNode succNode : successors) {
+          sb.append(succNode.fileName).append(" ");
+          nextLevel.add(succNode);
         }
       }
-      currentLevel = new HashSet<>();
-      currentLevel.addAll(nextLevel);
-      nextLevel = new HashSet<>();
-      LOG.warn("");
+      LOG.debug("{}", sb);
+      currentLevel = nextLevel;
     }
   }
 
-  public synchronized void printMutableGraph(
-      String srcSnapId, String destSnapId,
+  synchronized void printMutableGraph(String srcSnapId, String destSnapId,
       MutableGraph<CompactionNode> mutableGraph) {
-    LOG.warn("Printing the Graph");
-    Set<CompactionNode> topLevelNodes = new HashSet<>();
-    Set<CompactionNode> allNodes = new HashSet<>();
-    for (CompactionNode n : mutableGraph.nodes()) {
+
+    LOG.debug("Gathering all SST file nodes from src '{}' to dest '{}'",
+        srcSnapId, destSnapId);
+
+    final Queue<CompactionNode> nodeQueue = new LinkedList<>();
+    // Queue source snapshot SST file nodes
+    for (CompactionNode node : mutableGraph.nodes()) {
       if (srcSnapId == null ||
-          n.snapshotId.compareToIgnoreCase(srcSnapId) == 0) {
-        topLevelNodes.add(n);
+          node.snapshotId.compareToIgnoreCase(srcSnapId) == 0) {
+        nodeQueue.add(node);
       }
     }
-    Iterator iter = topLevelNodes.iterator();
-    while (iter.hasNext()) {
-      CompactionNode n = (CompactionNode) iter.next();
-      Set<CompactionNode> succ = mutableGraph.successors(n);
-      LOG.warn("Parent Node :" + n.fileName);
-      if (succ.size() == 0) {
-        LOG.warn("No Children Node ");
-        allNodes.add(n);
-        iter.remove();
-        iter = topLevelNodes.iterator();
+
+    final Set<CompactionNode> allNodesSet = new HashSet<>();
+    while (!nodeQueue.isEmpty()) {
+      CompactionNode node = nodeQueue.poll();
+      Set<CompactionNode> succSet = mutableGraph.successors(node);
+      LOG.debug("Current node: {}", node);
+      if (succSet.isEmpty()) {
+        LOG.debug("Has no successor node");
+        allNodesSet.add(node);
         continue;
       }
-      for (CompactionNode oneSucc : succ) {
-        LOG.warn("Children Node :" + oneSucc.fileName);
-        if (srcSnapId == null||
-            oneSucc.snapshotId.compareToIgnoreCase(destSnapId) == 0) {
-          allNodes.add(oneSucc);
-        } else {
-          topLevelNodes.add(oneSucc);
+      for (CompactionNode succNode : succSet) {
+        LOG.debug("Has successor node: {}", succNode);
+        if (srcSnapId == null ||
+            succNode.snapshotId.compareToIgnoreCase(destSnapId) == 0) {
+          allNodesSet.add(succNode);
+          continue;
         }
+        nodeQueue.add(succNode);
       }
-      iter.remove();
-      iter = topLevelNodes.iterator();
-    }
-    LOG.warn("src snap:" + srcSnapId);
-    LOG.warn("dest snap:" + destSnapId);
-    for (CompactionNode n : allNodes) {
-      LOG.warn("Files are :" + n.fileName);
     }
-  }
 
+    LOG.debug("Files are: {}", allNodesSet);
+  }
 
-  public void createSnapshot(RocksDB rocksDB) throws InterruptedException {
-
-    LOG.warn("Current time is::" + System.currentTimeMillis());
-    long t1 = System.currentTimeMillis();
+  public MutableGraph<CompactionNode> getForwardCompactionDAG() {
+    return forwardCompactionDAG;
+  }
 
-    cpPath = cpPath + lastSnapshotCounter;
-    createCheckPoint(rocksDbPath, cpPath, rocksDB);
-    allSnapshots[lastSnapshotCounter] = new Snapshot(cpPath,
-    lastSnapshotPrefix, lastSnapshotCounter);
+  public MutableGraph<CompactionNode> getBackwardCompactionDAG() {
+    return backwardCompactionDAG;
+  }
 
-    long t2 = System.currentTimeMillis();
-    LOG.warn("Current time is::" + t2);
+  /**
+   * Helper method to add a new file node to the DAG.
+   * @return CompactionNode
+   */
+  private CompactionNode addNodeToDAG(String file, long seqNum) {
+    long numKeys = 0L;
+    try {
+      numKeys = getSSTFileSummary(file);
+    } catch (RocksDBException e) {
+      LOG.warn("Can't get num of keys in SST '{}': {}", file, e.getMessage());
+    }
+    CompactionNode fileNode = new CompactionNode(file, null, numKeys, seqNum);
+    forwardCompactionDAG.addNode(fileNode);
+    backwardCompactionDAG.addNode(fileNode);
 
-    LOG.warn("millisecond difference is ::" + (t2 - t1));
-   Thread.sleep(100);
-   ++lastSnapshotCounter;
-   lastSnapshotPrefix = "sid_" + lastSnapshotCounter;
-   LOG.warn("done :: 1");
+    return fileNode;
   }
 
+  /**
+   * Populate the compaction DAG with input and output SST files lists.
+   */
+  private void populateCompactionDAG(List<String> inputFiles,
+      List<String> outputFiles, long seqNum) {
 
-  public void printAllSnapshots() throws InterruptedException {
-    for (Snapshot snap : allSnapshots) {
-      if (snap == null) {
-        break;
-      }
-      LOG.warn("Snapshot id" + snap.snapshotID);
-      LOG.warn("Snapshot path" + snap.dbPath);
-      LOG.warn("Snapshot Generation" + snap.snapshotGeneration);
-      LOG.warn("");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Input files: {} -> Output files: {}", inputFiles, outputFiles);
     }
-  }
 
-  public void diffAllSnapshots() throws InterruptedException, RocksDBException {
-    for (Snapshot snap : allSnapshots) {
-      if (snap == null) {
-        break;
+    for (String outfile : outputFiles) {
+      final CompactionNode outfileNode = compactionNodeMap.computeIfAbsent(
+          outfile, file -> addNodeToDAG(file, seqNum));
+
+      for (String infile : inputFiles) {
+        final CompactionNode infileNode = compactionNodeMap.computeIfAbsent(
+            infile, file -> addNodeToDAG(file, seqNum));
+        // Draw the edges
+        if (!outfileNode.fileName.equals(infileNode.fileName)) {
+          forwardCompactionDAG.putEdge(outfileNode, infileNode);
+          backwardCompactionDAG.putEdge(infileNode, outfileNode);
+        }
       }
-      printSnapdiffSSTFiles(allSnapshots[lastSnapshotCounter - 1], snap);
     }
-  }
-
-  public MutableGraph<CompactionNode> getCompactionFwdDAG() {
-    return compactionDAGFwd;
-  }
 
-  public MutableGraph<CompactionNode> getCompactionReverseDAG() {
-    return compactionDAGFwd;
   }
 
+  @VisibleForTesting
   public synchronized void traverseGraph(
       MutableGraph<CompactionNode> reverseMutableGraph,
       MutableGraph<CompactionNode> fwdMutableGraph) {
 
-      List<CompactionNode> nodeList =
-        compactionNodeTable.values().stream().collect(Collectors.toList());
-    Collections.sort(nodeList, new NodeComparator());
+    List<CompactionNode> nodeList = compactionNodeMap.values().stream()
+        .sorted(new NodeComparator()).collect(Collectors.toList());
 
-    for (CompactionNode  infileNode : nodeList ) {
-      // fist go through fwdGraph to find nodes that don't have succesors.
+    for (CompactionNode infileNode : nodeList) {
+      // first go through fwdGraph to find nodes that don't have successors.
       // These nodes will be the top level nodes in reverse graph
       Set<CompactionNode> successors = fwdMutableGraph.successors(infileNode);
-      if (successors == null || successors.size() == 0) {
-        LOG.warn("traverseGraph : No successors. cumulative " +
-            "keys : " + infileNode.cumulativeKeysReverseTraversal + "::total " +
-            "keys ::" + infileNode.totalNumberOfKeys);
+      if (successors.size() == 0) {
+        LOG.debug("No successors. Cumulative keys: {}, total keys: {}",
+            infileNode.cumulativeKeysReverseTraversal,
+            infileNode.totalNumberOfKeys);
         infileNode.cumulativeKeysReverseTraversal =
             infileNode.totalNumberOfKeys;
       }
     }
 
     HashSet<CompactionNode> visited = new HashSet<>();
-    for (CompactionNode  infileNode : nodeList ) {
+    for (CompactionNode infileNode : nodeList) {
       if (visited.contains(infileNode)) {
         continue;
       }
       visited.add(infileNode);
-      System.out.print("traverseGraph: Visiting node " + infileNode.fileName +
-          ":\n");
-      Set<CompactionNode> nextLevel = new HashSet<>();
-      nextLevel.add(infileNode);
+      LOG.debug("Visiting node '{}'", infileNode.fileName);
       Set<CompactionNode> currentLevel = new HashSet<>();
-      currentLevel.addAll(nextLevel);
-      nextLevel = new HashSet<>();
-      int i = 1;
-      while (currentLevel.size() != 0) {
-        LOG.warn("traverseGraph : DAG Level :" + i++);
+      currentLevel.add(infileNode);
+      int level = 1;
+      while (!currentLevel.isEmpty()) {
+        LOG.debug("BFS Level: {}. Current level has {} nodes",
+            level++, currentLevel.size());
+        final Set<CompactionNode> nextLevel = new HashSet<>();
         for (CompactionNode current : currentLevel) {
-          LOG.warn("traverseGraph : expanding node " + current.fileName);
+          LOG.debug("Expanding node: {}", current.fileName);
           Set<CompactionNode> successors =
               reverseMutableGraph.successors(current);
-          if (successors == null || successors.size() == 0) {
-            LOG.warn("traverseGraph : No successors. cumulative " +
-                "keys : " + current.cumulativeKeysReverseTraversal);
-          } else {
-            for (CompactionNode oneSucc : successors) {
-              LOG.warn("traverseGraph : Adding to the next level : " +
-                  oneSucc.fileName);
-              LOG.warn("traverseGraph : " + oneSucc.fileName + "cum" + " keys"
-                  + oneSucc.cumulativeKeysReverseTraversal + "parent" + " " +
-                  current.fileName + " total " + current.totalNumberOfKeys);
-              oneSucc.cumulativeKeysReverseTraversal +=
-                  current.cumulativeKeysReverseTraversal;
-              nextLevel.add(oneSucc);
-            }
+          if (successors.isEmpty()) {
+            LOG.debug("No successors. Cumulative keys: {}",
+                current.cumulativeKeysReverseTraversal);
+            continue;
+          }
+          for (CompactionNode node : successors) {
+            LOG.debug("Adding to the next level: {}", node.fileName);
+            LOG.debug("'{}' cumulative keys: {}. parent '{}' total keys: {}",
+                node.fileName, node.cumulativeKeysReverseTraversal,
+                current.fileName, current.totalNumberOfKeys);
+            node.cumulativeKeysReverseTraversal +=
+                current.cumulativeKeysReverseTraversal;
+            nextLevel.add(node);
           }
         }
-        currentLevel = new HashSet<>();
-        currentLevel.addAll(nextLevel);
-        nextLevel = new HashSet<>();
-        LOG.warn("");
+        currentLevel = nextLevel;
       }
     }
   }
 
+  @VisibleForTesting
   public boolean debugEnabled(Integer level) {
     return DEBUG_LEVEL.contains(level);
   }
+
+  @VisibleForTesting
+  public static Logger getLog() {
+    return LOG;
+  }
 }
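
The currentLevel/nextLevel loops in the SST diff walk at the top of this hunk, in printMutableGraphFromAGivenNode and in traverseGraph are all the same level-order (BFS) expansion over the Guava MutableGraph. Below is a minimal standalone sketch of that idiom only; the String node type, class name and the two-SST compaction it builds are made up for illustration and are not part of this patch.

import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;

import java.util.HashSet;
import java.util.Set;

public final class CompactionDagBfsSketch {

  /**
   * Expand a start node level by level along successor edges and collect
   * every reachable node. Same currentLevel/nextLevel idiom as the differ.
   */
  static Set<String> expand(MutableGraph<String> dag, String start) {
    Set<String> visited = new HashSet<>();
    Set<String> currentLevel = new HashSet<>();
    currentLevel.add(start);
    while (!currentLevel.isEmpty()) {
      Set<String> nextLevel = new HashSet<>();
      for (String node : currentLevel) {
        if (!visited.add(node)) {
          continue;  // Already expanded at an earlier level
        }
        nextLevel.addAll(dag.successors(node));
      }
      currentLevel = nextLevel;
    }
    return visited;
  }

  public static void main(String[] args) {
    // Hypothetical compaction: output SST 000030 was produced from
    // input SSTs 000010 and 000020 (forward DAG edge: output -> input)
    MutableGraph<String> dag = GraphBuilder.directed().build();
    dag.putEdge("000030", "000010");
    dag.putEdge("000030", "000020");
    // Prints the three SSTs reachable from 000030 (set order not guaranteed)
    System.out.println(expand(dag, "000030"));
  }
}

In the differ itself the node type is CompactionNode and the expansion additionally stops when a node's snapshot generation falls at or below the destination snapshot's generation, as in the hunk above.
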
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index a94fb32236..9a4c5c10b0 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -18,71 +18,129 @@
 package org.apache.ozone.rocksdiff;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Arrays.asList;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
 import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
+import static org.junit.jupiter.api.Assertions.fail;
 
+import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.stream.Collectors;
 
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotInfo;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
+import org.rocksdb.FlushOptions;
 import org.rocksdb.LiveFileMetaData;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
-////CHECKSTYLE:OFF
+/**
+ * Test RocksDBCheckpointDiffer basic functionality.
+ */
 public class TestRocksDBCheckpointDiffer {
 
-  private static final String dbPath   = "./rocksdb-data";
-  private static final int NUM_ROW = 25000000;
-  private static final int SNAPSHOT_EVERY_SO_MANY_KEYS = 999999;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRocksDBCheckpointDiffer.class);
 
-  // keeps track of all the snapshots created so far.
-  private static int lastSnapshotCounter = 0;
-  private static String lastSnapshotPrefix = "snap_id_";
+  /**
+   * RocksDB path for the test.
+   */
+  private static final String TEST_DB_PATH = "./rocksdb-data";
+  private static final int NUM_ROW = 250000;
+  private static final int SNAPSHOT_EVERY_SO_MANY_KEYS = 49999;
+
+  /**
+   * RocksDB checkpoint path prefix.
+   */
+  private static final String CP_PATH_PREFIX = "rocksdb-cp-";
+  private final ArrayList<DifferSnapshotInfo> snapshots = new ArrayList<>();
 
+  /**
+   * Graph type.
+   */
+  enum GType {
+    FNAME,
+    KEYSIZE,
+    CUMUTATIVE_SIZE
+  }
 
-  public static void main(String[] args) throws Exception {
-    TestRocksDBCheckpointDiffer tester= new TestRocksDBCheckpointDiffer();
+  @BeforeAll
+  public static void init() {
+    // Checkpoint differ log level. Set to DEBUG for verbose output
+    GenericTestUtils.setLogLevel(RocksDBCheckpointDiffer.getLog(), Level.INFO);
+    // Test class log level. Set to DEBUG for verbose output
+    GenericTestUtils.setLogLevel(TestRocksDBCheckpointDiffer.LOG, Level.INFO);
+  }
+
+  @Test
+  void testMain() throws Exception {
+
+    final String clDirStr = "compaction-log";
+    // Delete the compaction log dir for the test, if it exists
+    File clDir = new File(clDirStr);
+    if (clDir.exists()) {
+      deleteDirectory(clDir);
+    }
+
+    final String metadataDirStr = ".";
+    final String sstDirStr = "compaction-sst-backup";
+
+    final File dbLocation = new File(TEST_DB_PATH);
     RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(
-        "./rocksdb-data",
-        1000,
-        "./rocksdb-data-cp",
-        "./SavedCompacted_Files/",
-        "./rocksdb-data-cf/",
-        0,
-        "snap_id_");
-    lastSnapshotPrefix = "snap_id_" + lastSnapshotCounter;
-    RocksDB rocksDB = tester.createRocksDBInstance(dbPath, differ);
-    Thread.sleep(10000);
-
-    tester.readRocksDBInstance(dbPath, rocksDB, null, differ);
-    differ.printAllSnapshots();
+        metadataDirStr, sstDirStr, clDirStr, dbLocation);
+
+    // Empty the SST backup folder first for testing
+    File sstDir = new File(sstDirStr);
+    deleteDirectory(sstDir);
+    if (!sstDir.mkdir()) {
+      fail("Unable to create SST backup directory");
+    }
+
+    RocksDB rocksDB = createRocksDBInstanceAndWriteKeys(TEST_DB_PATH, differ);
+    readRocksDBInstance(TEST_DB_PATH, rocksDB, null, differ);
+
+    if (LOG.isDebugEnabled()) {
+      printAllSnapshots();
+    }
+
     differ.traverseGraph(
-        differ.getCompactionReverseDAG(),
-        differ.getCompactionFwdDAG());
-    differ.diffAllSnapshots();
-    differ.dumpCompactioNodeTable();
-    for (RocksDBCheckpointDiffer.GType gtype :
-        RocksDBCheckpointDiffer.GType.values()) {
-      String fname = "fwdGraph_" + gtype.toString() +  ".png";
-      String rname = "reverseGraph_"+ gtype.toString() + ".png";
-
-      //differ.pngPrintMutableGrapth(differ.getCompactionFwdDAG(),
-      //  fname, gtype);
-      //differ.pngPrintMutableGrapth(differ.getCompactionReverseDAG(), rname,
-      //    gtype);
+        differ.getBackwardCompactionDAG(),
+        differ.getForwardCompactionDAG());
+
+    diffAllSnapshots(differ);
+
+    if (LOG.isDebugEnabled()) {
+      differ.dumpCompactionNodeTable();
     }
+
+    for (GType gtype : GType.values()) {
+      String fname = "fwdGraph_" + gtype +  ".png";
+      String rname = "reverseGraph_" + gtype + ".png";
+/*
+      differ.pngPrintMutableGrapth(differ.getCompactionFwdDAG(), fname, gtype);
+      differ.pngPrintMutableGrapth(
+          differ.getCompactionReverseDAG(), rname, gtype);
+ */
+    }
+
     rocksDB.close();
   }
 
@@ -93,53 +151,169 @@ public class TestRocksDBCheckpointDiffer {
 
     return random.ints(leftLimit, rightLimit + 1)
         .filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
-        .limit(7)
+        .limit(length)
         .collect(StringBuilder::new,
             StringBuilder::appendCodePoint, StringBuilder::append)
         .toString();
   }
 
-  //  Test Code to create sample RocksDB instance.
-  public RocksDB createRocksDBInstance(String dbPathArg,
-                                       RocksDBCheckpointDiffer differ)
-      throws RocksDBException, InterruptedException {
+  /**
+   * Test SST differ.
+   */
+  void diffAllSnapshots(RocksDBCheckpointDiffer differ) {
+    final DifferSnapshotInfo src = snapshots.get(snapshots.size() - 1);
+
+    // Hard-coded expected output.
+    // The results are deterministic. Retrieved from a successful run.
+    final List<List<String>> expectedDifferResult = asList(
+        asList("000024", "000017", "000028", "000026", "000019", "000021"),
+        asList("000024", "000028", "000026", "000019", "000021"),
+        asList("000024", "000028", "000026", "000021"),
+        asList("000024", "000028", "000026"),
+        asList("000028", "000026"),
+        Collections.singletonList("000028"),
+        Collections.emptyList()
+    );
+    Assertions.assertEquals(snapshots.size(), expectedDifferResult.size());
+
+    int index = 0;
+    for (DifferSnapshotInfo snap : snapshots) {
+      // Returns a list of SST files to be fed into RocksDiff
+      List<String> sstDiffList = differ.getSSTDiffList(src, snap);
+      LOG.debug("SST diff list from '{}' to '{}': {}",
+          src.getDbPath(), snap.getDbPath(), sstDiffList);
+
+      Assertions.assertEquals(expectedDifferResult.get(index), sstDiffList);
+      ++index;
+    }
+  }
+
+  /**
+   * Helper function that creates an RDB checkpoint (= Ozone snapshot).
+   */
+  private void createCheckpoint(RocksDBCheckpointDiffer differ,
+      RocksDB rocksDB) {
+
+    LOG.trace("Current time: " + System.currentTimeMillis());
+    long t1 = System.currentTimeMillis();
+
+    final long snapshotGeneration = rocksDB.getLatestSequenceNumber();
+    final String cpPath = CP_PATH_PREFIX + snapshotGeneration;
+
+    // Delete the checkpoint dir if it already exists for the test
+    File dir = new File(cpPath);
+    if (dir.exists()) {
+      deleteDirectory(dir);
+    }
+
+    final long dbLatestSequenceNumber = rocksDB.getLatestSequenceNumber();
+
+    createCheckPoint(TEST_DB_PATH, cpPath, rocksDB);
+    final String snapshotId = "snap_id_" + snapshotGeneration;
+    final DifferSnapshotInfo currentSnapshot =
+        new DifferSnapshotInfo(cpPath, snapshotId, snapshotGeneration);
+    this.snapshots.add(currentSnapshot);
+
+    // Same as what OmSnapshotManager#createOmSnapshotCheckpoint would do
+    differ.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber);
+
+    differ.setCurrentCompactionLog(dbLatestSequenceNumber);
+
+    long t2 = System.currentTimeMillis();
+    LOG.trace("Current time: " + t2);
+    LOG.debug("Time elapsed: " + (t2 - t1) + " ms");
+  }
+
+  // Flushes the WAL and creates a RocksDB checkpoint.
+  void createCheckPoint(String dbPathArg, String cpPathArg,
+      RocksDB rocksDB) {
+    LOG.debug("Creating RocksDB '{}' checkpoint at '{}'", dbPathArg, cpPathArg);
+    try {
+      rocksDB.flush(new FlushOptions());
+      Checkpoint cp = Checkpoint.create(rocksDB);
+      cp.createCheckpoint(cpPathArg);
+    } catch (RocksDBException e) {
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  void printAllSnapshots() {
+    for (DifferSnapshotInfo snap : snapshots) {
+      LOG.debug("{}", snap);
+    }
+  }
+
+  // Test Code to create sample RocksDB instance.
+  private RocksDB createRocksDBInstanceAndWriteKeys(String dbPathArg,
+      RocksDBCheckpointDiffer differ) throws RocksDBException {
+
+    LOG.debug("Creating RocksDB at '{}'", dbPathArg);
+
+    // Delete the test DB dir if it exists
+    File dir = new File(dbPathArg);
+    if (dir.exists()) {
+      deleteDirectory(dir);
+    }
 
-    System.out.println("Creating RocksDB instance at :" + dbPathArg);
+    final ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
+        .optimizeUniversalStyleCompaction();
+    final List<ColumnFamilyDescriptor> cfDescriptors =
+        RocksDBCheckpointDiffer.getCFDescriptorList(cfOpts);
+    List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
 
-    RocksDB rocksDB = null;
-    rocksDB = differ.getRocksDBInstanceWithCompactionTracking(dbPathArg);
+    // Create a RocksDB instance with compaction tracking
+    final DBOptions dbOptions = new DBOptions()
+        .setCreateIfMissing(true)
+        .setCreateMissingColumnFamilies(true);
+    differ.setRocksDBForCompactionTracking(dbOptions);
+    RocksDB rocksDB = RocksDB.open(dbOptions, dbPathArg, cfDescriptors,
+        cfHandles);
+
+    differ.setCurrentCompactionLog(rocksDB.getLatestSequenceNumber());
 
     Random random = new Random();
     // key-value
     for (int i = 0; i < NUM_ROW; ++i) {
       String generatedString = getRandomString(random, 7);
-      String keyStr = " My" + generatedString + "StringKey" + i;
-      String valueStr = " My " + generatedString + "StringValue" + i;
+      String keyStr = "Key-" + i + "-" + generatedString;
+      String valueStr = "Val-" + i + "-" + generatedString;
       byte[] key = keyStr.getBytes(UTF_8);
-      rocksDB.put(key, valueStr.getBytes(UTF_8));
+      // Put entry in keyTable
+      rocksDB.put(cfHandles.get(1), key, valueStr.getBytes(UTF_8));
       if (i % SNAPSHOT_EVERY_SO_MANY_KEYS == 0) {
-        differ.createSnapshot(rocksDB);
+        createCheckpoint(differ, rocksDB);
       }
-      //System.out.println(toStr(rocksDB.get(key));
     }
-    differ.createSnapshot(rocksDB);
+    createCheckpoint(differ, rocksDB);
     return rocksDB;
   }
 
-  //  RocksDB.DEFAULT_COLUMN_FAMILY
-  private void UpdateRocksDBInstance(String dbPathArg, RocksDB rocksDB) {
+  static boolean deleteDirectory(java.io.File directoryToBeDeleted) {
+    File[] allContents = directoryToBeDeleted.listFiles();
+    if (allContents != null) {
+      for (java.io.File file : allContents) {
+        if (!deleteDirectory(file)) {
+          return false;
+        }
+      }
+    }
+    return directoryToBeDeleted.delete();
+  }
+
+  /**
+   * Updates keys in RocksDB.DEFAULT_COLUMN_FAMILY.
+   */
+  private void updateRocksDBInstance(String dbPathArg, RocksDB rocksDB) {
     System.out.println("Updating RocksDB instance at :" + dbPathArg);
-    //
-    try (final Options options =
-             new Options().setCreateIfMissing(true).
-                 setCompressionType(CompressionType.NO_COMPRESSION)) {
+
+    try (Options options = new Options().setCreateIfMissing(true)) {
       if (rocksDB == null) {
         rocksDB = RocksDB.open(options, dbPathArg);
       }
 
       Random random = new Random();
       // key-value
-      for (int i = 0; i< NUM_ROW; ++i) {
+      for (int i = 0; i < NUM_ROW; ++i) {
         String generatedString = getRandomString(random, 7);
         String keyStr = " MyUpdated" + generatedString + "StringKey" + i;
         String valueStr = " My Updated" + generatedString + "StringValue" + i;
@@ -152,12 +326,14 @@ public class TestRocksDBCheckpointDiffer {
     }
   }
 
-  //  RocksDB.DEFAULT_COLUMN_FAMILY
+  /**
+   * Exercises basic operations on RocksDB.DEFAULT_COLUMN_FAMILY.
+   */
   public void testDefaultColumnFamilyOriginal() {
     System.out.println("testDefaultColumnFamily begin...");
-    //
-    try (final Options options = new Options().setCreateIfMissing(true)) {
-      try (final RocksDB rocksDB = RocksDB.open(options, "./rocksdb-data")) {
+
+    try (Options options = new Options().setCreateIfMissing(true)) {
+      try (RocksDB rocksDB = RocksDB.open(options, "./rocksdb-data")) {
         // key-value
         byte[] key = "Hello".getBytes(UTF_8);
         rocksDB.put(key, "World".getBytes(UTF_8));
@@ -167,7 +343,7 @@ public class TestRocksDBCheckpointDiffer {
         rocksDB.put("SecondKey".getBytes(UTF_8), "SecondValue".getBytes(UTF_8));
 
         // List
-        List<byte[]> keys = Arrays.asList(key, "SecondKey".getBytes(UTF_8),
+        List<byte[]> keys = asList(key, "SecondKey".getBytes(UTF_8),
             "missKey".getBytes(UTF_8));
         List<byte[]> values = rocksDB.multiGetAsList(keys);
         for (int i = 0; i < keys.size(); i++) {
@@ -200,21 +376,22 @@ public class TestRocksDBCheckpointDiffer {
   // (table)
   public void testCertainColumnFamily() {
     System.out.println("\ntestCertainColumnFamily begin...");
-    try (final ColumnFamilyOptions cfOpts =
-             new ColumnFamilyOptions().optimizeUniversalStyleCompaction()) {
+    try (ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
+        .optimizeUniversalStyleCompaction()) {
       String cfName = "my-first-columnfamily";
       // list of column family descriptors, first entry must always be
       // default column family
-      final List<ColumnFamilyDescriptor> cfDescriptors = Arrays.asList(
+      final List<ColumnFamilyDescriptor> cfDescriptors = asList(
           new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
           new ColumnFamilyDescriptor(cfName.getBytes(UTF_8), cfOpts)
       );
 
       List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
-      try (final DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).
-          setCreateMissingColumnFamilies(true);
-           final RocksDB rocksDB = RocksDB.open(dbOptions, "./rocksdb-data-cf" +
-               "/", cfDescriptors, cfHandles)) {
+      try (DBOptions dbOptions = new DBOptions()
+          .setCreateIfMissing(true)
+          .setCreateMissingColumnFamilies(true);
+          RocksDB rocksDB = RocksDB.open(dbOptions,
+              "./rocksdb-data-cf/", cfDescriptors, cfHandles)) {
         ColumnFamilyHandle cfHandle = cfHandles.stream().filter(x -> {
           try {
             return (toStr(x.getName())).equals(cfName);
@@ -229,39 +406,39 @@ public class TestRocksDBCheckpointDiffer {
             "FirstValue".getBytes(UTF_8));
         // key
         byte[] getValue = rocksDB.get(cfHandles.get(0), key.getBytes(UTF_8));
-        System.out.println("get Value : " + toStr(getValue));
+        LOG.debug("get Value: " + toStr(getValue));
         // key/value
         rocksDB.put(cfHandles.get(1), "SecondKey".getBytes(UTF_8),
             "SecondValue".getBytes(UTF_8));
 
-        List<byte[]> keys = Arrays.asList(key.getBytes(UTF_8),
+        List<byte[]> keys = asList(key.getBytes(UTF_8),
             "SecondKey".getBytes(UTF_8));
-        List<ColumnFamilyHandle> cfHandleList = Arrays.asList(cfHandle,
-            cfHandle);
+        List<ColumnFamilyHandle> cfHandleList = asList(cfHandle, cfHandle);
+
         // key
         List<byte[]> values = rocksDB.multiGetAsList(cfHandleList, keys);
         for (int i = 0; i < keys.size(); i++) {
-          System.out.println("multiGet:" + toStr(keys.get(i)) + "--" +
+          LOG.debug("multiGet:" + toStr(keys.get(i)) + "--" +
               (values.get(i) == null ? null : toStr(values.get(i))));
         }
-        //rocksDB.compactRange();
-        //rocksDB.compactFiles();
+
         List<LiveFileMetaData> liveFileMetaDataList =
             rocksDB.getLiveFilesMetaData();
         for (LiveFileMetaData m : liveFileMetaDataList) {
           System.out.println("Live File Metadata");
           System.out.println("\tFile :" + m.fileName());
-          System.out.println("\ttable :" + toStr(m.columnFamilyName()));
+          System.out.println("\tTable :" + toStr(m.columnFamilyName()));
           System.out.println("\tKey Range :" + toStr(m.smallestKey()) +
               " " + "<->" + toStr(m.largestKey()));
         }
+
         // key
         rocksDB.delete(cfHandle, key.getBytes(UTF_8));
 
         // key
         RocksIterator iter = rocksDB.newIterator(cfHandle);
         for (iter.seekToFirst(); iter.isValid(); iter.next()) {
-          System.out.println("iterator:" + toStr(iter.key()) + ":" +
+          System.out.println("Iterator:" + toStr(iter.key()) + ":" +
               toStr(iter.value()));
         }
       } finally {
@@ -277,18 +454,15 @@ public class TestRocksDBCheckpointDiffer {
 
   // Read from a given RocksDB instance and optionally write all the
   // keys to a given file.
-  //
-  public void readRocksDBInstance(String dbPathArg, RocksDB rocksDB,
-                                  FileWriter file,
-                                  RocksDBCheckpointDiffer differ) {
-    System.out.println("Reading RocksDB instance at : " + dbPathArg);
+  void readRocksDBInstance(String dbPathArg, RocksDB rocksDB, FileWriter file,
+      RocksDBCheckpointDiffer differ) {
+
+    LOG.debug("Reading RocksDB: " + dbPathArg);
     boolean createdDB = false;
-    //
-    try (final Options options =
-             new Options().setParanoidChecks(true)
-                 .setCreateIfMissing(true)
-                 .setCompressionType(CompressionType.NO_COMPRESSION)
-                 .setForceConsistencyChecks(false)) {
+
+    try (Options options = new Options()
+        .setParanoidChecks(true)
+        .setForceConsistencyChecks(false)) {
 
       if (rocksDB == null) {
         rocksDB = RocksDB.openReadOnly(options, dbPathArg);
@@ -298,22 +472,21 @@ public class TestRocksDBCheckpointDiffer {
       List<LiveFileMetaData> liveFileMetaDataList =
           rocksDB.getLiveFilesMetaData();
       for (LiveFileMetaData m : liveFileMetaDataList) {
-        System.out.println("Live File Metadata");
-        System.out.println("\tFile : " + m.fileName());
-        System.out.println("\tLevel : " + m.level());
-        System.out.println("\tTable : " + toStr(m.columnFamilyName()));
-        System.out.println("\tKey Range : " + toStr(m.smallestKey())
+        LOG.debug("SST File: {}. ", m.fileName());
+        LOG.debug("\tLevel: {}", m.level());
+        LOG.debug("\tTable: {}", toStr(m.columnFamilyName()));
+        LOG.debug("\tKey Range: {}", toStr(m.smallestKey())
             + " <-> " + toStr(m.largestKey()));
         if (differ.debugEnabled(DEBUG_DAG_LIVE_NODES)) {
           differ.printMutableGraphFromAGivenNode(m.fileName(), m.level(),
-              differ.getCompactionFwdDAG());
+              differ.getForwardCompactionDAG());
         }
       }
 
-      if(differ.debugEnabled(DEBUG_READ_ALL_DB_KEYS)) {
+      if (differ.debugEnabled(DEBUG_READ_ALL_DB_KEYS)) {
         RocksIterator iter = rocksDB.newIterator();
         for (iter.seekToFirst(); iter.isValid(); iter.next()) {
-          System.out.println("iterator key:" + toStr(iter.key()) + ", " +
+          LOG.debug("Iterator key:" + toStr(iter.key()) + ", " +
               "iter value:" + toStr(iter.value()));
           if (file != null) {
             file.write("iterator key:" + toStr(iter.key()) + ", iter " +
@@ -325,7 +498,7 @@ public class TestRocksDBCheckpointDiffer {
     } catch (IOException | RocksDBException e) {
       e.printStackTrace();
     } finally {
-      if (createdDB){
+      if (createdDB) {
         rocksDB.close();
       }
     }
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/resources/log4j.properties b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..b8ad21d6c7
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=ERROR
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
index e4323c271b..689cd439c5 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
@@ -107,6 +107,10 @@ public final class SnapshotInfo implements Auditable {
   private String globalPreviousSnapshotID;
   private String snapshotPath; // snapshot mask
   private String checkpointDir;
+  /**
+   * RocksDB transaction sequence number at the time of checkpoint creation.
+   */
+  private long dbTxSequenceNumber;
 
   /**
    * Private constructor, constructed via builder.
@@ -410,6 +414,14 @@ public final class SnapshotInfo implements Auditable {
     return getCheckpointDirName(getSnapshotID());
   }
 
+  public long getDbTxSequenceNumber() {
+    return dbTxSequenceNumber;
+  }
+
+  public void setDbTxSequenceNumber(long dbTxSequenceNumber) {
+    this.dbTxSequenceNumber = dbTxSequenceNumber;
+  }
+
   /**
    * Get the table key for this snapshot.
    */
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
new file mode 100644
index 0000000000..d90a1c6e03
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMStorage;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotInfo;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+import picocli.CommandLine;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_S3_VOLUME_NAME_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+
+/**
+ * Tests Freon, with MiniOzoneCluster.
+ */
+public class TestOMSnapshotDAG {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestOMSnapshotDAG.class);
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static ObjectStore store;
+  private final File metaDir = OMStorage.getOmDbDir(conf);
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   */
+  @BeforeAll
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    DatanodeRatisServerConfig ratisServerConfig =
+        conf.getObject(DatanodeRatisServerConfig.class);
+    ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
+    ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(3));
+    conf.setFromObject(ratisServerConfig);
+
+    RatisClientConfig.RaftConfig raftClientConfig =
+        conf.getObject(RatisClientConfig.RaftConfig.class);
+    raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
+    raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(3));
+    conf.setFromObject(raftClientConfig);
+
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
+    cluster.waitForClusterToBeReady();
+
+    store = cluster.getClient().getObjectStore();
+
+    GenericTestUtils.setLogLevel(RaftLog.LOG, Level.INFO);
+    GenericTestUtils.setLogLevel(RaftServer.LOG, Level.INFO);
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterAll
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private String getDBCheckpointAbsolutePath(SnapshotInfo snapshotInfo) {
+    return metaDir + OM_KEY_PREFIX +
+        OM_SNAPSHOT_DIR + OM_KEY_PREFIX +
+        OM_DB_NAME + snapshotInfo.getCheckpointDirName();
+  }
+
+  private static String getSnapshotDBKey(String volumeName, String bucketName,
+      String snapshotName) {
+
+    final String dbKeyPrefix = OM_KEY_PREFIX + volumeName +
+        OM_KEY_PREFIX + bucketName;
+    return dbKeyPrefix + OM_KEY_PREFIX + snapshotName;
+  }
+
+  private DifferSnapshotInfo getDifferSnapshotInfo(
+      OMMetadataManager omMetadataManager, String volumeName, String bucketName,
+      String snapshotName) throws IOException {
+
+    final String dbKey = getSnapshotDBKey(volumeName, bucketName, snapshotName);
+    final SnapshotInfo snapshotInfo =
+        omMetadataManager.getSnapshotInfoTable().get(dbKey);
+    String checkpointPath = getDBCheckpointAbsolutePath(snapshotInfo);
+
+    // Use RocksDB transaction sequence number in SnapshotInfo, which is
+    // persisted at the time of snapshot creation, as the snapshot generation
+    return new DifferSnapshotInfo(checkpointPath, snapshotInfo.getSnapshotID(),
+        snapshotInfo.getDbTxSequenceNumber());
+  }
+
+  @Test
+  void testZeroSizeKey()
+      throws IOException, InterruptedException, TimeoutException {
+
+    RandomKeyGenerator randomKeyGenerator =
+        new RandomKeyGenerator(cluster.getConf());
+    CommandLine cmd = new CommandLine(randomKeyGenerator);
+    cmd.execute("--num-of-volumes", "1",
+        "--num-of-buckets", "1",
+        "--num-of-keys", "600",
+        "--num-of-threads", "1",
+        "--key-size", "0",
+        "--factor", "THREE",
+        "--type", "RATIS",
+        "--validate-writes"
+    );
+
+    Assertions.assertEquals(600L, randomKeyGenerator.getNumberOfKeysAdded());
+    Assertions.assertEquals(600L,
+        randomKeyGenerator.getSuccessfulValidationCount());
+
+    List<OmVolumeArgs> volList = cluster.getOzoneManager()
+        .listAllVolumes("", "", 10);
+    LOG.debug("List of all volumes: {}", volList);
+    final String volumeName = volList.stream().filter(e ->
+        !e.getVolume().equals(OZONE_S3_VOLUME_NAME_DEFAULT))  // Ignore s3v vol
+        .collect(Collectors.toList()).get(0).getVolume();
+    List<OmBucketInfo> bucketList =
+        cluster.getOzoneManager().listBuckets(volumeName, "", "", 10);
+    LOG.debug("List of all buckets under the first volume: {}", bucketList);
+    final String bucketName = bucketList.get(0).getBucketName();
+
+    // Create snapshot
+    String resp = store.createSnapshot(volumeName, bucketName, "snap1");
+    LOG.debug("Snapshot created: {}", resp);
+
+    final OzoneVolume volume = store.getVolume(volumeName);
+    final OzoneBucket bucket = volume.getBucket(bucketName);
+
+    for (int i = 0; i < 6000; i++) {
+      bucket.createKey("b_" + i, 0).close();
+    }
+
+    // Create another snapshot
+    resp = store.createSnapshot(volumeName, bucketName, "snap3");
+    LOG.debug("Snapshot created: {}", resp);
+
+    // Get snapshot SST diff list
+    OzoneManager ozoneManager = cluster.getOzoneManager();
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    RDBStore rdbStore = (RDBStore) omMetadataManager.getStore();
+    RocksDBCheckpointDiffer differ = rdbStore.getRocksDBCheckpointDiffer();
+
+    DifferSnapshotInfo snap1 = getDifferSnapshotInfo(omMetadataManager,
+        volumeName, bucketName, "snap1");
+    DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager,
+        volumeName, bucketName, "snap3");
+
+    // RocksDB does checkpointing in a separate thread, wait for it
+    final File checkpointSnap1 = new File(snap1.getDbPath());
+    GenericTestUtils.waitFor(checkpointSnap1::exists, 2000, 20000);
+    final File checkpointSnap3 = new File(snap3.getDbPath());
+    GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000);
+
+    List<String> actualDiffList = differ.getSSTDiffList(snap3, snap1);
+    LOG.debug("Got diff list: {}", actualDiffList);
+    // Hard-coded expected output.
+    // The result is deterministic. Retrieved from a successful run.
+    final List<String> expectedDiffList = Collections.singletonList("000059");
+    Assertions.assertEquals(expectedDiffList, actualDiffList);
+
+    // TODO: Use smaller DB write buffer size (currently it is set to 128 MB
+    //  in DBProfile), or generate enough keys (in the millions) to trigger
+    //  RDB compaction. Take another snapshot and do the diff again.
+    //  Then restart OM, do the same diff again to see if DAG reconstruction
+    //  works.
+  }
+
+}
\ No newline at end of file
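
Stripped of the MiniOzoneCluster setup, the differ call pattern this test exercises is small. The sketch below reuses only the DifferSnapshotInfo constructor and the getSSTDiffList call that appear in the diffs above; the checkpoint paths, snapshot IDs and sequence numbers are placeholders, and the differ instance is assumed to come from the OM RDBStore as in rdbStore.getRocksDBCheckpointDiffer() above.

import java.util.List;

import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotInfo;

public final class SnapshotDiffCallSketch {

  /**
   * Returns the SST files the newer snapshot has but the older one does not
   * cover, i.e. the files a snapshot diff would need to read.
   */
  static List<String> sstDiff(RocksDBCheckpointDiffer differ) {
    // Placeholder checkpoint paths, snapshot IDs and DB sequence numbers
    DifferSnapshotInfo older = new DifferSnapshotInfo(
        "/path/to/om.db-snapshot-checkpoint-1", "snap_id_1", 100L);
    DifferSnapshotInfo newer = new DifferSnapshotInfo(
        "/path/to/om.db-snapshot-checkpoint-3", "snap_id_3", 200L);
    return differ.getSSTDiffList(newer, older);
  }
}

The generation argument is the RocksDB transaction sequence number recorded in SnapshotInfo at snapshot creation time, as wired up in OMSnapshotCreateRequest further down.
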
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 5c9b3409a5..27787143f7 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -744,17 +744,18 @@ enum SnapshotStatusProto {
  * SnapshotInfo table entry for Bucket/Volume snapshot metadata
  */
 message SnapshotInfo {
-  required string snapshotID = 1;
-  required string name = 2;
-  required string volumeName = 3;
-  required string bucketName = 4;
-  required SnapshotStatusProto snapshotStatus = 5;
-  required uint64 creationTime = 6;
-  required uint64 deletionTime = 7;
-  required string pathPreviousSnapshotID = 8;
-  required string globalPreviousSnapshotID = 9;
-  required string snapshotPath = 10;
-  required string checkpointDir = 11;
+  optional string snapshotID = 1;
+  optional string name = 2;
+  optional string volumeName = 3;
+  optional string bucketName = 4;
+  optional SnapshotStatusProto snapshotStatus = 5;
+  optional uint64 creationTime = 6;
+  optional uint64 deletionTime = 7;
+  optional string pathPreviousSnapshotID = 8;
+  optional string globalPreviousSnapshotID = 9;
+  optional string snapshotPath = 10;
+  optional string checkpointDir = 11;
+  optional int64 dbTxSequenceNumber = 12;
  }
 
 message OzoneObj {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 5b829664a0..e78df5a4a5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -466,7 +466,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
         rocksDBConfiguration).setName(dbName)
         .setOpenReadOnly(readOnly)
         .setPath(Paths.get(metaDir.getPath()))
-        .setMaxFSSnapshots(maxFSSnapshots);
+        .setMaxFSSnapshots(maxFSSnapshots)
+        .setEnableCompactionLog(true);
     DBStore dbStore = addOMTablesAndCodecs(dbStoreBuilder).build();
     return dbStore;
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index ffdd136877..3e0057c981 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import java.io.IOException;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,7 +132,27 @@ public final class OmSnapshotManager {
       OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo)
       throws IOException {
     RDBStore store = (RDBStore) omMetadataManager.getStore();
-    return store.getSnapshot(snapshotInfo.getCheckpointDirName());
+
+    final long dbLatestSequenceNumber = snapshotInfo.getDbTxSequenceNumber();
+
+    final RocksDBCheckpointDiffer dbCpDiffer =
+        omMetadataManager.getStore().getRocksDBCheckpointDiffer();
+
+    final DBCheckpoint dbCheckpoint = store.getSnapshot(
+        snapshotInfo.getCheckpointDirName());
+
+    // Write snapshot generation (latest sequence number) to compaction log.
+    // This will be used for DAG reconstruction as snapshotGeneration.
+    dbCpDiffer.appendSequenceNumberToCompactionLog(dbLatestSequenceNumber);
+
+    // Set compaction log filename to the latest DB sequence number
+    // right after taking the RocksDB checkpoint for Ozone snapshot.
+    //
+    // Note it doesn't matter if sequence number hasn't increased (even though
+    // it shouldn't happen), since the writer always appends the file.
+    dbCpDiffer.setCurrentCompactionLog(dbLatestSequenceNumber);
+
+    return dbCheckpoint;
   }
 
   // Get OmSnapshot if the keyname has ".snapshot" key indicator
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
index 7efa4ab7f9..3c36845e19 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.request.snapshot;
 
 import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OmUtils;
@@ -132,6 +133,13 @@ public class OMSnapshotCreateRequest extends OMClientRequest {
         throw new OMException("Snapshot already exists", FILE_ALREADY_EXISTS);
       }
 
+      // Note down RDB latest transaction sequence number, which is used
+      // as snapshot generation in the differ.
+      final long dbLatestSequenceNumber =
+          ((RDBStore) omMetadataManager.getStore()).getDb()
+              .getLatestSequenceNumber();
+      snapshotInfo.setDbTxSequenceNumber(dbLatestSequenceNumber);
+
       omMetadataManager.getSnapshotInfoTable()
           .addCacheEntry(new CacheKey<>(key),
             new CacheValue<>(Optional.of(snapshotInfo), transactionLogIndex));

