You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by pr...@apache.org on 2022/09/26 17:22:24 UTC

[ozone] branch HDDS-6517-Snapshot updated: HDDS-7224. Create a new RocksDBCheckpoint Diff utility. (#3755)

This is an automated email from the ASF dual-hosted git repository.

prashantpogde pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
     new 03dfa243ac HDDS-7224. Create a new RocksDBCheckpoint Diff utility. (#3755)
03dfa243ac is described below

commit 03dfa243ac835a69accee7656593fa4ff61f792d
Author: prashantpogde <pr...@gmail.com>
AuthorDate: Mon Sep 26 10:22:18 2022 -0700

    HDDS-7224. Create a new RocksDBCheckpoint Diff utility. (#3755)
    
    * HDDS-7224. Create a new RocksDBCheckpoint Diff utility.
    
    * HDDS-7224. Renamed RocksDBCheckpointDiffer to rocksdb-checkpoint-differ
    
    * HDDS-7224. Fixing CI errors.
    
    * HDDS-7224. Fixing CI errors.
    
    * Add license headers.
    
    Change-Id: I65f15273d97863bf2703debba0a0bdb6e256b9b8
    
    * findbugs: add empty hadoop-hdds/rocksdb-checkpoint-differ/dev-support/findbugsExcludeFile.xml
    
    Change-Id: Ia75af4a7ee3d33b5235572187d4cfd56e185586a
    
    * Add new dependency licenses (remove them later if unneeded).
    
    Change-Id: I57a7992912af330cbe4225c78f14755a150bdace
    
    * HDDS-7224. Fixing CI errors.
    
    * HDDS-7224. Fixing CI errors.
    
    * Empty commit. Looks like fingbugs is not picking up the correct line
    numbers.
    
    Change-Id: Iacec58a628daa0e502fdd578164245d1fa34b9aa
    
    * Address findbugs
    
    Change-Id: I2e73578328f5a14c73da756a0eb6a785c1dccdc4
    
    * artifactId rename to `rocksdb-checkpoint-differ`
    
    Change-Id: I11b58b682eabaa13b636ba6ae80015b2270df9e3
    
    * Correct file name to `RelationshipEdge`
    
    Change-Id: Idb148a86d11e4ba99925def26792d37608265f34
    
    * Checkstyle: unused import
    
    Change-Id: I227c24209d5360320f6a9c7d450dfb01611e8fe7
    
    * override enforcer rule in submodule `rocksdb-checkpoint-differ`
    
    Change-Id: I4ef2a68e9d799ab1bdcb80ab8136cb6aa2ccf600
    
    * Update jar-report.txt
    
    Change-Id: I26affe5da495969e7bff40f8590d812b5914c5e9
    
    * Update hadoop-hdds/pom.xml
    
    Co-authored-by: Doroszlai, Attila <64...@users.noreply.github.com>
    
    * Address Nanda's comments.
    
    Change-Id: I31206a447da88215e5fa5e9c4c0e216635235fec
    
    Co-authored-by: Prashant Pogde <pr...@apache.org>
    Co-authored-by: Prashant Pogde <pp...@cloudera.com>
    Co-authored-by: Siyao Meng <50...@users.noreply.github.com>
    Co-authored-by: Doroszlai, Attila <64...@users.noreply.github.com>
---
 .../common/src/main/resources/ozone-default.xml    |   9 +
 hadoop-hdds/framework/pom.xml                      |   7 +
 .../org/apache/hadoop/hdds/utils/db/DBStore.java   |   6 +
 .../hadoop/hdds/utils/db/DBStoreBuilder.java       |   8 +-
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |  21 +-
 hadoop-hdds/pom.xml                                |   7 +
 hadoop-hdds/rocksdb-checkpoint-differ/README.md    |  18 +
 .../dev-support/findbugsExcludeFile.xml            |  19 +
 hadoop-hdds/rocksdb-checkpoint-differ/pom.xml      | 191 +++++
 .../apache/ozone/rocksdiff/RelationshipEdge.java   |  30 +
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   | 817 +++++++++++++++++++++
 .../org/apache/ozone/rocksdiff/package-info.java   |  23 +
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     | 340 +++++++++
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   4 +
 hadoop-ozone/dist/src/main/license/bin/LICENSE.txt |   6 +
 hadoop-ozone/dist/src/main/license/jar-report.txt  |   6 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   7 +-
 pom.xml                                            |   2 +-
 18 files changed, 1515 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 9fa3dd3ab8..ac2b5e204b 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1927,6 +1927,15 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.om.fs.snapshot.max.limit</name>
+    <value>1000</value>
+    <tag>OZONE, OM, MANAGEMENT</tag>
+    <description>
+      The maximum number of filesystem snapshot allowed in an Ozone Manager.
+    </description>
+  </property>
+
   <property>
     <name>ozone.acl.authorizer.class</name>
     <value>org.apache.hadoop.ozone.security.acl.OzoneAccessAuthorizer</value>
diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml
index f10cd3010c..712c061198 100644
--- a/hadoop-hdds/framework/pom.xml
+++ b/hadoop-hdds/framework/pom.xml
@@ -145,6 +145,13 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-params</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>rocksdb-checkpoint-differ</artifactId>
+      <version>${hdds.version}</version>
+    </dependency>
+
   </dependencies>
 
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index f385da9b89..f44bc82897 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.utils.db.cache.TableCache;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 
 /**
  * The DBStore interface provides the ability to create Tables, which store
@@ -96,6 +97,11 @@ public interface DBStore extends Closeable, BatchOperationHandler {
    */
   void flushLog(boolean sync) throws IOException;
 
+  /**
+   * Returns the RocksDB checkpoint differ.
+   */
+  RocksDBCheckpointDiffer getRocksDBCheckpointDiffer();
+
   /**
    * Compact the entire database.
    *
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index 6bd2881bbc..fd90343ae4 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -90,6 +90,7 @@ public final class DBStoreBuilder {
   private RocksDBConfiguration rocksDBConfiguration;
   // Flag to indicate if the RocksDB should be opened readonly.
   private boolean openReadOnly = false;
+  private int maxFSSnapshots = 0;
 
   /**
    * Create DBStoreBuilder from a generic DBDefinition.
@@ -190,7 +191,12 @@ public final class DBStoreBuilder {
     }
 
     return new RDBStore(dbFile, rocksDBOption, writeOptions, tableConfigs,
-        registry, openReadOnly);
+        registry, openReadOnly, maxFSSnapshots);
+  }
+
+  public DBStoreBuilder setMaxFSSnapshots(int maxFSSnapshots) {
+    this.maxFSSnapshots = maxFSSnapshots;
+    return this;
   }
 
   public DBStoreBuilder setName(String name) {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index a0ccd10e4f..4b98213af1 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedTransactionLogIterator;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 
 import com.google.common.base.Preconditions;
 import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -61,17 +62,18 @@ public class RDBStore implements DBStore {
   private final String checkpointsParentDir;
   private final String snapshotsParentDir;
   private final RDBMetrics rdbMetrics;
+  private final RocksDBCheckpointDiffer rocksDBCheckpointDiffer;
 
   @VisibleForTesting
   public RDBStore(File dbFile, ManagedDBOptions options,
                   Set<TableConfig> families) throws IOException {
     this(dbFile, options, new ManagedWriteOptions(), families,
-        new CodecRegistry(), false);
+        new CodecRegistry(), false, 1000);
   }
 
   public RDBStore(File dbFile, ManagedDBOptions dbOptions,
                   ManagedWriteOptions writeOptions, Set<TableConfig> families,
-                  CodecRegistry registry, boolean readOnly)
+                  CodecRegistry registry, boolean readOnly, int maxFSSnapshots)
       throws IOException {
     Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
     Preconditions.checkNotNull(families);
@@ -80,6 +82,14 @@ public class RDBStore implements DBStore {
     dbLocation = dbFile;
 
     try {
+      rocksDBCheckpointDiffer =
+          new RocksDBCheckpointDiffer(
+              dbLocation.getAbsolutePath(), maxFSSnapshots,
+          Paths.get(dbLocation.getParent(), "db.checkpoints").toString(),
+          Paths.get(dbLocation.getParent(), "db.savedSSTFiles").toString(),
+          dbLocation.getAbsolutePath(), 0, "Snapshot_");
+      rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
+
       db = RocksDatabase.open(dbFile, dbOptions, writeOptions,
           families, readOnly);
 
@@ -126,7 +136,7 @@ public class RDBStore implements DBStore {
       checkPointManager = new RDBCheckpointManager(db, dbLocation.getName());
       rdbMetrics = RDBMetrics.create();
 
-    } catch (IOException e) {
+    } catch (IOException | RocksDBException e) {
       String msg = "Failed init RocksDB, db path : " + dbFile.getAbsolutePath()
           + ", " + "exception :" + (e.getCause() == null ?
           e.getClass().getCanonicalName() + " " + e.getMessage() :
@@ -144,6 +154,11 @@ public class RDBStore implements DBStore {
     }
   }
 
+  @VisibleForTesting
+  public RocksDBCheckpointDiffer getRocksDBCheckpointDiffer() {
+    return rocksDBCheckpointDiffer;
+  }
+
   @Override
   public void compactDB() throws IOException {
     db.compactRange();
diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml
index d981032018..fa4efa9fe5 100644
--- a/hadoop-hdds/pom.xml
+++ b/hadoop-hdds/pom.xml
@@ -39,6 +39,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <module>client</module>
     <module>common</module>
     <module>framework</module>
+    <module>rocksdb-checkpoint-differ</module>
     <module>container-service</module>
     <module>server-scm</module>
     <module>tools</module>
@@ -131,6 +132,12 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
         <version>${hdds.version}</version>
       </dependency>
 
+    <dependency>
+      <groupId>org.apache.ozone</groupId>
+      <artifactId>rocksdb-checkpoint-differ</artifactId>
+      <version>${hdds.version}</version>
+    </dependency>
+
       <dependency>
         <groupId>org.apache.ozone</groupId>
         <artifactId>hdds-server-scm</artifactId>
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/README.md b/hadoop-hdds/rocksdb-checkpoint-differ/README.md
new file mode 100644
index 0000000000..a8c0a43595
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/README.md
@@ -0,0 +1,18 @@
+<!---
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+--->
+
+# RocksDiff
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/rocksdb-checkpoint-differ/dev-support/findbugsExcludeFile.xml
new file mode 100644
index 0000000000..55abc26301
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/dev-support/findbugsExcludeFile.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<FindBugsFilter>
+</FindBugsFilter>
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
new file mode 100644
index 0000000000..85a56f8cf6
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/pom.xml
@@ -0,0 +1,191 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.ozone</groupId>
+    <artifactId>hdds</artifactId>
+    <version>1.3.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>rocksdb-checkpoint-differ</artifactId>
+  <version>1.3.0-SNAPSHOT</version>
+  <description>RocksDB Checkpoint Differ</description>
+  <name>RocksDB Checkpoint Differ</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.rocksdb</groupId>
+      <artifactId>rocksdbjni</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.github.vlsi.mxgraph</groupId>
+      <artifactId>jgraphx</artifactId>
+      <version>4.2.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.jgrapht</groupId>
+      <artifactId>jgrapht-core</artifactId>
+      <version>1.5.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.jgrapht</groupId>
+      <artifactId>jgrapht-guava</artifactId>
+      <version>1.5.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.jgrapht</groupId>
+      <artifactId>jgrapht-ext</artifactId>
+      <version>1.4.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.spotbugs</groupId>
+      <artifactId>spotbugs-annotations</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <excludes>
+          <exclude>ozone-version-info.properties</exclude>
+        </excludes>
+        <filtering>false</filtering>
+      </resource>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <includes>
+          <include>ozone-version-info.properties</include>
+        </includes>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>version-info</id>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>version-info</goal>
+            </goals>
+            <configuration>
+              <source>
+                <directory>${basedir}/../</directory>
+                <includes>
+                  <include>*/src/main/java/**/*.java</include>
+                  <include>*/src/main/proto/*.proto</include>
+                </includes>
+              </source>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
+        <configuration>
+          <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>depcheck</id>
+            <phase></phase>
+          </execution>
+          <execution>
+            <id>banned-rocksdb-imports</id>
+            <phase>process-sources</phase>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <RestrictImports>
+                  <includeTestCode>false</includeTestCode>
+                  <reason>Use managed RocksObjects under org.apache.hadoop.hdds.utils.db.managed instead.</reason>
+                  <!-- By default, ban all the classes in org.rocksdb -->
+                  <bannedImport>org.rocksdb.**</bannedImport>
+                  <allowedImports>
+                    <allowedImport>org.rocksdb.AbstractEventListener</allowedImport>
+                    <allowedImport>org.rocksdb.Checkpoint</allowedImport>
+                    <allowedImport>org.rocksdb.CompactionJobInfo</allowedImport>
+                    <allowedImport>org.rocksdb.CompressionType</allowedImport>
+                    <allowedImport>org.rocksdb.DBOptions</allowedImport>
+                    <allowedImport>org.rocksdb.FlushOptions</allowedImport>
+                    <allowedImport>org.rocksdb.LiveFileMetaData</allowedImport>
+                    <allowedImport>org.rocksdb.Options</allowedImport>
+                    <allowedImport>org.rocksdb.RocksDB</allowedImport>
+                    <allowedImport>org.rocksdb.RocksDBException</allowedImport>
+                    <allowedImport>org.rocksdb.SstFileReader</allowedImport>
+                    <allowedImport>org.rocksdb.TableProperties</allowedImport>
+                  </allowedImports>
+                  <exclusion>org.apache.hadoop.hdds.utils.db.managed.*</exclusion>
+                </RestrictImports>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <profiles>
+    <profile>
+      <id>k8s-dev</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>io.fabric8</groupId>
+            <artifactId>docker-maven-plugin</artifactId>
+            <configuration>
+              <images>
+                <image>
+                  <name>${user.name}/ozone:${project.version}</name>
+                  <build>
+                    <dockerFileDir>${project.basedir}</dockerFileDir>
+                  </build>
+                </image>
+              </images>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RelationshipEdge.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RelationshipEdge.java
new file mode 100644
index 0000000000..7e63a60e6c
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RelationshipEdge.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocksdiff;
+
+// import org.jgrapht.graph.DefaultEdge;
+// Enable this import and extend DefaultEdge if We need to
+// pcitorially represent the DAG constructed.
+
+//class RelationshipEdge extends DefaultEdge {
+class RelationshipEdge {
+  //@Override
+  public String toString() {
+    return "";
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
new file mode 100644
index 0000000000..32e9e268c0
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -0,0 +1,817 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ozone.rocksdiff;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+
+import org.rocksdb.AbstractEventListener;
+import org.rocksdb.Checkpoint;
+import org.rocksdb.CompactionJobInfo;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.LiveFileMetaData;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileReader;
+import org.rocksdb.TableProperties;
+
+import com.google.common.graph.GraphBuilder;
+import com.google.common.graph.MutableGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+// TODO
+//  1. Create a local instance of RocksDiff-local-RocksDB. This is the
+//  rocksDB that we can use for maintaining DAG and any other state. This is
+//  a per node state so it it doesn't have to go through RATIS anyway.
+//  2. Store fwd DAG in Diff-Local-RocksDB in Compaction Listener
+//  3. Store fwd DAG in Diff-Local-RocksDB in Compaction Listener
+//  4. Store last-Snapshot-counter/Compaction-generation-counter in Diff-Local
+//  -RocksDB in Compaction Listener
+//  5. System Restart handling. Read the DAG from Disk and load it in memory.
+//  6. Take the base snapshot. All the SST file nodes in the base snapshot
+//  should be arked with that Snapshot generation. Subsequently, all SST file
+//  node should have a snapshot-generation count and Compaction-generation
+//  count.
+//  6. DAG based SST file pruning. Start from the oldest snapshot and we can
+//  unlink any SST
+//  file from the SaveCompactedFilePath directory that is reachable in the
+//  reverse DAG.
+//  7. DAG pruning : For each snapshotted bucket, We can recycle the part of
+//  the DAG that is older than the predefined policy for the efficient snapdiff.
+//  E.g. we may decide not to support efficient snapdiff from any snapshot that
+//  is older than 2 weeks.
+//  Note on 8. & 9 .
+//  ==================
+//  A simple handling is to just iterate over all keys in keyspace when the
+//  compaction DAG is lost, instead of optimizing every case. And start
+//  Compaction-DAG afresh from the latest snapshot.
+//  --
+//  8. Handle bootstrapping rocksDB for a new OM follower node
+//      - new node will receive Active object store as well as all existing
+//      rocksDB checkpoints.
+//      - This bootstrapping should also receive the compaction-DAG information
+//  9. Handle rebuilding the DAG for a lagging follower. There are two cases
+//      - recieve RATIS transactions to replay. Nothing needs to be done in
+//      thise case.
+//      - Getting the DB sync. This case needs to handle getting the
+//      compaction-DAG information as well.
+//
+//
+/**
+ *  RocksDBCheckpointDiffer class.
+ */
+//CHECKSTYLE:OFF
+@SuppressWarnings({"NM_METHOD_NAMING_CONVENTION"})
+public class RocksDBCheckpointDiffer {
+  private final String rocksDbPath;
+  private String cpPath;
+  private String cfDBPath;
+  private String saveCompactedFilePath;
+  private int maxSnapshots;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RocksDBCheckpointDiffer.class);
+
+  // keeps track of all the snapshots created so far.
+  private int lastSnapshotCounter;
+  private String lastSnapshotPrefix;
+
+  // tracks number of compactions so far
+  private static final long UNKNOWN_COMPACTION_GEN = 0;
+  private long currentCompactionGen = 0;
+
+  // Something to track all the snapshots created so far.
+  private Snapshot[] allSnapshots;
+
+  public RocksDBCheckpointDiffer(String dbPath,
+                                 int maxSnapshots,
+                                 String checkpointPath,
+                                 String sstFileSaveDir,
+                                 String cfPath,
+                                 int initialSnapshotCounter,
+                                 String snapPrefix) {
+    this.maxSnapshots = maxSnapshots;
+    allSnapshots = new Snapshot[this.maxSnapshots];
+    cpPath = checkpointPath;
+
+    saveCompactedFilePath = sstFileSaveDir;
+    rocksDbPath = dbPath;
+    cfDBPath = cfPath;
+
+    // TODO: This module should be self sufficient in tracking the last
+    //  snapshotCounter and currentCompactionGen for a given dbPath. It needs
+    //  to be persisted.
+    lastSnapshotCounter = initialSnapshotCounter;
+    lastSnapshotPrefix = snapPrefix;
+    currentCompactionGen = lastSnapshotCounter;
+
+    // TODO: this should also independently persist every compaction e.g.
+    //  (input files) ->
+    //  {  (output files) + lastSnapshotCounter + currentCompactionGen }
+    //  mapping.
+  }
+
+  // Node in the DAG to represent an SST file
+  private class CompactionNode {
+    public String fileName;   // Name of the SST file
+    public String snapshotId; // The last snapshot that was created before this
+    // node came into existance;
+    public long snapshotGeneration;
+    public long totalNumberOfKeys;
+    public long cumulativeKeysReverseTraversal;
+
+    CompactionNode (String f, String sid, long numKeys, long compactionGen) {
+      fileName = f;
+      snapshotId = sid;
+      snapshotGeneration = lastSnapshotCounter;
+      totalNumberOfKeys = numKeys;
+      cumulativeKeysReverseTraversal = 0;
+    }
+  }
+
+  private static class Snapshot {
+    String dbPath;
+    String snapshotID;
+    long snapshotGeneration;
+
+    Snapshot(String db, String id, long gen) {
+      dbPath = db;
+      snapshotID = id;
+      snapshotGeneration = gen;
+    }
+  }
+
+  public enum GType {FNAME, KEYSIZE, CUMUTATIVE_SIZE};
+
+
+  // Hash table to track Compaction node for a given SST File.
+  private ConcurrentHashMap<String, CompactionNode> compactionNodeTable =
+      new ConcurrentHashMap<>();
+
+  // We are mainiting a two way DAG. This allows easy traversal from
+  // source snapshot to destination snapshot as well as the other direction.
+  // TODO : Persist this information to the disk.
+  // TODO: A system crash while the edge is inserted in DAGFwd but not in
+  //  DAGReverse will compromise the two way DAG. Set of input/output files shud
+  //  be written to // disk(RocksDB) first, would avoid this problem.
+
+  private MutableGraph<CompactionNode> compactionDAGFwd =
+      GraphBuilder.directed().build();
+
+  private MutableGraph<CompactionNode> compactionDAGReverse =
+      GraphBuilder.directed().build();
+
+  public static final Integer DEBUG_DAG_BUILD_UP = 2;
+  public static final Integer DEBUG_DAG_TRAVERSAL = 3;
+  public static final Integer DEBUG_DAG_LIVE_NODES = 4;
+  public static final Integer DEBUG_READ_ALL_DB_KEYS = 5;
+  private static final HashSet<Integer> DEBUG_LEVEL = new HashSet<>();
+
+  static {
+    addDebugLevel(DEBUG_DAG_BUILD_UP);
+    addDebugLevel(DEBUG_DAG_TRAVERSAL);
+    addDebugLevel(DEBUG_DAG_LIVE_NODES);
+  }
+
+  static {
+    RocksDB.loadLibrary();
+  }
+
+  public static void addDebugLevel(Integer level) {
+    DEBUG_LEVEL.add(level);
+  }
+
+  // Flushes the WAL and Creates a RocksDB checkpoint
+  @SuppressWarnings({"NM_METHOD_NAMING_CONVENTION"})
+  public void createCheckPoint(String dbPathArg, String cpPathArg,
+                               RocksDB rocksDB) {
+    LOG.warn("Creating Checkpoint for RocksDB instance : " +
+        dbPathArg + "in a CheckPoint Location" + cpPathArg);
+    try {
+      rocksDB.flush(new FlushOptions());
+      Checkpoint cp = Checkpoint.create(rocksDB);
+      cp.createCheckpoint(cpPathArg);
+    } catch (RocksDBException e) {
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  public void setRocksDBForCompactionTracking(DBOptions rocksOptions)
+      throws RocksDBException {
+    setRocksDBForCompactionTracking(rocksOptions,
+        new ArrayList<AbstractEventListener>());
+  }
+
+  public void setRocksDBForCompactionTracking(
+      DBOptions rocksOptions, List<AbstractEventListener> list) {
+    final AbstractEventListener onCompactionCompletedListener =
+        new AbstractEventListener() {
+          @Override
+          @SuppressFBWarnings({
+              "AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
+              "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+          public void onCompactionCompleted(
+              final RocksDB db, final CompactionJobInfo compactionJobInfo) {
+            synchronized (db) {
+              LOG.warn(compactionJobInfo.compactionReason().toString());
+              LOG.warn("List of input files:");
+              for (String file : compactionJobInfo.inputFiles()) {
+                LOG.warn(file);
+                String saveLinkFileName =
+                    saveCompactedFilePath + new File(file).getName();
+                Path link = Paths.get(saveLinkFileName);
+                Path srcFile = Paths.get(file);
+                try {
+                  Files.createLink(link, srcFile);
+                } catch (IOException e) {
+                  LOG.warn("Exception in creating hardlink");
+                  e.printStackTrace();
+                }
+              }
+              LOG.warn("List of output files:");
+              for (String file : compactionJobInfo.outputFiles()) {
+                LOG.warn(file + ",");
+              }
+              // Let us also build the graph
+              for (String outFilePath : compactionJobInfo.outputFiles()) {
+                String outfile =
+                    Paths.get(outFilePath).getFileName().toString();
+                CompactionNode outfileNode = compactionNodeTable.get(outfile);
+                if (outfileNode == null) {
+                  long numKeys = 0;
+                  try {
+                    numKeys = getSSTFileSummary(outfile);
+                  } catch (Exception e) {
+                    LOG.warn(e.getMessage());
+                  }
+                  outfileNode = new CompactionNode(outfile,
+                      lastSnapshotPrefix, numKeys,
+                      currentCompactionGen);
+                  compactionDAGFwd.addNode(outfileNode);
+                  compactionDAGReverse.addNode(outfileNode);
+                  compactionNodeTable.put(outfile, outfileNode);
+                }
+                for (String inFilePath : compactionJobInfo.inputFiles()) {
+                  String infile =
+                      Paths.get(inFilePath).getFileName().toString();
+                  CompactionNode infileNode = compactionNodeTable.get(infile);
+                  if (infileNode == null) {
+                    long numKeys = 0;
+                    try {
+                      numKeys = getSSTFileSummary(infile);
+                    } catch (Exception e) {
+                      LOG.warn(e.getMessage());
+                    }
+                    infileNode = new CompactionNode(infile,
+                        lastSnapshotPrefix,
+                        numKeys, UNKNOWN_COMPACTION_GEN);
+                    compactionDAGFwd.addNode(infileNode);
+                    compactionDAGReverse.addNode(infileNode);
+                    compactionNodeTable.put(infile, infileNode);
+                  }
+                  if (outfileNode.fileName.compareToIgnoreCase(
+                      infileNode.fileName) != 0) {
+                    compactionDAGFwd.putEdge(outfileNode, infileNode);
+                    compactionDAGReverse.putEdge(infileNode, outfileNode);
+                  }
+                }
+              }
+              if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
+                printMutableGraph(null, null, compactionDAGFwd);
+              }
+            }
+          }
+        };
+
+    list.add(onCompactionCompletedListener);
+    rocksOptions.setListeners(list);
+  }
+
+
+
+  public void setRocksDBForCompactionTracking(Options rocksOptions)
+      throws RocksDBException {
+    setRocksDBForCompactionTracking(rocksOptions,
+        new ArrayList<AbstractEventListener>());
+  }
+
+  public void setRocksDBForCompactionTracking(
+      Options rocksOptions, List<AbstractEventListener> list) {
+    final AbstractEventListener onCompactionCompletedListener =
+        new AbstractEventListener() {
+          @Override
+          @SuppressFBWarnings({
+              "AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
+              "NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+          public void onCompactionCompleted(
+              final RocksDB db,final CompactionJobInfo compactionJobInfo) {
+            synchronized (db) {
+              LOG.warn(compactionJobInfo.compactionReason().toString());
+              LOG.warn("List of input files:");
+              for (String file : compactionJobInfo.inputFiles()) {
+                LOG.warn(file);
+                String saveLinkFileName =
+                    saveCompactedFilePath + new File(file).getName();
+                Path link = Paths.get(saveLinkFileName);
+                Path srcFile = Paths.get(file);
+                try {
+                  Files.createLink(link, srcFile);
+                } catch (IOException e) {
+                  LOG.warn("Exception in creating hardlink");
+                  e.printStackTrace();
+                }
+              }
+              LOG.warn("List of output files:");
+              for (String file : compactionJobInfo.outputFiles()) {
+                LOG.warn(file);
+              }
+              // Let us also build the graph
+              for (String outFilePath : compactionJobInfo.outputFiles()) {
+                String outfile =
+                    Paths.get(outFilePath).getFileName().toString();
+                CompactionNode outfileNode = compactionNodeTable.get(outfile);
+                if (outfileNode == null) {
+                  long numKeys = 0;
+                  try {
+                    numKeys = getSSTFileSummary(outfile);
+                  } catch (Exception e) {
+                    LOG.warn(e.getMessage());
+                  }
+                  outfileNode = new CompactionNode(outfile,
+                      lastSnapshotPrefix,
+                      numKeys, currentCompactionGen);
+                  compactionDAGFwd.addNode(outfileNode);
+                  compactionDAGReverse.addNode(outfileNode);
+                  compactionNodeTable.put(outfile, outfileNode);
+                }
+                for (String inFilePath : compactionJobInfo.inputFiles()) {
+                  String infile =
+                      Paths.get(inFilePath).getFileName().toString();
+                  CompactionNode infileNode = compactionNodeTable.get(infile);
+                  if (infileNode == null) {
+                    long numKeys = 0;
+                    try {
+                      numKeys = getSSTFileSummary(infile);
+                    } catch (Exception e) {
+                      LOG.warn(e.getMessage());
+                    }
+                    infileNode = new CompactionNode(infile,
+                        lastSnapshotPrefix, numKeys,
+                        UNKNOWN_COMPACTION_GEN);
+                    compactionDAGFwd.addNode(infileNode);
+                    compactionDAGReverse.addNode(infileNode);
+                    compactionNodeTable.put(infile, infileNode);
+                  }
+                  if (outfileNode.fileName.compareToIgnoreCase(
+                      infileNode.fileName) != 0) {
+                    compactionDAGFwd.putEdge(outfileNode, infileNode);
+                    compactionDAGReverse.putEdge(infileNode, outfileNode);
+                  }
+                }
+              }
+              if (debugEnabled(DEBUG_DAG_BUILD_UP)) {
+                printMutableGraph(null, null,
+                    compactionDAGFwd);
+              }
+            }
+          }
+        };
+
+    list.add(onCompactionCompletedListener);
+    rocksOptions.setListeners(list);
+  }
+
+  public RocksDB getRocksDBInstanceWithCompactionTracking(String dbPath)
+      throws RocksDBException {
+    final Options opt = new Options().setCreateIfMissing(true)
+        .setCompressionType(CompressionType.NO_COMPRESSION);
+    opt.setMaxBytesForLevelMultiplier(2);
+    setRocksDBForCompactionTracking(opt);
+    return RocksDB.open(opt, dbPath);
+  }
+
+  // Get a summary of a given SST file
+  public long getSSTFileSummary(String filename)
+      throws RocksDBException {
+    Options option = new Options();
+    SstFileReader reader = new SstFileReader(option);
+    try {
+      reader.open(saveCompactedFilePath + filename);
+    } catch (RocksDBException e) {
+      reader.open(rocksDbPath + "/"+ filename);
+    }
+    TableProperties properties = reader.getTableProperties();
+    LOG.warn("getSSTFileSummary " + filename + ":: " +
+        properties.getNumEntries());
+    return properties.getNumEntries();
+  }
+
+  // Read the current Live manifest for a given RocksDB instance (Active or
+  // Checkpoint). Returns the list of currently active SST FileNames.
+  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+  public HashSet<String> readRocksDBLiveFiles(String dbPathArg) {
+    RocksDB rocksDB = null;
+    HashSet<String> liveFiles = new HashSet<>();
+    //
+    try (final Options options =
+             new Options().setParanoidChecks(true)
+                 .setCreateIfMissing(true)
+                 .setCompressionType(CompressionType.NO_COMPRESSION)
+                 .setForceConsistencyChecks(false)) {
+      rocksDB = RocksDB.openReadOnly(options, dbPathArg);
+      List<LiveFileMetaData> liveFileMetaDataList =
+          rocksDB.getLiveFilesMetaData();
+      LOG.warn("Live File Metadata for DB: " + dbPathArg);
+      for (LiveFileMetaData m : liveFileMetaDataList) {
+        LOG.warn("\tFile :" + m.fileName());
+        LOG.warn("\tLevel :" + m.level());
+        liveFiles.add(Paths.get(m.fileName()).getFileName().toString());
+      }
+    } catch (RocksDBException e) {
+      e.printStackTrace();
+    } finally {
+      if (rocksDB != null) {
+        rocksDB.close();
+      }
+    }
+    return liveFiles;
+  }
+
+  // Given the src and destination Snapshots, it prints a Diff list.
+  private synchronized void printSnapdiffSSTFiles(
+      Snapshot src, Snapshot dest) throws RocksDBException {
+    LOG.warn("Src Snapshot files :" + src.dbPath);
+    HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.dbPath);
+    LOG.warn("dest Snapshot files :" + dest.dbPath);
+    HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.dbPath);
+
+    HashSet<String> fwdDAGSameFiles = new HashSet<>();
+    HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
+
+    LOG.warn("Doing forward diff between source and destination " +
+        "Snapshots:" + src.dbPath + ", " + dest.dbPath);
+    realPrintSnapdiffSSTFiles(src, dest, srcSnapFiles, destSnapFiles,
+        compactionDAGFwd,
+        fwdDAGSameFiles,
+        fwdDAGDifferentFiles);
+
+    LOG.warn("Overall Summary \n" +
+            "Doing Overall diff between source and destination Snapshots:" +
+        src.dbPath + ", " + dest.dbPath);
+    System.out.print("fwd DAG Same files :");
+    for (String file : fwdDAGSameFiles) {
+      System.out.print(file + ", ");
+    }
+    LOG.warn("");
+    System.out.print("\nFwd DAG Different files :");
+    for (String file : fwdDAGDifferentFiles) {
+      CompactionNode n = compactionNodeTable.get(file);
+      System.out.print(file + ", ");
+    }
+    LOG.warn("");
+  }
+
+  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+  public synchronized void realPrintSnapdiffSSTFiles(
+      Snapshot src, Snapshot dest,
+      HashSet<String> srcSnapFiles,
+      HashSet<String> destSnapFiles,
+      MutableGraph<CompactionNode> mutableGraph,
+      HashSet<String> sameFiles, HashSet<String> differentFiles) {
+
+
+    for (String fileName : srcSnapFiles) {
+      if (destSnapFiles.contains(fileName)) {
+        LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
+            "Snapshot" + dest.dbPath + " Contain Same file " + fileName);
+        sameFiles.add(fileName);
+        continue;
+      }
+      CompactionNode infileNode =
+          compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
+      if (infileNode == null) {
+        LOG.warn("SrcSnapshot : " + src.dbPath + "File " + fileName + "was " +
+            "never compacted");
+        differentFiles.add(fileName);
+        continue;
+      }
+      System.out.print(" Expandin File:" + fileName + ":\n");
+      Set<CompactionNode> nextLevel = new HashSet<>();
+      nextLevel.add(infileNode);
+      Set<CompactionNode> currentLevel = new HashSet<>();
+      currentLevel.addAll(nextLevel);
+      nextLevel = new HashSet<>();
+      int i = 1;
+      while (currentLevel.size() != 0) {
+        LOG.warn("DAG Level :" + i++);
+        for (CompactionNode current : currentLevel) {
+          LOG.warn("acknowledging file " + current.fileName);
+          if (current.snapshotGeneration <= dest.snapshotGeneration) {
+            LOG.warn("Reached dest generation count. SrcSnapshot : " +
+                src.dbPath + " and Dest " + "Snapshot" + dest.dbPath +
+                " Contain Diffrent file " + current.fileName);
+            differentFiles.add(current.fileName);
+            continue;
+          }
+          Set<CompactionNode> successors = mutableGraph.successors(current);
+          if (successors == null || successors.size() == 0) {
+            LOG.warn("No further compaction for the file" +
+                ".SrcSnapshot : " + src.dbPath + " and Dest " +
+                "Snapshot" + dest.dbPath + " Contain Diffrent file " +
+                current.fileName);
+            differentFiles.add(current.fileName);
+          } else {
+            for (CompactionNode oneSucc : successors) {
+              if (sameFiles.contains(oneSucc.fileName) ||
+                  differentFiles.contains(oneSucc.fileName)) {
+                LOG.warn("Skipping file :" + oneSucc.fileName);
+                continue;
+              }
+              if (destSnapFiles.contains(oneSucc.fileName)) {
+                LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
+                    "Snapshot" + dest.dbPath + " Contain Same file " +
+                    oneSucc.fileName);
+                sameFiles.add(oneSucc.fileName);
+                continue;
+              } else {
+                LOG.warn("SrcSnapshot : " + src.dbPath + " and Dest " +
+                    "Snapshot" + dest.dbPath + " Contain Diffrent file " +
+                    oneSucc.fileName);
+                nextLevel.add(oneSucc);
+              }
+            }
+          }
+        }
+        currentLevel = new HashSet<>();
+        currentLevel.addAll(nextLevel);
+        nextLevel = new HashSet<>();
+        LOG.warn("");
+      }
+    }
+    LOG.warn("Summary :");
+    for (String file : sameFiles) {
+      System.out.print("Same File : " + file);
+    }
+    LOG.warn("");
+
+    for (String file : differentFiles) {
+      System.out.print("Different File : " + file);
+    }
+    LOG.warn("");
+  }
+
+  @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC")
+  class NodeComparator implements Comparator<CompactionNode>
+  {
+    public int compare(CompactionNode a, CompactionNode b)
+    {
+      return a.fileName.compareToIgnoreCase(b.fileName);
+    }
+
+    @Override
+    public Comparator<CompactionNode> reversed() {
+      return null;
+    }
+  }
+
+
+  public void dumpCompactioNodeTable() {
+    List<CompactionNode> nodeList =
+        compactionNodeTable.values().stream().collect(Collectors.toList());
+    Collections.sort(nodeList, new NodeComparator());
+    for (CompactionNode n : nodeList ) {
+      LOG.warn("File : " + n.fileName + " :: Total keys : "
+          + n.totalNumberOfKeys);
+      LOG.warn("File : " + n.fileName + " :: Cumulative keys : "  +
+          n.cumulativeKeysReverseTraversal);
+    }
+  }
+
+  @SuppressFBWarnings({"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"})
+  public synchronized void printMutableGraphFromAGivenNode(
+      String fileName, int level, MutableGraph<CompactionNode> mutableGraph) {
+    CompactionNode infileNode =
+        compactionNodeTable.get(Paths.get(fileName).getFileName().toString());
+    if (infileNode == null) {
+      return;
+    }
+    System.out.print("\nCompaction Level : " + level + " Expandin File:" +
+        fileName + ":\n");
+    Set<CompactionNode> nextLevel = new HashSet<>();
+    nextLevel.add(infileNode);
+    Set<CompactionNode> currentLevel = new HashSet<>();
+    currentLevel.addAll(nextLevel);
+    int i = 1;
+    while (currentLevel.size() != 0) {
+      LOG.warn("DAG Level :" + i++);
+      for (CompactionNode current : currentLevel) {
+        Set<CompactionNode> successors = mutableGraph.successors(current);
+        for (CompactionNode oneSucc : successors) {
+          System.out.print(oneSucc.fileName + " ");
+          nextLevel.add(oneSucc);
+        }
+      }
+      currentLevel = new HashSet<>();
+      currentLevel.addAll(nextLevel);
+      nextLevel = new HashSet<>();
+      LOG.warn("");
+    }
+  }
+
+  public synchronized void printMutableGraph(
+      String srcSnapId, String destSnapId,
+      MutableGraph<CompactionNode> mutableGraph) {
+    LOG.warn("Printing the Graph");
+    Set<CompactionNode> topLevelNodes = new HashSet<>();
+    Set<CompactionNode> allNodes = new HashSet<>();
+    for (CompactionNode n : mutableGraph.nodes()) {
+      if (srcSnapId == null ||
+          n.snapshotId.compareToIgnoreCase(srcSnapId) == 0) {
+        topLevelNodes.add(n);
+      }
+    }
+    Iterator iter = topLevelNodes.iterator();
+    while (iter.hasNext()) {
+      CompactionNode n = (CompactionNode) iter.next();
+      Set<CompactionNode> succ = mutableGraph.successors(n);
+      LOG.warn("Parent Node :" + n.fileName);
+      if (succ.size() == 0) {
+        LOG.warn("No Children Node ");
+        allNodes.add(n);
+        iter.remove();
+        iter = topLevelNodes.iterator();
+        continue;
+      }
+      for (CompactionNode oneSucc : succ) {
+        LOG.warn("Children Node :" + oneSucc.fileName);
+        if (srcSnapId == null||
+            oneSucc.snapshotId.compareToIgnoreCase(destSnapId) == 0) {
+          allNodes.add(oneSucc);
+        } else {
+          topLevelNodes.add(oneSucc);
+        }
+      }
+      iter.remove();
+      iter = topLevelNodes.iterator();
+    }
+    LOG.warn("src snap:" + srcSnapId);
+    LOG.warn("dest snap:" + destSnapId);
+    for (CompactionNode n : allNodes) {
+      LOG.warn("Files are :" + n.fileName);
+    }
+  }
+
+
+  public void createSnapshot(RocksDB rocksDB) throws InterruptedException {
+
+    LOG.warn("Current time is::" + System.currentTimeMillis());
+    long t1 = System.currentTimeMillis();
+
+    cpPath = cpPath + lastSnapshotCounter;
+    createCheckPoint(rocksDbPath, cpPath, rocksDB);
+    allSnapshots[lastSnapshotCounter] = new Snapshot(cpPath,
+    lastSnapshotPrefix, lastSnapshotCounter);
+
+    long t2 = System.currentTimeMillis();
+    LOG.warn("Current time is::" + t2);
+
+    LOG.warn("millisecond difference is ::" + (t2 - t1));
+   Thread.sleep(100);
+   ++lastSnapshotCounter;
+   lastSnapshotPrefix = "sid_" + lastSnapshotCounter;
+   LOG.warn("done :: 1");
+  }
+
+
+  public void printAllSnapshots() throws InterruptedException {
+    for (Snapshot snap : allSnapshots) {
+      if (snap == null) {
+        break;
+      }
+      LOG.warn("Snapshot id" + snap.snapshotID);
+      LOG.warn("Snapshot path" + snap.dbPath);
+      LOG.warn("Snapshot Generation" + snap.snapshotGeneration);
+      LOG.warn("");
+    }
+  }
+
+  public void diffAllSnapshots() throws InterruptedException, RocksDBException {
+    for (Snapshot snap : allSnapshots) {
+      if (snap == null) {
+        break;
+      }
+      printSnapdiffSSTFiles(allSnapshots[lastSnapshotCounter - 1], snap);
+    }
+  }
+
+  public MutableGraph<CompactionNode> getCompactionFwdDAG() {
+    return compactionDAGFwd;
+  }
+
+  public MutableGraph<CompactionNode> getCompactionReverseDAG() {
+    return compactionDAGFwd;
+  }
+
+  public synchronized void traverseGraph(
+      MutableGraph<CompactionNode> reverseMutableGraph,
+      MutableGraph<CompactionNode> fwdMutableGraph) {
+
+      List<CompactionNode> nodeList =
+        compactionNodeTable.values().stream().collect(Collectors.toList());
+    Collections.sort(nodeList, new NodeComparator());
+
+    for (CompactionNode  infileNode : nodeList ) {
+      // fist go through fwdGraph to find nodes that don't have succesors.
+      // These nodes will be the top level nodes in reverse graph
+      Set<CompactionNode> successors = fwdMutableGraph.successors(infileNode);
+      if (successors == null || successors.size() == 0) {
+        LOG.warn("traverseGraph : No successors. cumulative " +
+            "keys : " + infileNode.cumulativeKeysReverseTraversal + "::total " +
+            "keys ::" + infileNode.totalNumberOfKeys);
+        infileNode.cumulativeKeysReverseTraversal =
+            infileNode.totalNumberOfKeys;
+      }
+    }
+
+    HashSet<CompactionNode> visited = new HashSet<>();
+    for (CompactionNode  infileNode : nodeList ) {
+      if (visited.contains(infileNode)) {
+        continue;
+      }
+      visited.add(infileNode);
+      System.out.print("traverseGraph: Visiting node " + infileNode.fileName +
+          ":\n");
+      Set<CompactionNode> nextLevel = new HashSet<>();
+      nextLevel.add(infileNode);
+      Set<CompactionNode> currentLevel = new HashSet<>();
+      currentLevel.addAll(nextLevel);
+      nextLevel = new HashSet<>();
+      int i = 1;
+      while (currentLevel.size() != 0) {
+        LOG.warn("traverseGraph : DAG Level :" + i++);
+        for (CompactionNode current : currentLevel) {
+          LOG.warn("traverseGraph : expanding node " + current.fileName);
+          Set<CompactionNode> successors =
+              reverseMutableGraph.successors(current);
+          if (successors == null || successors.size() == 0) {
+            LOG.warn("traverseGraph : No successors. cumulative " +
+                "keys : " + current.cumulativeKeysReverseTraversal);
+          } else {
+            for (CompactionNode oneSucc : successors) {
+              LOG.warn("traverseGraph : Adding to the next level : " +
+                  oneSucc.fileName);
+              LOG.warn("traverseGraph : " + oneSucc.fileName + "cum" + " keys"
+                  + oneSucc.cumulativeKeysReverseTraversal + "parent" + " " +
+                  current.fileName + " total " + current.totalNumberOfKeys);
+              oneSucc.cumulativeKeysReverseTraversal +=
+                  current.cumulativeKeysReverseTraversal;
+              nextLevel.add(oneSucc);
+            }
+          }
+        }
+        currentLevel = new HashSet<>();
+        currentLevel.addAll(nextLevel);
+        nextLevel = new HashSet<>();
+        LOG.warn("");
+      }
+    }
+  }
+
+  public boolean debugEnabled(Integer level) {
+    return DEBUG_LEVEL.contains(level);
+  }
+}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/package-info.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/package-info.java
new file mode 100644
index 0000000000..1a51012839
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+/**
+ * Generic ozone specific classes.
+ */
\ No newline at end of file
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
new file mode 100644
index 0000000000..a94fb32236
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ozone.rocksdiff;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_DAG_LIVE_NODES;
+import static org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DEBUG_READ_ALL_DB_KEYS;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.LiveFileMetaData;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+////CHECKSTYLE:OFF
+public class TestRocksDBCheckpointDiffer {
+
+  private static final String dbPath   = "./rocksdb-data";
+  private static final int NUM_ROW = 25000000;
+  private static final int SNAPSHOT_EVERY_SO_MANY_KEYS = 999999;
+
+  // keeps track of all the snapshots created so far.
+  private static int lastSnapshotCounter = 0;
+  private static String lastSnapshotPrefix = "snap_id_";
+
+
+  public static void main(String[] args) throws Exception {
+    TestRocksDBCheckpointDiffer tester= new TestRocksDBCheckpointDiffer();
+    RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(
+        "./rocksdb-data",
+        1000,
+        "./rocksdb-data-cp",
+        "./SavedCompacted_Files/",
+        "./rocksdb-data-cf/",
+        0,
+        "snap_id_");
+    lastSnapshotPrefix = "snap_id_" + lastSnapshotCounter;
+    RocksDB rocksDB = tester.createRocksDBInstance(dbPath, differ);
+    Thread.sleep(10000);
+
+    tester.readRocksDBInstance(dbPath, rocksDB, null, differ);
+    differ.printAllSnapshots();
+    differ.traverseGraph(
+        differ.getCompactionReverseDAG(),
+        differ.getCompactionFwdDAG());
+    differ.diffAllSnapshots();
+    differ.dumpCompactioNodeTable();
+    for (RocksDBCheckpointDiffer.GType gtype :
+        RocksDBCheckpointDiffer.GType.values()) {
+      String fname = "fwdGraph_" + gtype.toString() +  ".png";
+      String rname = "reverseGraph_"+ gtype.toString() + ".png";
+
+      //differ.pngPrintMutableGrapth(differ.getCompactionFwdDAG(),
+      //  fname, gtype);
+      //differ.pngPrintMutableGrapth(differ.getCompactionReverseDAG(), rname,
+      //    gtype);
+    }
+    rocksDB.close();
+  }
+
+  private String getRandomString(Random random, int length) {
+    // Ref: https://www.baeldung.com/java-random-string
+    final int leftLimit = 48; // numeral '0'
+    final int rightLimit = 122; // letter 'z'
+
+    return random.ints(leftLimit, rightLimit + 1)
+        .filter(i -> (i <= 57 || i >= 65) && (i <= 90 || i >= 97))
+        .limit(7)
+        .collect(StringBuilder::new,
+            StringBuilder::appendCodePoint, StringBuilder::append)
+        .toString();
+  }
+
+  //  Test Code to create sample RocksDB instance.
+  public RocksDB createRocksDBInstance(String dbPathArg,
+                                       RocksDBCheckpointDiffer differ)
+      throws RocksDBException, InterruptedException {
+
+    System.out.println("Creating RocksDB instance at :" + dbPathArg);
+
+    RocksDB rocksDB = null;
+    rocksDB = differ.getRocksDBInstanceWithCompactionTracking(dbPathArg);
+
+    Random random = new Random();
+    // key-value
+    for (int i = 0; i < NUM_ROW; ++i) {
+      String generatedString = getRandomString(random, 7);
+      String keyStr = " My" + generatedString + "StringKey" + i;
+      String valueStr = " My " + generatedString + "StringValue" + i;
+      byte[] key = keyStr.getBytes(UTF_8);
+      rocksDB.put(key, valueStr.getBytes(UTF_8));
+      if (i % SNAPSHOT_EVERY_SO_MANY_KEYS == 0) {
+        differ.createSnapshot(rocksDB);
+      }
+      //System.out.println(toStr(rocksDB.get(key));
+    }
+    differ.createSnapshot(rocksDB);
+    return rocksDB;
+  }
+
+  //  RocksDB.DEFAULT_COLUMN_FAMILY
+  private void UpdateRocksDBInstance(String dbPathArg, RocksDB rocksDB) {
+    System.out.println("Updating RocksDB instance at :" + dbPathArg);
+    //
+    try (final Options options =
+             new Options().setCreateIfMissing(true).
+                 setCompressionType(CompressionType.NO_COMPRESSION)) {
+      if (rocksDB == null) {
+        rocksDB = RocksDB.open(options, dbPathArg);
+      }
+
+      Random random = new Random();
+      // key-value
+      for (int i = 0; i< NUM_ROW; ++i) {
+        String generatedString = getRandomString(random, 7);
+        String keyStr = " MyUpdated" + generatedString + "StringKey" + i;
+        String valueStr = " My Updated" + generatedString + "StringValue" + i;
+        byte[] key = keyStr.getBytes(UTF_8);
+        rocksDB.put(key, valueStr.getBytes(UTF_8));
+        System.out.println(toStr(rocksDB.get(key)));
+      }
+    } catch (RocksDBException e) {
+      e.printStackTrace();
+    }
+  }
+
+  //  RocksDB.DEFAULT_COLUMN_FAMILY
+  public void testDefaultColumnFamilyOriginal() {
+    System.out.println("testDefaultColumnFamily begin...");
+    //
+    try (final Options options = new Options().setCreateIfMissing(true)) {
+      try (final RocksDB rocksDB = RocksDB.open(options, "./rocksdb-data")) {
+        // key-value
+        byte[] key = "Hello".getBytes(UTF_8);
+        rocksDB.put(key, "World".getBytes(UTF_8));
+
+        System.out.println(toStr(rocksDB.get(key)));
+
+        rocksDB.put("SecondKey".getBytes(UTF_8), "SecondValue".getBytes(UTF_8));
+
+        // List
+        List<byte[]> keys = Arrays.asList(key, "SecondKey".getBytes(UTF_8),
+            "missKey".getBytes(UTF_8));
+        List<byte[]> values = rocksDB.multiGetAsList(keys);
+        for (int i = 0; i < keys.size(); i++) {
+          System.out.println("multiGet " + toStr(keys.get(i)) + ":" +
+              (values.get(i) != null ? toStr(values.get(i)) : null));
+        }
+
+        // [key - value]
+        RocksIterator iter = rocksDB.newIterator();
+        for (iter.seekToFirst(); iter.isValid(); iter.next()) {
+          System.out.println("iterator key:" + toStr(iter.key()) + ", " +
+              "iter value:" + toStr(iter.value()));
+        }
+
+        // key
+        rocksDB.delete(key);
+        System.out.println("after remove key:" + toStr(key));
+
+        iter = rocksDB.newIterator();
+        for (iter.seekToFirst(); iter.isValid(); iter.next()) {
+          System.out.println("iterator key:" + toStr(iter.key()) + ", " +
+              "iter value:" + toStr(iter.value()));
+        }
+      }
+    } catch (RocksDBException e) {
+      e.printStackTrace();
+    }
+  }
+
+  // (table)
+  public void testCertainColumnFamily() {
+    System.out.println("\ntestCertainColumnFamily begin...");
+    try (final ColumnFamilyOptions cfOpts =
+             new ColumnFamilyOptions().optimizeUniversalStyleCompaction()) {
+      String cfName = "my-first-columnfamily";
+      // list of column family descriptors, first entry must always be
+      // default column family
+      final List<ColumnFamilyDescriptor> cfDescriptors = Arrays.asList(
+          new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts),
+          new ColumnFamilyDescriptor(cfName.getBytes(UTF_8), cfOpts)
+      );
+
+      List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+      try (final DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).
+          setCreateMissingColumnFamilies(true);
+           final RocksDB rocksDB = RocksDB.open(dbOptions, "./rocksdb-data-cf" +
+               "/", cfDescriptors, cfHandles)) {
+        ColumnFamilyHandle cfHandle = cfHandles.stream().filter(x -> {
+          try {
+            return (toStr(x.getName())).equals(cfName);
+          } catch (RocksDBException e) {
+            return false;
+          }
+        }).collect(Collectors.toList()).get(0);
+
+        // key/value
+        String key = "FirstKey";
+        rocksDB.put(cfHandles.get(0), key.getBytes(UTF_8),
+            "FirstValue".getBytes(UTF_8));
+        // key
+        byte[] getValue = rocksDB.get(cfHandles.get(0), key.getBytes(UTF_8));
+        System.out.println("get Value : " + toStr(getValue));
+        // key/value
+        rocksDB.put(cfHandles.get(1), "SecondKey".getBytes(UTF_8),
+            "SecondValue".getBytes(UTF_8));
+
+        List<byte[]> keys = Arrays.asList(key.getBytes(UTF_8),
+            "SecondKey".getBytes(UTF_8));
+        List<ColumnFamilyHandle> cfHandleList = Arrays.asList(cfHandle,
+            cfHandle);
+        // key
+        List<byte[]> values = rocksDB.multiGetAsList(cfHandleList, keys);
+        for (int i = 0; i < keys.size(); i++) {
+          System.out.println("multiGet:" + toStr(keys.get(i)) + "--" +
+              (values.get(i) == null ? null : toStr(values.get(i))));
+        }
+        //rocksDB.compactRange();
+        //rocksDB.compactFiles();
+        List<LiveFileMetaData> liveFileMetaDataList =
+            rocksDB.getLiveFilesMetaData();
+        for (LiveFileMetaData m : liveFileMetaDataList) {
+          System.out.println("Live File Metadata");
+          System.out.println("\tFile :" + m.fileName());
+          System.out.println("\ttable :" + toStr(m.columnFamilyName()));
+          System.out.println("\tKey Range :" + toStr(m.smallestKey()) +
+              " " + "<->" + toStr(m.largestKey()));
+        }
+        // key
+        rocksDB.delete(cfHandle, key.getBytes(UTF_8));
+
+        // key
+        RocksIterator iter = rocksDB.newIterator(cfHandle);
+        for (iter.seekToFirst(); iter.isValid(); iter.next()) {
+          System.out.println("iterator:" + toStr(iter.key()) + ":" +
+              toStr(iter.value()));
+        }
+      } finally {
+        // NOTE frees the column family handles before freeing the db
+        for (final ColumnFamilyHandle cfHandle : cfHandles) {
+          cfHandle.close();
+        }
+      }
+    } catch (RocksDBException e) {
+      e.printStackTrace();
+    } // frees the column family options
+  }
+
+  // Read from a given RocksDB instance and optionally write all the
+  // keys to a given file.
+  //
+  public void readRocksDBInstance(String dbPathArg, RocksDB rocksDB,
+                                  FileWriter file,
+                                  RocksDBCheckpointDiffer differ) {
+    System.out.println("Reading RocksDB instance at : " + dbPathArg);
+    boolean createdDB = false;
+    //
+    try (final Options options =
+             new Options().setParanoidChecks(true)
+                 .setCreateIfMissing(true)
+                 .setCompressionType(CompressionType.NO_COMPRESSION)
+                 .setForceConsistencyChecks(false)) {
+
+      if (rocksDB == null) {
+        rocksDB = RocksDB.openReadOnly(options, dbPathArg);
+        createdDB = true;
+      }
+
+      List<LiveFileMetaData> liveFileMetaDataList =
+          rocksDB.getLiveFilesMetaData();
+      for (LiveFileMetaData m : liveFileMetaDataList) {
+        System.out.println("Live File Metadata");
+        System.out.println("\tFile : " + m.fileName());
+        System.out.println("\tLevel : " + m.level());
+        System.out.println("\tTable : " + toStr(m.columnFamilyName()));
+        System.out.println("\tKey Range : " + toStr(m.smallestKey())
+            + " <-> " + toStr(m.largestKey()));
+        if (differ.debugEnabled(DEBUG_DAG_LIVE_NODES)) {
+          differ.printMutableGraphFromAGivenNode(m.fileName(), m.level(),
+              differ.getCompactionFwdDAG());
+        }
+      }
+
+      if(differ.debugEnabled(DEBUG_READ_ALL_DB_KEYS)) {
+        RocksIterator iter = rocksDB.newIterator();
+        for (iter.seekToFirst(); iter.isValid(); iter.next()) {
+          System.out.println("iterator key:" + toStr(iter.key()) + ", " +
+              "iter value:" + toStr(iter.value()));
+          if (file != null) {
+            file.write("iterator key:" + toStr(iter.key()) + ", iter " +
+                "value:" + toStr(iter.value()));
+            file.write("\n");
+          }
+        }
+      }
+    } catch (IOException | RocksDBException e) {
+      e.printStackTrace();
+    } finally {
+      if (createdDB){
+        rocksDB.close();
+      }
+    }
+  }
+
+  /**
+   * Return String object encoded in UTF-8 from a byte array.
+   */
+  private String toStr(byte[] bytes) {
+    return new String(bytes, UTF_8);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 99d56d2b02..fb71fb78f7 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -207,6 +207,10 @@ public final class OMConfigKeys {
       OZONE_OM_SNAPSHOT_PROVIDER_REQUEST_TIMEOUT_DEFAULT =
       TimeDuration.valueOf(5000, TimeUnit.MILLISECONDS);
 
+  public static final String OZONE_OM_FS_SNAPSHOT_MAX_LIMIT =
+      "ozone.om.fs.snapshot.max.limit";
+  public static final int OZONE_OM_FS_SNAPSHOT_MAX_LIMIT_DEFAULT = 1000;
+
   public static final String OZONE_OM_KERBEROS_KEYTAB_FILE_KEY = "ozone.om."
       + "kerberos.keytab.file";
   public static final String OZONE_OM_KERBEROS_PRINCIPAL_KEY = "ozone.om"
diff --git a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
index 1e147aa5a5..b000b623cd 100644
--- a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
+++ b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
@@ -226,6 +226,8 @@ BSD
 =====================
 
    org.codehaus.woodstox:stax2-api
+   com.github.vlsi.mxgraph:jgraphx
+
 
 CDDL
 =====================
@@ -337,6 +339,7 @@ Apache License
    org.hamcrest:hamcrest-all
    org.javassist:javassist
    org.jboss.weld.servlet:weld-servlet
+   org.jheaps:jheaps
    org.jooq:jooq
    org.jooq:jooq-codegen
    org.jooq:jooq-meta
@@ -365,6 +368,9 @@ EPL 2.0
 
    jakarta.annotation:jakarta.annotation-api
    jakarta.ws.rs:jakarta.ws.rs-api
+   org.jgrapht:jgrapht-core
+   org.jgrapht:jgrapht-guava
+   org.jgrapht:jgrapht-ext
 
 
 CDDL + GPLv2 with classpath exception
diff --git a/hadoop-ozone/dist/src/main/license/jar-report.txt b/hadoop-ozone/dist/src/main/license/jar-report.txt
index 21a7448531..9581b6dbfb 100644
--- a/hadoop-ozone/dist/src/main/license/jar-report.txt
+++ b/hadoop-ozone/dist/src/main/license/jar-report.txt
@@ -145,6 +145,11 @@ share/ozone/lib/jetty-util-ajax.jar
 share/ozone/lib/jetty-util.jar
 share/ozone/lib/jetty-webapp.jar
 share/ozone/lib/jetty-xml.jar
+share/ozone/lib/jgrapht-core.jar
+share/ozone/lib/jgrapht-ext.jar
+share/ozone/lib/jgrapht-guava.jar
+share/ozone/lib/jgraphx.jar
+share/ozone/lib/jheaps.jar
 share/ozone/lib/jmespath-java.jar
 share/ozone/lib/jna.jar
 share/ozone/lib/jna-platform.jar
@@ -250,6 +255,7 @@ share/ozone/lib/ratis-thirdparty-misc.jar
 share/ozone/lib/ratis-tools.jar
 share/ozone/lib/re2j.jar
 share/ozone/lib/reflections.jar
+share/ozone/lib/rocksdb-checkpoint-differ.jar
 share/ozone/lib/rocksdbjni.jar
 share/ozone/lib/simpleclient_common.jar
 share/ozone/lib/simpleclient_dropwizard.jar
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index cf067b282c..cdaef81a15 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -94,6 +94,8 @@ import org.apache.commons.lang3.StringUtils;
 import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT_DEFAULT;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
 
@@ -418,11 +420,14 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
 
   public static DBStore loadDB(OzoneConfiguration configuration, File metaDir,
       String dbName) throws IOException {
+    final int maxFSSnapshots = configuration.getInt(
+        OZONE_OM_FS_SNAPSHOT_MAX_LIMIT, OZONE_OM_FS_SNAPSHOT_MAX_LIMIT_DEFAULT);
     RocksDBConfiguration rocksDBConfiguration =
         configuration.getObject(RocksDBConfiguration.class);
     DBStoreBuilder dbStoreBuilder = DBStoreBuilder.newBuilder(configuration,
         rocksDBConfiguration).setName(dbName)
-        .setPath(Paths.get(metaDir.getPath()));
+        .setPath(Paths.get(metaDir.getPath()))
+        .setMaxFSSnapshots(maxFSSnapshots);
     DBStore dbStore = addOMTablesAndCodecs(dbStoreBuilder).build();
     return dbStore;
   }
diff --git a/pom.xml b/pom.xml
index e767e77b87..95ce2294d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1630,7 +1630,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
       <dependency>
         <groupId>org.rocksdb</groupId>
         <artifactId>rocksdbjni</artifactId>
-        <version>6.29.5</version>
+        <version>7.4.5</version> 
       </dependency>
       <dependency>
         <groupId>org.xerial</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org