Posted to commits@ozone.apache.org by si...@apache.org on 2023/03/16 09:10:34 UTC

[ozone] branch master updated: HDDS-7548. [Snapshot] SnapDiff pagination and partial diff report implementation (#4360)

This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 29acb2b188 HDDS-7548. [Snapshot] SnapDiff pagination and partial diff report implementation (#4360)
29acb2b188 is described below

commit 29acb2b1883ba76223c75f295ef1220d2a3c41ae
Author: Hemant Kumar <he...@gmail.com>
AuthorDate: Thu Mar 16 02:10:27 2023 -0700

    HDDS-7548. [Snapshot] SnapDiff pagination and partial diff report implementation (#4360)
---
 .../hadoop/hdds/utils/db/DBStoreBuilder.java       |   2 +-
 .../hdds/utils/db/managed/ManagedRocksDB.java      |   7 -
 .../org/apache/ozone/test/GenericTestUtils.java    |  15 +
 .../hadoop/ozone/snapshot/SnapshotDiffReport.java  |  10 +-
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  | 258 ++++++++++++--
 .../hadoop/ozone/om/snapshot/PersistentList.java   |   2 -
 .../hadoop/ozone/om/snapshot/PersistentMap.java    |   2 +
 .../ozone/om/snapshot/RocksDbPersistentList.java   |  40 +--
 .../ozone/om/snapshot/RocksDbPersistentMap.java    |  15 +-
 .../ozone/om/snapshot/RocksDbPersistentSet.java    |   4 +-
 .../ozone/om/snapshot/SnapshotDiffManager.java     | 388 +++++++++++++--------
 .../om/snapshot/TestRocksDbPersistentList.java     | 122 ++++---
 .../om/snapshot/TestRocksDbPersistentMap.java      | 129 +++----
 .../om/snapshot/TestRocksDbPersistentSet.java      | 138 ++++----
 14 files changed, 711 insertions(+), 421 deletions(-)
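
For context, the pagination contract this change introduces: the diff report
for a snapshot pair is generated once per jobId, persisted in RocksDB, and
then served out in pages. The token is a stringified integer index into the
report, and the page size is clamped to a maximum. A minimal sketch of that
contract (illustrative only; MAX_PAGE_SIZE mirrors the hard-coded maxPageSize
of 1000, and the class/method names below are not the committed ones):

import java.io.IOException;

public final class PaginationContractSketch {
  // Assumption: mirrors the hard-coded maxPageSize = 1000 in
  // OmSnapshotManager; a config knob is still a TODO in this change.
  static final int MAX_PAGE_SIZE = 1000;

  // Out-of-range page sizes fall back to the maximum, as in
  // getSnapshotDiffReport().
  static int clampPageSize(int pageSize) {
    return (pageSize <= 0 || pageSize > MAX_PAGE_SIZE)
        ? MAX_PAGE_SIZE : pageSize;
  }

  // The token is simply the next start index; blank means "first page".
  static int parseToken(String token) throws IOException {
    if (token == null || token.trim().isEmpty()) {
      return 0;
    }
    try {
      int index = Integer.parseInt(token);
      if (index < 0) {
        throw new IOException("Invalid token: " + token);
      }
      return index;
    } catch (NumberFormatException e) {
      throw new IOException("Invalid token: " + token, e);
    }
  }
}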

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index c3b0151cb9..84daa45bd2 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -74,7 +74,7 @@ public final class DBStoreBuilder {
   public static final Logger ROCKS_DB_LOGGER =
       LoggerFactory.getLogger(ManagedRocksDB.ORIGINAL_CLASS);
 
-  private static final String DEFAULT_COLUMN_FAMILY_NAME =
+  public static final String DEFAULT_COLUMN_FAMILY_NAME =
       StringUtils.bytes2String(DEFAULT_COLUMN_FAMILY);
 
   // DB PKIProfile used by ROCKDB instances.
diff --git a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java
index 954d7f355d..5413423508 100644
--- a/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java
+++ b/hadoop-hdds/managed-rocksdb/src/main/java/org/apache/hadoop/hdds/utils/db/managed/ManagedRocksDB.java
@@ -77,11 +77,4 @@ public class ManagedRocksDB extends ManagedObject<RocksDB> {
         RocksDB.open(path, columnFamilyDescriptors, columnFamilyHandles)
     );
   }
-
-  public static ManagedRocksDB open(
-      final ManagedOptions managedOptions,
-      final String path
-  ) throws RocksDBException {
-    return new ManagedRocksDB(RocksDB.open(managedOptions, path));
-  }
 }
diff --git a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
index 771f5137a4..01abe67291 100644
--- a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
+++ b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
@@ -293,6 +293,21 @@ public abstract class GenericTestUtils {
             .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
   }
 
+  /**
+   * Removes all files and dirs in the given dir recursively.
+   */
+  public static boolean deleteDirectory(File dir) {
+    File[] allContents = dir.listFiles();
+    if (allContents != null) {
+      for (File content : allContents) {
+        if (!deleteDirectory(content)) {
+          return false;
+        }
+      }
+    }
+    return dir.delete();
+  }
+
   /**
    * Class to capture logs for doing assertions.
    */
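
Note that the new deleteDirectory helper returns false as soon as any child
fails to delete. A hypothetical teardown usage (the path is illustrative):

import java.io.File;
import org.apache.ozone.test.GenericTestUtils;

public class DeleteDirectoryUsage {
  public static void main(String[] args) {
    File scratch = new File("./scratch-dir");  // illustrative path
    if (!GenericTestUtils.deleteDirectory(scratch)) {
      System.err.println("Could not fully delete " + scratch);
    }
  }
}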
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffReport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffReport.java
index 83b52db6bf..04732fa1a4 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffReport.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/snapshot/SnapshotDiffReport.java
@@ -186,19 +186,20 @@ public class SnapshotDiffReport {
   /**
    * subsequent token for the diff report.
    */
-  // TODO: [SNAPSHOT] will set it properly in HDDS-7548
-  private final String token = null;
+  private final String token;
 
   public SnapshotDiffReport(final String volumeName,
                             final String bucketName,
                             final String fromSnapshot,
                             final String toSnapshot,
-                            final List<DiffReportEntry> entryList) {
+                            final List<DiffReportEntry> entryList,
+                            final String token) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.fromSnapshot = fromSnapshot;
     this.toSnapshot = toSnapshot;
     this.diffList = entryList != null ? entryList : Collections.emptyList();
+    this.token = token;
   }
 
   public List<DiffReportEntry> getDiffList() {
@@ -247,7 +248,8 @@ public class SnapshotDiffReport {
         report.getToSnapshot(),
         report.getDiffListList().stream()
             .map(DiffReportEntry::fromProtobuf)
-            .collect(Collectors.toList()));
+            .collect(Collectors.toList()),
+        report.getToken());
   }
 
 }
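
With the token now carried end to end, a caller can drain a report page by
page. A sketch, assuming a fetchPage() wrapper around the snapshot diff RPC
and a getToken() accessor alongside the new field (neither appears in this
diff):

// Hypothetical paging loop; fetchPage() stands in for the actual
// snapshot diff RPC call and is not part of this change.
String token = null;
do {
  SnapshotDiffReport page =
      fetchPage("vol", "bucket", "snap1", "snap2", token, 1000);
  page.getDiffList().forEach(System.out::println);
  token = page.getToken();  // assumption: accessor for the token field
} while (token != null);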
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index ffad3677da..3f43fa623d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -18,25 +18,34 @@
 
 package org.apache.hadoop.ozone.om;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
@@ -46,12 +55,16 @@ import org.apache.hadoop.ozone.om.snapshot.SnapshotDiffManager;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffReport;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
@@ -71,25 +84,86 @@ public final class OmSnapshotManager implements AutoCloseable {
   private static final long DB_TABLE_ITER_LOOP_THRESHOLD_NS = 100000;
 
   private final OzoneManager ozoneManager;
-  private final OMMetadataManager omMetadataManager;
   private final SnapshotDiffManager snapshotDiffManager;
   private final LoadingCache<String, OmSnapshot> snapshotCache;
-  private final ManagedRocksDB snapshotDiffDb;
+  private ManagedRocksDB snapshotDiffDb;
+
+  /**
+   * Contains all the snap diff jobs which are either queued, in_progress or
+   * done. This table is used to make sure that there is only a single job
+   * for requests with the same snapshot pair at any point in time.
+   * |----------------------------------------------|
+   * |  KEY                         |  VALUE        |
+   * |----------------------------------------------|
+   * |  fromSnapshotId-toSnapshotId | snapDiffJobId |
+   * |----------------------------------------------|
+   */
+  private static final String SNAP_DIFF_JOB_TABLE_NAME =
+      "snap-diff-job-table";
+
+  /**
+   * Global table to keep the diff report. Each key is prefixed by the jobId
+   * to improve lookup and cleanup. JobId comes from snap-diff-job-table.
+   * |--------------------------------|
+   * |  KEY         |  VALUE          |
+   * |--------------------------------|
+   * |  jobId-index | DiffReportEntry |
+   * |--------------------------------|
+   */
+  private static final String SNAP_DIFF_REPORT_TABLE_NAME =
+      "snap-diff-report-table";
+
+  private final ManagedColumnFamilyOptions columnFamilyOptions;
+  private final ManagedDBOptions options;
+
+  // TODO: [SNAPSHOT] create config for max allowed page size.
+  private final int maxPageSize = 1000;
 
   OmSnapshotManager(OzoneManager ozoneManager) {
-    this.ozoneManager = ozoneManager;
-    this.omMetadataManager = ozoneManager.getMetadataManager();
+    this.options = new ManagedDBOptions();
+    this.options.setCreateIfMissing(true);
+    this.columnFamilyOptions = new ManagedColumnFamilyOptions();
 
-    // Pass in the differ
-    final RocksDBCheckpointDiffer differ = ozoneManager
-            .getMetadataManager()
-            .getStore()
-            .getRocksDBCheckpointDiffer();
+    List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
+    List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+    ColumnFamilyHandle snapDiffJobCf;
+    ColumnFamilyHandle snapDiffReportCf;
+    String dbPath = getDbPath(ozoneManager.getConfiguration());
+
+    try {
+      // Add default CF
+      columnFamilyDescriptors.add(new ColumnFamilyDescriptor(
+          StringUtils.string2Bytes(DEFAULT_COLUMN_FAMILY_NAME),
+          new ManagedColumnFamilyOptions(columnFamilyOptions)));
+
+      columnFamilyDescriptors.addAll(
+          getExistingColumnFamilyDescriptors(dbPath));
+
+      this.snapshotDiffDb = createRocksDbForSnapshotDiff(options,
+          dbPath, columnFamilyDescriptors, columnFamilyHandles);
+
+      snapDiffJobCf = getOrCreateColumnFamily(SNAP_DIFF_JOB_TABLE_NAME,
+              columnFamilyDescriptors, columnFamilyHandles);
+      snapDiffReportCf = getOrCreateColumnFamily(SNAP_DIFF_REPORT_TABLE_NAME,
+              columnFamilyDescriptors, columnFamilyHandles);
+
+    } catch (RuntimeException exception) {
+      closeRocksDbObjects(options,
+          columnFamilyOptions,
+          columnFamilyDescriptors,
+          columnFamilyHandles,
+          snapshotDiffDb);
+      throw exception;
+    }
+
+    this.ozoneManager = ozoneManager;
+    RocksDBCheckpointDiffer differ = ozoneManager
+        .getMetadataManager()
+        .getStore()
+        .getRocksDBCheckpointDiffer();
 
-    this.snapshotDiffDb =
-        createDbForSnapshotDiff(ozoneManager.getConfiguration());
     this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ,
-        ozoneManager.getConfiguration());
+        ozoneManager.getConfiguration(), snapDiffJobCf, snapDiffReportCf,
+        columnFamilyOptions);
 
     // size of lru cache
     int cacheSize = ozoneManager.getConfiguration().getInt(
@@ -154,6 +228,7 @@ public final class OmSnapshotManager implements AutoCloseable {
                 notification.getKey(), e);
           }
         };
+
     // init LRU cache
     snapshotCache = CacheBuilder.newBuilder()
         .maximumSize(cacheSize)
@@ -292,7 +367,7 @@ public final class OmSnapshotManager implements AutoCloseable {
 
   // Get OmSnapshot if the keyname has ".snapshot" key indicator
   public IOmMetadataReader checkForSnapshot(String volumeName,
-                                     String bucketName, String keyname)
+                                            String bucketName, String keyname)
       throws IOException {
     if (keyname == null) {
       return ozoneManager.getOmMetadataReader();
@@ -353,27 +428,31 @@ public final class OmSnapshotManager implements AutoCloseable {
         (keyParts[0].compareTo(OM_SNAPSHOT_INDICATOR) == 0);
   }
 
-  // TODO: [SNAPSHOT] Will pass token and page size to snapshotDiffManager in
-  //  HDDS-7548
   public SnapshotDiffResponse getSnapshotDiffReport(final String volume,
                                                     final String bucket,
                                                     final String fromSnapshot,
                                                     final String toSnapshot,
                                                     final String token,
-                                                    final int pageSize)
+                                                    int pageSize)
       throws IOException {
     // Validate fromSnapshot and toSnapshot
     final SnapshotInfo fsInfo = getSnapshotInfo(volume, bucket, fromSnapshot);
     final SnapshotInfo tsInfo = getSnapshotInfo(volume, bucket, toSnapshot);
     verifySnapshotInfoForSnapDiff(fsInfo, tsInfo);
 
+    int index = getIndexFromToken(token);
+    if (pageSize <= 0 || pageSize > maxPageSize) {
+      pageSize = maxPageSize;
+    }
+
     final String fsKey = SnapshotInfo.getTableKey(volume, bucket, fromSnapshot);
     final String tsKey = SnapshotInfo.getTableKey(volume, bucket, toSnapshot);
     try {
       final OmSnapshot fs = snapshotCache.get(fsKey);
       final OmSnapshot ts = snapshotCache.get(tsKey);
-      SnapshotDiffReport snapshotDiffReport = snapshotDiffManager
-          .getSnapshotDiffReport(volume, bucket, fs, ts, fsInfo, tsInfo);
+      SnapshotDiffReport snapshotDiffReport =
+          snapshotDiffManager.getSnapshotDiffReport(volume, bucket, fs, ts,
+              fsInfo, tsInfo, index, pageSize);
       return new SnapshotDiffResponse(snapshotDiffReport, DONE, 0L);
     } catch (ExecutionException | RocksDBException e) {
       throw new IOException(e.getCause());
@@ -385,7 +464,7 @@ public final class OmSnapshotManager implements AutoCloseable {
       throws IOException {
     if ((fromSnapshot.getSnapshotStatus() != SnapshotStatus.SNAPSHOT_ACTIVE) ||
         (toSnapshot.getSnapshotStatus() != SnapshotStatus.SNAPSHOT_ACTIVE)) {
-      // TODO: throw custom snapshot exception.
+      // TODO: [SNAPSHOT] Throw custom snapshot exception.
       throw new IOException("Cannot generate snapshot diff for non-active " +
           "snapshots.");
     }
@@ -395,29 +474,146 @@ public final class OmSnapshotManager implements AutoCloseable {
     }
   }
 
-  private ManagedRocksDB createDbForSnapshotDiff(OzoneConfiguration config) {
-    final ManagedOptions managedOptions = new ManagedOptions();
-    managedOptions.setCreateIfMissing(true);
+  private int getIndexFromToken(final String token) throws IOException {
+    if (isBlank(token)) {
+      return 0;
+    }
 
-    final File dbDirPath =
-        ServerUtils.getDBPath(config, OZONE_OM_SNAPSHOT_DIFF_DB_DIR);
+    // Validate that the token passed in the request is a valid integer for
+    // now. Later we can change it if we migrate to encrypted or cursor tokens.
+    try {
+      int index = Integer.parseInt(token);
+      if (index < 0) {
+        throw new IOException("Passed token is invalid. Resend the request " +
+            "with valid token returned in previous request.");
+      }
+      return index;
+    } catch (NumberFormatException exception) {
+      throw new IOException("Passed token is invalid. " +
+          "Resend the request with valid token returned in previous request.");
+    }
+  }
 
-    String dbPath = Paths.get(dbDirPath.toString(), OM_SNAPSHOT_DIFF_DB_NAME)
-        .toFile()
-        .getAbsolutePath();
+  private ManagedRocksDB createRocksDbForSnapshotDiff(
+      final ManagedDBOptions dbOptions, String dbPath,
+      final List<ColumnFamilyDescriptor> familyDescriptors,
+      final List<ColumnFamilyHandle> familyHandles
+  ) {
+    try {
+      return ManagedRocksDB.open(dbOptions,
+          dbPath,
+          familyDescriptors,
+          familyHandles);
+    } catch (RocksDBException exception) {
+      // TODO: [SNAPSHOT] Fail gracefully.
+      throw new RuntimeException(exception);
+    }
+  }
+
+  private String getDbPath(final OzoneConfiguration config) {
+    File dbDirPath = ServerUtils.getDBPath(config,
+        OZONE_OM_SNAPSHOT_DIFF_DB_DIR);
+    return Paths.get(dbDirPath.toString(), OM_SNAPSHOT_DIFF_DB_NAME)
+        .toFile().getAbsolutePath();
+  }
+
+  private List<ColumnFamilyDescriptor> getExistingColumnFamilyDescriptors(
+      final String path) {
+    try {
+      return RocksDatabase.listColumnFamiliesEmptyOptions(path)
+          .stream()
+          .map(columnFamilyName ->
+              new ColumnFamilyDescriptor(columnFamilyName,
+                  new ManagedColumnFamilyOptions(columnFamilyOptions)
+              ))
+          .collect(Collectors.toList());
+    } catch (RocksDBException exception) {
+      // TODO: [SNAPSHOT] Fail gracefully.
+      throw new RuntimeException(exception);
+    }
+  }
+
+  /**
+   * Returns the column family from the column family list if it already
+   * exists; otherwise creates a new column family.
+   * This is for backward and forward compatibility.
+   * For backward compatibility, when the column family doesn't exist, it
+   * will create a new one and return that.
+   * For forward compatibility, it will return the existing one.
+   */
+  private ColumnFamilyHandle getOrCreateColumnFamily(
+      final String columnFamilyName,
+      final List<ColumnFamilyDescriptor> familyDescriptors,
+      final List<ColumnFamilyHandle> familyHandles) {
+
+    for (int i = 0; i < familyDescriptors.size(); i++) {
+      String cfName = StringUtils.bytes2String(familyDescriptors.get(i)
+          .getName());
+      if (columnFamilyName.equals(cfName)) {
+        return familyHandles.get(i);
+      }
+    }
 
     try {
-      return ManagedRocksDB.open(managedOptions, dbPath);
+      ColumnFamilyDescriptor columnFamilyDescriptor =
+          new ColumnFamilyDescriptor(StringUtils.string2Bytes(columnFamilyName),
+              columnFamilyOptions);
+      ColumnFamilyHandle columnFamily = snapshotDiffDb.get()
+          .createColumnFamily(columnFamilyDescriptor);
+
+      // Add column family and descriptor so that they can be closed if needed.
+      familyHandles.add(columnFamily);
+      familyDescriptors.add(columnFamilyDescriptor);
+      return columnFamily;
     } catch (RocksDBException exception) {
-      // TODO: Fail gracefully.
+      // TODO: [SNAPSHOT] Fail gracefully.
       throw new RuntimeException(exception);
     }
   }
 
+  private void closeRocksDbObjects(
+      final ManagedDBOptions managedDBOptions,
+      final ManagedColumnFamilyOptions managedColumnFamilyOptions,
+      final List<ColumnFamilyDescriptor> columnFamilyDescriptors,
+      final List<ColumnFamilyHandle> columnFamilyHandles,
+      final ManagedRocksDB managedRocksDB) {
+
+    if (columnFamilyHandles != null) {
+      columnFamilyHandles.forEach(ColumnFamilyHandle::close);
+    }
+    if (managedRocksDB != null) {
+      managedRocksDB.close();
+    }
+    if (columnFamilyDescriptors != null) {
+      columnFamilyDescriptors.forEach(columnFamilyDescriptor ->
+          closeColumnFamilyOptions((ManagedColumnFamilyOptions)
+              columnFamilyDescriptor.getOptions()));
+    }
+    if (managedColumnFamilyOptions != null) {
+      closeColumnFamilyOptions(managedColumnFamilyOptions);
+    }
+    if (managedDBOptions != null) {
+      managedDBOptions.close();
+    }
+  }
+
+  private void closeColumnFamilyOptions(
+      final ManagedColumnFamilyOptions managedColumnFamilyOptions) {
+    Preconditions.checkArgument(!managedColumnFamilyOptions.isReused());
+    ManagedColumnFamilyOptions.closeDeeply(managedColumnFamilyOptions);
+  }
+
   @Override
   public void close() {
+    if (columnFamilyOptions != null) {
+      closeColumnFamilyOptions(columnFamilyOptions);
+    }
+
     if (snapshotDiffDb != null) {
       snapshotDiffDb.close();
     }
+    if (options != null) {
+      options.close();
+    }
   }
 }
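
The constructor above follows a reusable RocksDB pattern: list whatever
column families already exist on disk, open the DB with all of them (RocksDB
requires every existing column family, including the default one, to be
named at open time), then create any missing table on demand. A
self-contained sketch of the same pattern against the raw RocksJava API
rather than Ozone's managed wrappers (paths and table names illustrative):

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class OpenWithExistingCfsSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    String path = "/tmp/snap-diff-db";  // illustrative path

    // 1. Discover column families already present on disk. A brand-new DB
    //    has nothing to list, so fall back to an empty list.
    List<byte[]> existing;
    try (Options listOptions = new Options()) {
      existing = RocksDB.listColumnFamilies(listOptions, path);
    } catch (RocksDBException e) {
      existing = Collections.emptyList();
    }
    List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
    descriptors.add(new ColumnFamilyDescriptor(
        RocksDB.DEFAULT_COLUMN_FAMILY, new ColumnFamilyOptions()));
    for (byte[] name : existing) {
      if (!new String(name, StandardCharsets.UTF_8).equals("default")) {
        descriptors.add(
            new ColumnFamilyDescriptor(name, new ColumnFamilyOptions()));
      }
    }

    // 2. Open with every known CF; handles come back in descriptor order.
    List<ColumnFamilyHandle> handles = new ArrayList<>();
    try (DBOptions dbOptions = new DBOptions().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(dbOptions, path, descriptors, handles)) {
      // 3. "Get or create" a named table, as getOrCreateColumnFamily does.
      ColumnFamilyHandle jobTable =
          getOrCreate(db, descriptors, handles, "snap-diff-job-table");
      // ... use jobTable ...
    }
  }

  private static ColumnFamilyHandle getOrCreate(RocksDB db,
      List<ColumnFamilyDescriptor> descriptors,
      List<ColumnFamilyHandle> handles,
      String name) throws RocksDBException {
    for (int i = 0; i < descriptors.size(); i++) {
      String cfName =
          new String(descriptors.get(i).getName(), StandardCharsets.UTF_8);
      if (name.equals(cfName)) {
        return handles.get(i);
      }
    }
    return db.createColumnFamily(new ColumnFamilyDescriptor(
        name.getBytes(StandardCharsets.UTF_8), new ColumnFamilyOptions()));
  }
}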
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentList.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentList.java
index 56e544a758..1279697ec6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentList.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentList.java
@@ -32,6 +32,4 @@ public interface PersistentList<E> {
   E get(int index);
 
   Iterator<E> iterator();
-
-  Iterator<E> iterator(int index);
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentMap.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentMap.java
index 11ea43224a..a2cf8a924b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentMap.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/PersistentMap.java
@@ -26,4 +26,6 @@ public interface PersistentMap<K, V> {
   V get(K key);
 
   void put(K key, V value);
+
+  void remove(K key);
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentList.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentList.java
index e7b9f4954c..4b842d4343 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentList.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentList.java
@@ -56,7 +56,7 @@ public class RocksDbPersistentList<E> implements PersistentList<E> {
       db.get().put(columnFamilyHandle, rawKey, rawValue);
       return true;
     } catch (IOException | RocksDBException exception) {
-      // TODO:: Fail gracefully.
+      // TODO: [SNAPSHOT] Fail gracefully.
       throw new RuntimeException(exception);
     }
   }
@@ -74,7 +74,7 @@ public class RocksDbPersistentList<E> implements PersistentList<E> {
       byte[] rawValue = db.get().get(columnFamilyHandle, rawKey);
       return codecRegistry.asObject(rawValue, entryType);
     } catch (IOException | RocksDBException exception) {
-      // TODO:: Fail gracefully.
+      // TODO: [SNAPSHOT] Fail gracefully.
       throw new RuntimeException(exception);
     }
   }
@@ -98,41 +98,7 @@ public class RocksDbPersistentList<E> implements PersistentList<E> {
         try {
           return codecRegistry.asObject(rawKey, entryType);
         } catch (IOException exception) {
-          // TODO:: Fail gracefully.
-          throw new RuntimeException(exception);
-        }
-      }
-    };
-  }
-
-  @Override
-  public Iterator<E> iterator(int index) {
-    byte[] target;
-    try {
-      target = codecRegistry.asRawData(index);
-    } catch (IOException e) {
-      // TODO:: Fail gracefully.
-      throw new RuntimeException(e);
-    }
-
-    ManagedRocksIterator managedRocksIterator =
-        new ManagedRocksIterator(db.get().newIterator(columnFamilyHandle));
-    managedRocksIterator.get().seek(target);
-
-    return new Iterator<E>() {
-      @Override
-      public boolean hasNext() {
-        return managedRocksIterator.get().isValid();
-      }
-
-      @Override
-      public E next() {
-        byte[] rawKey = managedRocksIterator.get().value();
-        managedRocksIterator.get().next();
-        try {
-          return codecRegistry.asObject(rawKey, entryType);
-        } catch (IOException exception) {
-          // TODO:: Fail gracefully.
+          // TODO: [SNAPSHOT] Fail gracefully.
           throw new RuntimeException(exception);
         }
       }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentMap.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentMap.java
index 9130411737..6f4cd1fba0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentMap.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentMap.java
@@ -53,7 +53,7 @@ public class RocksDbPersistentMap<K, V> implements PersistentMap<K, V> {
       byte[] rawValue = db.get().get(columnFamilyHandle, rawKey);
       return codecRegistry.asObject(rawValue, valueType);
     } catch (IOException | RocksDBException exception) {
-      // TODO:: Fail gracefully.
+      // TODO: [SNAPSHOT] Fail gracefully.
       throw new RuntimeException(exception);
     }
   }
@@ -65,7 +65,18 @@ public class RocksDbPersistentMap<K, V> implements PersistentMap<K, V> {
       byte[] rawValue = codecRegistry.asRawData(value);
       db.get().put(columnFamilyHandle, rawKey, rawValue);
     } catch (IOException | RocksDBException exception) {
-      // TODO:: Fail gracefully.
+      // TODO: [SNAPSHOT] Fail gracefully.
+      throw new RuntimeException(exception);
+    }
+  }
+
+  @Override
+  public void remove(K key) {
+    try {
+      byte[] rawKey = codecRegistry.asRawData(key);
+      db.get().delete(columnFamilyHandle, rawKey);
+    } catch (IOException | RocksDBException exception) {
+      // TODO: [SNAPSHOT] Fail gracefully.
       throw new RuntimeException(exception);
     }
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentSet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentSet.java
index 40336abfdc..7f1b538660 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentSet.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/RocksDbPersistentSet.java
@@ -54,7 +54,7 @@ public class RocksDbPersistentSet<E> implements PersistentSet<E> {
       byte[] rawValue = codecRegistry.asRawData(emptyByteArray);
       db.get().put(columnFamilyHandle, rawKey, rawValue);
     } catch (IOException | RocksDBException exception) {
-      // TODO:: Fail gracefully.
+      // TODO: [SNAPSHOT] Fail gracefully.
       throw new RuntimeException(exception);
     }
   }
@@ -78,7 +78,7 @@ public class RocksDbPersistentSet<E> implements PersistentSet<E> {
         try {
           return codecRegistry.asObject(rawKey, entryType);
         } catch (IOException exception) {
-          // TODO:: Fail gracefully.
+          // TODO: [SNAPSHOT] Fail gracefully.
           throw new RuntimeException(exception);
         }
       }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 58006e108f..f2a47b9916 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -18,8 +18,20 @@
 
 package org.apache.hadoop.ozone.om.snapshot;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.utils.db.CodecRegistry;
 import org.apache.hadoop.hdds.utils.db.IntegerCodec;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -37,9 +49,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.WithObjectID;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffReport;
-import org.apache.hadoop.ozone.snapshot.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffReport.DiffReportEntry;
-
+import org.apache.hadoop.ozone.snapshot.SnapshotDiffReport.DiffType;
 import org.apache.ozone.rocksdb.util.ManagedSstFileReader;
 import org.apache.ozone.rocksdb.util.RdbUtil;
 import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
@@ -52,46 +63,75 @@ import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Stream;
-
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 
 /**
  * Class to generate snapshot diff.
  */
 public class SnapshotDiffManager {
-
   private static final Logger LOG =
           LoggerFactory.getLogger(SnapshotDiffManager.class);
+  private static final String DELIMITER = "-";
+  private static final String FROM_SNAP_TABLE_SUFFIX = "-from-snap";
+  private static final String TO_SNAP_TABLE_SUFFIX = "-to-snap";
+  private static final String UNIQUE_IDS_TABLE_SUFFIX = "-unique-ids";
+  private static final String DELETE_DIFF_TABLE_SUFFIX = "-delete-diff";
+  private static final String RENAME_DIFF_TABLE_SUFFIX = "-rename-diff";
+  private static final String CREATE_DIFF_TABLE_SUFFIX = "-create-diff";
+  private static final String MODIFY_DIFF_TABLE_SUFFIX = "-modify-diff";
+
   private final RocksDBCheckpointDiffer differ;
   private final ManagedRocksDB db;
   private final CodecRegistry codecRegistry;
+  private final ManagedColumnFamilyOptions familyOptions;
+
+  /**
+   * Global table to keep the diff report. Each key is prefixed by the jobID
+   * to improve lookup and cleanup.
+   * Note that byte arrays are used to reduce unnecessary serialization and
+   * deserialization during intermediate steps.
+   */
+  private final PersistentMap<byte[], byte[]> snapDiffReportTable;
+
+  /**
+   * Contains all the snap diff jobs which are either queued, in_progress or
+   * done. This table is used to make sure that there is only a single job
+   * for a similar type of request at any point in time.
+   */
+  private final PersistentMap<String, String> snapDiffJobTable;
 
-  private OzoneConfiguration configuration;
+  private final OzoneConfiguration configuration;
 
 
   public SnapshotDiffManager(ManagedRocksDB db,
-      RocksDBCheckpointDiffer differ,
-      OzoneConfiguration conf) {
+                             RocksDBCheckpointDiffer differ,
+                             OzoneConfiguration configuration,
+                             ColumnFamilyHandle snapDiffJobCfh,
+                             ColumnFamilyHandle snapDiffReportCfh,
+                             ManagedColumnFamilyOptions familyOptions) {
     this.db = db;
     this.differ = differ;
-    this.configuration = conf;
+    this.configuration = configuration;
+    this.familyOptions = familyOptions;
     this.codecRegistry = new CodecRegistry();
 
     // Integers are used for indexing persistent list.
-    this.codecRegistry.addCodec(Integer.class,
-        new IntegerCodec());
-    // Need for Diff Report
+    this.codecRegistry.addCodec(Integer.class, new IntegerCodec());
+    // DiffReportEntry codec for Diff Report.
     this.codecRegistry.addCodec(DiffReportEntry.class,
         new OmDBDiffReportEntryCodec());
+
+    this.snapDiffJobTable = new RocksDbPersistentMap<>(db,
+        snapDiffJobCfh,
+        codecRegistry,
+        String.class,
+        String.class);
+
+    this.snapDiffReportTable = new RocksDbPersistentMap<>(db,
+        snapDiffReportCfh,
+        codecRegistry,
+        byte[].class,
+        byte[].class);
   }
 
   private Map<String, String> getTablePrefixes(
@@ -131,45 +171,93 @@ public class SnapshotDiffManager {
         getTablePrefixes(snapshotOMMM, volumeName, bucketName));
   }
 
-  @SuppressWarnings("checkstyle:methodlength")
+  @SuppressWarnings("parameternumber")
   public SnapshotDiffReport getSnapshotDiffReport(final String volume,
                                                   final String bucket,
                                                   final OmSnapshot fromSnapshot,
                                                   final OmSnapshot toSnapshot,
                                                   final SnapshotInfo fsInfo,
-                                                  final SnapshotInfo tsInfo)
+                                                  final SnapshotInfo tsInfo,
+                                                  final int index,
+                                                  final int pageSize)
       throws IOException, RocksDBException {
+    String diffJobKey = fsInfo.getSnapshotID() + DELIMITER +
+        tsInfo.getSnapshotID();
+
+    Pair<String, Boolean> jobIdToJobExist = getOrCreateJobId(diffJobKey);
+    String jobId = jobIdToJobExist.getLeft();
+    boolean jobExist = jobIdToJobExist.getRight();
+
+    // If snapshot diff doesn't exist, we generate the diff report first
+    // and add it to the table for future requests.
+    // This needs to be updated to a queueing and job-status based model.
+    if (!jobExist) {
+      generateSnapshotDiffReport(jobId, volume, bucket, fromSnapshot,
+          toSnapshot, fsInfo, tsInfo);
+    }
 
-    final BucketLayout bucketLayout = getBucketLayout(volume, bucket,
-        fromSnapshot.getMetadataManager());
+    List<DiffReportEntry> diffReportList = new ArrayList<>();
 
-    // TODO: This should comes from request itself.
-    String requestId = UUID.randomUUID().toString();
+    boolean hasMoreEntries = true;
 
+    for (int idx = index; idx - index < pageSize; idx++) {
+      byte[] rawKey = codecRegistry.asRawData(jobId + DELIMITER + idx);
+      byte[] bytes = snapDiffReportTable.get(rawKey);
+      if (bytes == null) {
+        hasMoreEntries = false;
+        break;
+      }
+      diffReportList.add(codecRegistry.asObject(bytes, DiffReportEntry.class));
+    }
+
+    String tokenString = hasMoreEntries ?
+        String.valueOf(index + pageSize) : null;
+
+    return new SnapshotDiffReport(volume, bucket, fromSnapshot.getName(),
+        toSnapshot.getName(), diffReportList, tokenString);
+  }
+
+  /**
+   * Returns the jobId from the table if it exists; otherwise creates a new
+   * one, adds it to the table and returns that.
+   */
+  private synchronized Pair<String, Boolean> getOrCreateJobId(
+      String diffJobKey) {
+    String jobId = snapDiffJobTable.get(diffJobKey);
+
+    if (jobId != null) {
+      return Pair.of(jobId, true);
+    } else {
+      jobId = UUID.randomUUID().toString();
+      snapDiffJobTable.put(diffJobKey, jobId);
+      return Pair.of(jobId, false);
+    }
+  }
+
+  @SuppressWarnings("parameternumber")
+  private void generateSnapshotDiffReport(final String jobId,
+                                          final String volume,
+                                          final String bucket,
+                                          final OmSnapshot fromSnapshot,
+                                          final OmSnapshot toSnapshot,
+                                          final SnapshotInfo fsInfo,
+                                          final SnapshotInfo tsInfo)
+      throws RocksDBException {
     ColumnFamilyHandle fromSnapshotColumnFamily = null;
     ColumnFamilyHandle toSnapshotColumnFamily = null;
     ColumnFamilyHandle objectIDsColumnFamily = null;
-    ColumnFamilyHandle diffReportColumnFamily = null;
 
     try {
-      // RequestId is prepended to column family name to make it unique
+      // JobId is prepended to the column family names to make them unique
+      // for request.
+      fromSnapshotColumnFamily =
+          createColumnFamily(jobId + FROM_SNAP_TABLE_SUFFIX);
+      toSnapshotColumnFamily =
+          createColumnFamily(jobId + TO_SNAP_TABLE_SUFFIX);
+      objectIDsColumnFamily =
+          createColumnFamily(jobId + UNIQUE_IDS_TABLE_SUFFIX);
+      // ReportId is prepended to the column family names to make them unique
       // for request.
-      fromSnapshotColumnFamily = db.get().createColumnFamily(
-          new ColumnFamilyDescriptor(
-              codecRegistry.asRawData(requestId + "-fromSnapshot"),
-              new ManagedColumnFamilyOptions()));
-      toSnapshotColumnFamily = db.get().createColumnFamily(
-          new ColumnFamilyDescriptor(
-              codecRegistry.asRawData(requestId + "-toSnapshot"),
-              new ManagedColumnFamilyOptions()));
-      objectIDsColumnFamily = db.get().createColumnFamily(
-          new ColumnFamilyDescriptor(
-              codecRegistry.asRawData(requestId + "-objectIDs"),
-              new ManagedColumnFamilyOptions()));
-      diffReportColumnFamily = db.get().createColumnFamily(
-          new ColumnFamilyDescriptor(
-              codecRegistry.asRawData(requestId + "-diffReport"),
-              new ManagedColumnFamilyOptions()));
 
       // ObjectId to keyName map to keep key info for fromSnapshot.
       // objectIdToKeyNameMap is used to identify what keys were touched
@@ -177,34 +265,32 @@ public class SnapshotDiffManager {
       // creation, deletion, modify or rename.
       // Stores only keyName instead of OmKeyInfo to reduce the memory
       // footprint.
-      final PersistentMap<Long, String> objectIdToKeyNameMapForFromSnapshot =
+      // Note: Store objectId and keyName as byte arrays to reduce unnecessary
+      // serialization and deserialization.
+      final PersistentMap<byte[], byte[]> objectIdToKeyNameMapForFromSnapshot =
           new RocksDbPersistentMap<>(db,
               fromSnapshotColumnFamily,
               codecRegistry,
-              Long.class,
-              String.class);
+              byte[].class,
+              byte[].class);
 
       // ObjectId to keyName map to keep key info for toSnapshot.
-      final PersistentMap<Long, String> objectIdToKeyNameMapForToSnapshot =
+      final PersistentMap<byte[], byte[]> objectIdToKeyNameMapForToSnapshot =
           new RocksDbPersistentMap<>(db,
               toSnapshotColumnFamily,
               codecRegistry,
-              Long.class,
-              String.class);
+              byte[].class,
+              byte[].class);
 
       // Set of unique objectId between fromSnapshot and toSnapshot.
-      final PersistentSet<Long> objectIDsToCheckMap =
+      final PersistentSet<byte[]> objectIDsToCheckMap =
           new RocksDbPersistentSet<>(db,
               objectIDsColumnFamily,
               codecRegistry,
-              Long.class);
+              byte[].class);
 
-      // Final diff report.
-      final PersistentList<DiffReportEntry> diffReport =
-          new RocksDbPersistentList<>(db,
-              diffReportColumnFamily,
-              codecRegistry,
-              DiffReportEntry.class);
+      final BucketLayout bucketLayout = getBucketLayout(volume, bucket,
+          fromSnapshot.getMetadataManager());
 
       final Table<String, OmKeyInfo> fsKeyTable =
           fromSnapshot.getMetadataManager().getKeyTable(bucketLayout);
@@ -232,7 +318,6 @@ public class SnapshotDiffManager {
           tablePrefixes);
 
       if (bucketLayout.isFileSystemOptimized()) {
-        // add to object ID map for directory.
         final Table<String, OmDirectoryInfo> fsDirTable =
             fromSnapshot.getMetadataManager().getDirectoryTable();
         final Table<String, OmDirectoryInfo> tsDirTable =
@@ -250,23 +335,13 @@ public class SnapshotDiffManager {
             tablePrefixes);
       }
 
-      generateDiffReport(requestId,
+      generateDiffReport(jobId,
           objectIDsToCheckMap,
           objectIdToKeyNameMapForFromSnapshot,
-          objectIdToKeyNameMapForToSnapshot,
-          diffReport);
-
-      // TODO: Need to change it to pagination.
-      //  https://issues.apache.org/jira/browse/HDDS-7548
-      List<DiffReportEntry> diffReportList = new ArrayList<>();
-      diffReport.iterator().forEachRemaining(diffReportList::add);
-
-      return new SnapshotDiffReport(volume,
-          bucket,
-          fromSnapshot.getName(),
-          toSnapshot.getName(),
-          diffReportList);
-
+          objectIdToKeyNameMapForToSnapshot);
+    } catch (IOException | RocksDBException exception) {
+      // TODO: [SNAPSHOT] Fail gracefully.
+      throw new RuntimeException(exception);
     } finally {
       // Clean up: drop the intermediate column family and close them.
       if (fromSnapshotColumnFamily != null) {
@@ -281,31 +356,25 @@ public class SnapshotDiffManager {
         db.get().dropColumnFamily(objectIDsColumnFamily);
         objectIDsColumnFamily.close();
       }
-      // Drop diff report table for now. Otherwise, rocksDb creation will
-      // fail on OM restart because column family already exists.
-      // Will be fixed in pagination PR (Jira: HDDS-7548).
-      if (diffReportColumnFamily != null) {
-        db.get().dropColumnFamily(diffReportColumnFamily);
-        diffReportColumnFamily.close();
-      }
     }
   }
 
   private void addToObjectIdMap(Table<String, ? extends WithObjectID> fsTable,
                                 Table<String, ? extends WithObjectID> tsTable,
                                 Set<String> deltaFiles,
-                                PersistentMap<Long, String> oldObjIdToKeyMap,
-                                PersistentMap<Long, String> newObjIdToKeyMap,
-                                PersistentSet<Long> objectIDsToCheck,
+                                PersistentMap<byte[], byte[]> oldObjIdToKeyMap,
+                                PersistentMap<byte[], byte[]> newObjIdToKeyMap,
+                                PersistentSet<byte[]> objectIDsToCheck,
                                 Map<String, String> tablePrefixes)
       throws IOException {
 
-    boolean isDirectoryTable =
-        fsTable.getName().equals(OmMetadataManagerImpl.DIRECTORY_TABLE);
-
     if (deltaFiles.isEmpty()) {
       return;
     }
+
+    boolean isDirectoryTable =
+        fsTable.getName().equals(OmMetadataManagerImpl.DIRECTORY_TABLE);
+
     try (Stream<String> keysToCheck = new ManagedSstFileReader(deltaFiles)
             .getKeyStream()) {
       keysToCheck.forEach(key -> {
@@ -318,23 +387,26 @@ public class SnapshotDiffManager {
             return;
           }
           if (oldKey != null) {
-            final long oldObjId = oldKey.getObjectID();
-            oldObjIdToKeyMap.put(oldObjId,
-                    getKeyOrDirectoryName(isDirectoryTable, oldKey));
-            objectIDsToCheck.add(oldObjId);
+            byte[] rawObjId = codecRegistry.asRawData(oldKey.getObjectID());
+            byte[] rawValue = codecRegistry.asRawData(
+                getKeyOrDirectoryName(isDirectoryTable, oldKey));
+            oldObjIdToKeyMap.put(rawObjId, rawValue);
+            objectIDsToCheck.add(rawObjId);
           }
           if (newKey != null) {
-            final long newObjId = newKey.getObjectID();
-            newObjIdToKeyMap.put(newObjId,
-                    getKeyOrDirectoryName(isDirectoryTable, newKey));
-            objectIDsToCheck.add(newObjId);
+            byte[] rawObjId = codecRegistry.asRawData(newKey.getObjectID());
+            byte[] rawValue = codecRegistry.asRawData(
+                getKeyOrDirectoryName(isDirectoryTable, newKey));
+            newObjIdToKeyMap.put(rawObjId, rawValue);
+            objectIDsToCheck.add(rawObjId);
           }
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
       });
     } catch (RocksDBException rocksDBException) {
-      // TODO: Gracefully handle exception e.g. when input files do not exist
+      // TODO: [SNAPSHOT] Gracefully handle exception
+      //  e.g. when input files do not exist
       throw new RuntimeException(rocksDBException);
     }
   }
@@ -356,7 +428,7 @@ public class SnapshotDiffManager {
       SnapshotInfo fsInfo, SnapshotInfo tsInfo,
       boolean useFullDiff, Map<String, String> tablePrefixes)
       throws RocksDBException, IOException {
-    // TODO: Refactor the parameter list
+    // TODO: [SNAPSHOT] Refactor the parameter list
 
     final Set<String> deltaFiles = new HashSet<>();
 
@@ -375,8 +447,8 @@ public class SnapshotDiffManager {
           differ.getSSTDiffListWithFullPath(toDSI, fromDSI);
       deltaFiles.addAll(sstDiffList);
 
-      // TODO: Remove the workaround below when the SnapDiff logic can read
-      //  tombstones in SST files.
+      // TODO: [SNAPSHOT] Remove the workaround below when the SnapDiff logic
+      //  can read tombstones in SST files.
       // Workaround: Append "From DB" SST files to the deltaFiles list so that
       //  the current SnapDiff logic correctly handles deleted keys.
       if (!deltaFiles.isEmpty()) {
@@ -415,50 +487,43 @@ public class SnapshotDiffManager {
   }
 
   private void generateDiffReport(
-      final String requestId,
-      final PersistentSet<Long> objectIDsToCheck,
-      final PersistentMap<Long, String> oldObjIdToKeyMap,
-      final PersistentMap<Long, String> newObjIdToKeyMap,
-      final PersistentList<DiffReportEntry> diffReport
-  ) throws RocksDBException, IOException {
+      final String jobId,
+      final PersistentSet<byte[]> objectIDsToCheck,
+      final PersistentMap<byte[], byte[]> oldObjIdToKeyMap,
+      final PersistentMap<byte[], byte[]> newObjIdToKeyMap
+  ) throws RocksDBException {
 
     ColumnFamilyHandle deleteDiffColumnFamily = null;
     ColumnFamilyHandle renameDiffColumnFamily = null;
     ColumnFamilyHandle createDiffColumnFamily = null;
     ColumnFamilyHandle modifyDiffColumnFamily = null;
 
+    // JobId is prepended to the column family names to make them unique
+    // for the request.
     try {
-      // RequestId is prepended to column family name to make it unique
-      // for request.
-      deleteDiffColumnFamily = db.get().createColumnFamily(
-          new ColumnFamilyDescriptor(
-              codecRegistry.asRawData(requestId + "-deleteDiff"),
-              new ManagedColumnFamilyOptions()));
-      renameDiffColumnFamily = db.get().createColumnFamily(
-          new ColumnFamilyDescriptor(
-              codecRegistry.asRawData(requestId + "-renameDiff"),
-              new ManagedColumnFamilyOptions()));
-      createDiffColumnFamily = db.get().createColumnFamily(
-          new ColumnFamilyDescriptor(
-              codecRegistry.asRawData(requestId + "-createDiff"),
-              new ManagedColumnFamilyOptions()));
-      modifyDiffColumnFamily = db.get().createColumnFamily(
-          new ColumnFamilyDescriptor(
-              codecRegistry.asRawData(requestId + "-modifyDiff"),
-              new ManagedColumnFamilyOptions()));
-
-      final PersistentList<DiffReportEntry> deleteDiffs =
+      deleteDiffColumnFamily =
+          createColumnFamily(jobId + DELETE_DIFF_TABLE_SUFFIX);
+      renameDiffColumnFamily =
+          createColumnFamily(jobId + RENAME_DIFF_TABLE_SUFFIX);
+      createDiffColumnFamily =
+          createColumnFamily(jobId + CREATE_DIFF_TABLE_SUFFIX);
+      modifyDiffColumnFamily =
+          createColumnFamily(jobId + MODIFY_DIFF_TABLE_SUFFIX);
+
+      // Keep byte array instead of storing as DiffReportEntry to avoid
+      // unnecessary serialization and deserialization.
+      final PersistentList<byte[]> deleteDiffs =
           createDiffReportPersistentList(deleteDiffColumnFamily);
-      final PersistentList<DiffReportEntry> renameDiffs =
+      final PersistentList<byte[]> renameDiffs =
           createDiffReportPersistentList(renameDiffColumnFamily);
-      final PersistentList<DiffReportEntry> createDiffs =
+      final PersistentList<byte[]> createDiffs =
           createDiffReportPersistentList(createDiffColumnFamily);
-      final PersistentList<DiffReportEntry> modifyDiffs =
+      final PersistentList<byte[]> modifyDiffs =
           createDiffReportPersistentList(modifyDiffColumnFamily);
 
-      Iterator<Long> objectIdsIterator = objectIDsToCheck.iterator();
+      Iterator<byte[]> objectIdsIterator = objectIDsToCheck.iterator();
       while (objectIdsIterator.hasNext()) {
-        Long id = objectIdsIterator.next();
+        byte[] id = objectIdsIterator.next();
         /*
          * This key can be
          * -> Created after the old snapshot was taken, which means it will be
@@ -473,21 +538,29 @@ public class SnapshotDiffManager {
          *    different name and same Object ID.
          */
 
-        final String oldKeyName = oldObjIdToKeyMap.get(id);
-        final String newKeyName = newObjIdToKeyMap.get(id);
+        byte[] oldKeyName = oldObjIdToKeyMap.get(id);
+        byte[] newKeyName = newObjIdToKeyMap.get(id);
 
         if (oldKeyName == null && newKeyName == null) {
           // This cannot happen.
           throw new IllegalStateException("Old and new key name both are null");
         } else if (oldKeyName == null) { // Key Created.
-          createDiffs.add(DiffReportEntry.of(DiffType.CREATE, newKeyName));
+          String key = codecRegistry.asObject(newKeyName, String.class);
+          DiffReportEntry entry = DiffReportEntry.of(DiffType.CREATE, key);
+          createDiffs.add(codecRegistry.asRawData(entry));
         } else if (newKeyName == null) { // Key Deleted.
-          deleteDiffs.add(DiffReportEntry.of(DiffType.DELETE, oldKeyName));
-        } else if (oldKeyName.equals(newKeyName)) { // Key modified.
-          modifyDiffs.add(DiffReportEntry.of(DiffType.MODIFY, newKeyName));
+          String key = codecRegistry.asObject(oldKeyName, String.class);
+          DiffReportEntry entry = DiffReportEntry.of(DiffType.DELETE, key);
+          deleteDiffs.add(codecRegistry.asRawData(entry));
+        } else if (Arrays.equals(oldKeyName, newKeyName)) { // Key modified.
+          String key = codecRegistry.asObject(newKeyName, String.class);
+          DiffReportEntry entry = DiffReportEntry.of(DiffType.MODIFY, key);
+          modifyDiffs.add(codecRegistry.asRawData(entry));
         } else { // Key Renamed.
-          renameDiffs.add(
-              DiffReportEntry.of(DiffType.RENAME, oldKeyName, newKeyName));
+          String oldKey = codecRegistry.asObject(oldKeyName, String.class);
+          String newKey = codecRegistry.asObject(newKeyName, String.class);
+          renameDiffs.add(codecRegistry.asRawData(
+              DiffReportEntry.of(DiffType.RENAME, oldKey, newKey)));
         }
       }
 
@@ -527,10 +600,14 @@ public class SnapshotDiffManager {
        *
        */
 
-      diffReport.addAll(deleteDiffs);
-      diffReport.addAll(renameDiffs);
-      diffReport.addAll(createDiffs);
-      diffReport.addAll(modifyDiffs);
+      int index = 0;
+      index = addToReport(jobId, index, deleteDiffs);
+      index = addToReport(jobId, index, renameDiffs);
+      index = addToReport(jobId, index, createDiffs);
+      addToReport(jobId, index, modifyDiffs);
+    } catch (IOException e) {
+      // TODO: [SNAPSHOT] Fail gracefully.
+      throw new RuntimeException(e);
     } finally {
       if (deleteDiffColumnFamily != null) {
         db.get().dropColumnFamily(deleteDiffColumnFamily);
@@ -551,13 +628,35 @@ public class SnapshotDiffManager {
     }
   }
 
-  private PersistentList<DiffReportEntry> createDiffReportPersistentList(
+  private PersistentList<byte[]> createDiffReportPersistentList(
       ColumnFamilyHandle columnFamilyHandle
   ) {
     return new RocksDbPersistentList<>(db,
         columnFamilyHandle,
         codecRegistry,
-        DiffReportEntry.class);
+        byte[].class);
+  }
+
+  private ColumnFamilyHandle createColumnFamily(String columnFamilyName)
+      throws RocksDBException {
+    return db.get().createColumnFamily(
+        new ColumnFamilyDescriptor(
+            StringUtils.string2Bytes(columnFamilyName),
+            familyOptions));
+  }
+
+  private int addToReport(String jobId, int index,
+                          PersistentList<byte[]> diffReportEntries)
+      throws IOException {
+    Iterator<byte[]> diffReportIterator = diffReportEntries.iterator();
+    while (diffReportIterator.hasNext()) {
+
+      snapDiffReportTable.put(
+          codecRegistry.asRawData(jobId + DELIMITER + index),
+          diffReportIterator.next());
+      index++;
+    }
+    return index;
   }
 
   private BucketLayout getBucketLayout(final String volume,
@@ -578,7 +677,6 @@ public class SnapshotDiffManager {
     return false;
   }
 
-
   /**
    * check if the given key is in the bucket specified by tablePrefix map.
    */
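
To make the paging scheme concrete: report entries land in the
snap-diff-report table under keys of the form "<jobId>-<index>", so serving
a page is a run of point lookups starting at the token's index. A sketch
against the raw RocksJava API (the committed code routes keys and values
through CodecRegistry instead):

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class ReportPageReaderSketch {
  // Returns up to pageSize serialized DiffReportEntry values for jobId,
  // starting at startIndex. A short page means the report is exhausted
  // and no next token should be returned.
  static List<byte[]> readPage(RocksDB db, ColumnFamilyHandle reportCf,
      String jobId, int startIndex, int pageSize) throws RocksDBException {
    List<byte[]> page = new ArrayList<>(pageSize);
    for (int i = startIndex; i < startIndex + pageSize; i++) {
      byte[] value = db.get(reportCf,
          (jobId + "-" + i).getBytes(StandardCharsets.UTF_8));
      if (value == null) {
        break;  // no entry at this index: end of the report
      }
      page.add(value);
    }
    return page;
  }
}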
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentList.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentList.java
index 9674acdc10..a14c258238 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentList.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentList.java
@@ -19,100 +19,112 @@ package org.apache.hadoop.ozone.om.snapshot;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.utils.db.CodecRegistry;
 import org.apache.hadoop.hdds.utils.db.IntegerCodec;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 
+import static org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Test persistent list backed by RocksDB.
  */
 public class TestRocksDbPersistentList {
-  private ColumnFamilyHandle columnFamily;
-  private ManagedRocksDB db;
-  private File file;
-  private CodecRegistry codecRegistry;
-
-  @BeforeEach
-  public void init() throws RocksDBException, IOException {
-    ManagedOptions options = new ManagedOptions();
-    options.setCreateIfMissing(true);
+  private static File file;
+  private static ManagedRocksDB db;
+  private static ManagedDBOptions dbOptions;
+  private static ManagedColumnFamilyOptions columnFamilyOptions;
+
+  @BeforeAll
+  public static void staticInit() throws RocksDBException {
+    dbOptions = new ManagedDBOptions();
+    dbOptions.setCreateIfMissing(true);
+    columnFamilyOptions = new ManagedColumnFamilyOptions();
+
     file = new File("./test-persistent-list");
     if (!file.mkdirs() && !file.exists()) {
       throw new IllegalArgumentException("Unable to create directory " +
           file);
     }
 
-    db = ManagedRocksDB.open(options,
-        Paths.get(file.toString(), "rocks.db").toFile().getAbsolutePath());
+    String absolutePath = Paths.get(file.toString(), "rocks.db").toFile()
+        .getAbsolutePath();
 
-    codecRegistry = new CodecRegistry();
-    codecRegistry.addCodec(Integer.class, new IntegerCodec());
+    List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+        Collections.singletonList(new ColumnFamilyDescriptor(
+            StringUtils.string2Bytes(DEFAULT_COLUMN_FAMILY_NAME),
+            columnFamilyOptions));
 
-    columnFamily = db.get().createColumnFamily(
-        new ColumnFamilyDescriptor(
-            codecRegistry.asRawData("testList"),
-            new ManagedColumnFamilyOptions()));
-  }
+    List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
 
-  @AfterEach
-  public void teardown() throws RocksDBException {
-    deleteDirectory(file);
+    db = ManagedRocksDB.open(dbOptions, absolutePath, columnFamilyDescriptors,
+        columnFamilyHandles);
+  }
 
-    if (columnFamily != null && db != null) {
-      db.get().dropColumnFamily(columnFamily);
+  @AfterAll
+  public static void staticTearDown() {
+    if (dbOptions != null) {
+      dbOptions.close();
     }
-    if (columnFamily != null) {
-      columnFamily.close();
+    if (columnFamilyOptions != null) {
+      columnFamilyOptions.close();
     }
     if (db != null) {
       db.close();
     }
+
+    GenericTestUtils.deleteDirectory(file);
   }
 
-  @ValueSource(ints = {0, 3})
-  @ParameterizedTest
-  public void testRocksDBPersistentList(int index) {
-    PersistentList<String> persistentList = new RocksDbPersistentList<>(
-        db,
-        columnFamily,
-        codecRegistry,
-        String.class
-    );
+  @Test
+  public void testRocksDBPersistentList() throws IOException, RocksDBException {
+    ColumnFamilyHandle columnFamily = null;
 
-    List<String> testList = Arrays.asList("e1", "e2", "e3", "e1", "e2");
-    testList.forEach(persistentList::add);
+    try {
+      CodecRegistry codecRegistry = new CodecRegistry();
+      codecRegistry.addCodec(Integer.class, new IntegerCodec());
 
-    Iterator<String> iterator = index == 0 ? persistentList.iterator() :
-        persistentList.iterator(index);
+      columnFamily = db.get().createColumnFamily(new ColumnFamilyDescriptor(
+          codecRegistry.asRawData("testSet"), columnFamilyOptions));
 
-    while (iterator.hasNext()) {
-      assertEquals(iterator.next(), testList.get(index++));
-    }
-  }
 
-  private boolean deleteDirectory(File directoryToBeDeleted) {
-    File[] allContents = directoryToBeDeleted.listFiles();
-    if (allContents != null) {
-      for (File content : allContents) {
-        if (!deleteDirectory(content)) {
-          return false;
-        }
+      PersistentList<String> persistentList = new RocksDbPersistentList<>(
+          db,
+          columnFamily,
+          codecRegistry,
+          String.class
+      );
+
+      List<String> testList = Arrays.asList("e1", "e2", "e3", "e1", "e2");
+      testList.forEach(persistentList::add);
+
+      Iterator<String> iterator = persistentList.iterator();
+      int index = 0;
+
+      while (iterator.hasNext()) {
+        assertEquals(iterator.next(), testList.get(index++));
+      }
+      assertEquals(testList.size(), index);
+    } finally {
+      if (columnFamily != null) {
+        db.get().dropColumnFamily(columnFamily);
+        columnFamily.close();
       }
     }
-    return directoryToBeDeleted.delete();
   }
 }
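
[The rewritten test above exercises the core property of
RocksDbPersistentList: insertion order survives a round trip through RocksDB.
That works because the list keys every element with a monotonically
increasing counter, so iterating the column family in key order replays
insertion order (the test registers an IntegerCodec for exactly this key).
A minimal sketch of the idea, assuming String elements and a zero-padded
textual key; TinyPersistentList is purely illustrative, not the Ozone class:

    import java.nio.charset.StandardCharsets;
    import java.util.function.Consumer;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.RocksIterator;

    // Illustrative only: a String list on a dedicated column family.
    final class TinyPersistentList {
      private final RocksDB db;
      private final ColumnFamilyHandle cf;
      private long nextIndex; // insertion counter used as the key

      TinyPersistentList(RocksDB db, ColumnFamilyHandle cf) {
        this.db = db;
        this.cf = cf;
      }

      void add(String value) throws RocksDBException {
        // Zero-padded key keeps lexicographic order == insertion order.
        byte[] key = String.format("%019d", nextIndex++)
            .getBytes(StandardCharsets.UTF_8);
        db.put(cf, key, value.getBytes(StandardCharsets.UTF_8));
      }

      void forEach(Consumer<String> action) {
        try (RocksIterator it = db.newIterator(cf)) {
          for (it.seekToFirst(); it.isValid(); it.next()) {
            action.accept(new String(it.value(), StandardCharsets.UTF_8));
          }
        }
      }
    }]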
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentMap.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentMap.java
index 3cc3cfb421..849b8179d4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentMap.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentMap.java
@@ -19,37 +19,42 @@ package org.apache.hadoop.ozone.om.snapshot;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.utils.db.CodecRegistry;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.ozone.test.GenericTestUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 
+import static org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
  * Test persistent map backed by RocksDB.
  */
 public class TestRocksDbPersistentMap {
+  private static File file;
+  private static ManagedRocksDB db;
+  private static ManagedDBOptions dbOptions;
+  private static ManagedColumnFamilyOptions columnFamilyOptions;
 
-  private ColumnFamilyHandle columnFamily;
-  private ManagedRocksDB db;
-  private File file;
-  private CodecRegistry codecRegistry;
-
-  @BeforeEach
-  public void init() throws RocksDBException, IOException {
-    ManagedOptions options = new ManagedOptions();
-    options.setCreateIfMissing(true);
+  @BeforeAll
+  public static void staticInit() throws RocksDBException {
+    dbOptions = new ManagedDBOptions();
+    dbOptions.setCreateIfMissing(true);
+    columnFamilyOptions = new ManagedColumnFamilyOptions();
 
     file = new File("./test-persistent-map");
     if (!file.mkdirs() && !file.exists()) {
@@ -57,71 +62,73 @@ public class TestRocksDbPersistentMap {
           file);
     }
 
-    db = ManagedRocksDB.open(options,
-        Paths.get(file.toString(), "rocks.db").toFile().getAbsolutePath());
+    String absolutePath = Paths.get(file.toString(), "rocks.db").toFile()
+        .getAbsolutePath();
+
+    List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+        Collections.singletonList(new ColumnFamilyDescriptor(
+            StringUtils.string2Bytes(DEFAULT_COLUMN_FAMILY_NAME),
+            columnFamilyOptions));
 
-    codecRegistry = new CodecRegistry();
+    List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
 
-    columnFamily = db.get().createColumnFamily(
-        new ColumnFamilyDescriptor(
-            codecRegistry.asRawData("testList"),
-            new ManagedColumnFamilyOptions()));
+    db = ManagedRocksDB.open(dbOptions, absolutePath, columnFamilyDescriptors,
+        columnFamilyHandles);
   }
 
-  @AfterEach
-  public void clean() throws RocksDBException {
-    deleteDirectory(file);
-
-    if (columnFamily != null && db != null) {
-      db.get()
-          .dropColumnFamily(columnFamily);
+  @AfterAll
+  public static void staticTearDown() {
+    if (dbOptions != null) {
+      dbOptions.close();
     }
-    if (columnFamily != null) {
-      columnFamily.close();
+    if (columnFamilyOptions != null) {
+      columnFamilyOptions.close();
     }
     if (db != null) {
       db.close();
     }
+
+    GenericTestUtils.deleteDirectory(file);
   }
 
   @Test
-  public void testRocksDBPersistentMap() {
-    PersistentMap<String, String> persistentMap = new RocksDbPersistentMap<>(
-        db,
-        columnFamily,
-        codecRegistry,
-        String.class,
-        String.class
-    );
-
-    List<String> keys = Arrays.asList("Key1", "Key2", "Key3", "Key1", "Key2");
-    List<String> values =
-        Arrays.asList("value1", "value2", "Value3", "Value1", "Value2");
-
-    Map<String, String> expectedMap = new HashMap<>();
-
-    for (int i = 0; i < keys.size(); i++) {
-      String key = keys.get(i);
-      String value = values.get(i);
-
-      persistentMap.put(key, value);
-      expectedMap.put(key, value);
-    }
-
-    for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
-      assertEquals(entry.getValue(), persistentMap.get(entry.getKey()));
-    }
-  }
+  public void testRocksDBPersistentMap() throws IOException, RocksDBException {
+    ColumnFamilyHandle columnFamily = null;
+    try {
+      CodecRegistry codecRegistry = new CodecRegistry();
+      columnFamily = db.get().createColumnFamily(new ColumnFamilyDescriptor(
+          codecRegistry.asRawData("testMap"), columnFamilyOptions));
+
+      PersistentMap<String, String> persistentMap = new RocksDbPersistentMap<>(
+          db,
+          columnFamily,
+          codecRegistry,
+          String.class,
+          String.class
+      );
+
+      List<String> keys = Arrays.asList("Key1", "Key2", "Key3", "Key1", "Key2");
+      List<String> values =
+          Arrays.asList("value1", "value2", "Value3", "Value1", "Value2");
+
+      Map<String, String> expectedMap = new HashMap<>();
+
+      for (int i = 0; i < keys.size(); i++) {
+        String key = keys.get(i);
+        String value = values.get(i);
+
+        persistentMap.put(key, value);
+        expectedMap.put(key, value);
+      }
 
-  private boolean deleteDirectory(File directoryToBeDeleted) {
-    File[] allContents = directoryToBeDeleted.listFiles();
-    if (allContents != null) {
-      for (File content : allContents) {
-        if (!deleteDirectory(content)) {
-          return false;
-        }
+      for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
+        assertEquals(entry.getValue(), persistentMap.get(entry.getKey()));
+      }
+    } finally {
+      if (columnFamily != null) {
+        db.get().dropColumnFamily(columnFamily);
+        columnFamily.close();
       }
     }
-    return directoryToBeDeleted.delete();
   }
 }
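
[The map variant is the thinnest wrapper of the three: the CodecRegistry
turns typed keys and values into bytes and the column family scopes the
entries, so put/get land directly on RocksDB. A sketch of what the two
operations reduce to, assuming asRawData/asObject behave as their uses
elsewhere in this patch suggest; TinyPersistentMapOps is illustrative, not
the committed class:

    import java.io.IOException;
    import org.apache.hadoop.hdds.utils.db.CodecRegistry;
    import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDBException;

    // Illustrative reduction of a RocksDB-backed typed map's put/get.
    final class TinyPersistentMapOps {
      private TinyPersistentMapOps() { }

      static <K, V> void put(ManagedRocksDB db, ColumnFamilyHandle cf,
          CodecRegistry codecs, K key, V value)
          throws IOException, RocksDBException {
        // Key and value are serialized independently; the CF is the scope.
        db.get().put(cf, codecs.asRawData(key), codecs.asRawData(value));
      }

      static <K, V> V get(ManagedRocksDB db, ColumnFamilyHandle cf,
          CodecRegistry codecs, K key, Class<V> valueType)
          throws IOException, RocksDBException {
        byte[] raw = db.get().get(cf, codecs.asRawData(key));
        return raw == null ? null : codecs.asObject(raw, valueType);
      }
    }

This is also why the test's duplicate keys ("Key1", "Key2") end up with the
last value written: a second put simply overwrites the same raw key.]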
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentSet.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentSet.java
index 1e7162522a..fcf0e75eb8 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentSet.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestRocksDbPersistentSet.java
@@ -19,22 +19,27 @@ package org.apache.hadoop.ozone.om.snapshot;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import org.apache.hadoop.hdds.StringUtils;
 import org.apache.hadoop.hdds.utils.db.CodecRegistry;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
 import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
 
+import static org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 
@@ -42,102 +47,87 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
  * Test persistent set backed by RocksDB.
  */
 public class TestRocksDbPersistentSet {
-  private ColumnFamilyHandle columnFamily;
-  private ManagedRocksDB db;
-  private File file;
-  private CodecRegistry codecRegistry;
-
-  @BeforeEach
-  public void init() throws RocksDBException, IOException {
-    ManagedOptions options = new ManagedOptions();
-    options.setCreateIfMissing(true);
+  private static File file;
+  private static ManagedRocksDB db;
+  private static ManagedDBOptions dbOptions;
+  private static ManagedColumnFamilyOptions columnFamilyOptions;
+
+  @BeforeAll
+  public static void staticInit() throws RocksDBException {
+    dbOptions = new ManagedDBOptions();
+    dbOptions.setCreateIfMissing(true);
+    columnFamilyOptions = new ManagedColumnFamilyOptions();
+
     file = new File("./test-persistent-set");
     if (!file.mkdirs() && !file.exists()) {
       throw new IllegalArgumentException("Unable to create directory " +
           file);
     }
 
-    db = ManagedRocksDB.open(options,
-        Paths.get(file.toString(), "rocks.db").toFile().getAbsolutePath());
+    String absolutePath = Paths.get(file.toString(), "rocks.db").toFile()
+        .getAbsolutePath();
 
-    codecRegistry = new CodecRegistry();
+    List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+        Collections.singletonList(new ColumnFamilyDescriptor(
+            StringUtils.string2Bytes(DEFAULT_COLUMN_FAMILY_NAME),
+            columnFamilyOptions));
 
-    columnFamily = db.get().createColumnFamily(
-        new ColumnFamilyDescriptor(
-            codecRegistry.asRawData("testSet"),
-            new ManagedColumnFamilyOptions()));
-  }
+    List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+    db = ManagedRocksDB.open(dbOptions, absolutePath, columnFamilyDescriptors,
+        columnFamilyHandles);
+  }
-
-  @AfterEach
-  public void teardown() throws RocksDBException {
-    deleteDirectory(file);
 
-    if (columnFamily != null && db != null) {
-      db.get().dropColumnFamily(columnFamily);
+  @AfterAll
+  public static void staticTearDown() {
+    if (dbOptions != null) {
+      dbOptions.close();
     }
-    if (columnFamily != null) {
-      columnFamily.close();
+    if (columnFamilyOptions != null) {
+      columnFamilyOptions.close();
     }
     if (db != null) {
       db.close();
     }
-  }
-
-  @Test
-  public void testRocksDBPersistentSetForString() {
-    PersistentSet<String> persistentSet = new RocksDbPersistentSet<>(
-        db,
-        columnFamily,
-        codecRegistry,
-        String.class
-    );
 
-    List<String> testList = Arrays.asList("e1", "e1", "e2", "e2", "e3");
-    Set<String> testSet = new HashSet<>(testList);
-
-    testList.forEach(persistentSet::add);
-
-    Iterator<String> iterator = persistentSet.iterator();
-    Iterator<String> setIterator = testSet.iterator();
-
-    while (iterator.hasNext()) {
-      assertEquals(iterator.next(), setIterator.next());
-    }
-    assertFalse(setIterator.hasNext());
+    GenericTestUtils.deleteDirectory(file);
   }
 
   @Test
-  public void testRocksDBPersistentSetForLong() {
-    PersistentSet<Long> persistentSet = new RocksDbPersistentSet<>(
-        db,
-        columnFamily,
-        codecRegistry,
-        Long.class
-    );
+  public void testRocksDBPersistentSet() throws IOException, RocksDBException {
+    ColumnFamilyHandle columnFamily = null;
 
-    List<Long> testList = Arrays.asList(1L, 1L, 2L, 2L, 3L, 4L, 5L, 5L);
-    Set<Long> testSet = new HashSet<>(testList);
+    try {
+      CodecRegistry codecRegistry = new CodecRegistry();
+      columnFamily = db.get().createColumnFamily(
+          new ColumnFamilyDescriptor(
+              codecRegistry.asRawData("testSet"), columnFamilyOptions));
 
-    testList.forEach(persistentSet::add);
+      PersistentSet<String> persistentSet = new RocksDbPersistentSet<>(
+          db,
+          columnFamily,
+          codecRegistry,
+          String.class
+      );
 
-    Iterator<Long> iterator = persistentSet.iterator();
-    Iterator<Long> setIterator = testSet.iterator();
+      List<String> testList = Arrays.asList("e1", "e1", "e2", "e2", "e3");
+      Set<String> testSet = new HashSet<>(testList);
 
-    while (iterator.hasNext()) {
-      assertEquals(iterator.next(), setIterator.next());
-    }
-    assertFalse(setIterator.hasNext());
-  }
+      testList.forEach(persistentSet::add);
 
-  private boolean deleteDirectory(File directoryToBeDeleted) {
-    File[] allContents = directoryToBeDeleted.listFiles();
-    if (allContents != null) {
-      for (File content : allContents) {
-        if (!deleteDirectory(content)) {
-          return false;
-        }
+      Iterator<String> iterator = persistentSet.iterator();
+      Iterator<String> setIterator = testSet.iterator();
+
+      while (iterator.hasNext()) {
+        assertEquals(iterator.next(), setIterator.next());
+      }
+      assertFalse(setIterator.hasNext());
+    } finally {
+      if (columnFamily != null) {
+        db.get().dropColumnFamily(columnFamily);
+        columnFamily.close();
       }
     }
-    return directoryToBeDeleted.delete();
   }
 }
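
[The set wrapper stores each element as a key with an empty value: re-adding
"e1" just overwrites the same key, which is why the five adds above leave a
three-element set, and iteration comes back in RocksDB key order rather than
insertion order, so tests should compare contents rather than any particular
order. A sketch of the two core operations under that assumption;
TinyPersistentSetOps is illustrative, not the committed class:

    import java.nio.charset.StandardCharsets;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    // Illustrative only: set membership as key presence.
    final class TinyPersistentSetOps {
      private static final byte[] EMPTY = new byte[0];

      private TinyPersistentSetOps() { }

      static void add(RocksDB db, ColumnFamilyHandle cf, String element)
          throws RocksDBException {
        // Re-adding an existing element overwrites the same key,
        // so the set never holds duplicates.
        db.put(cf, element.getBytes(StandardCharsets.UTF_8), EMPTY);
      }

      static boolean contains(RocksDB db, ColumnFamilyHandle cf,
          String element) throws RocksDBException {
        return db.get(cf, element.getBytes(StandardCharsets.UTF_8)) != null;
      }
    }]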

