Posted to commits@ozone.apache.org by sa...@apache.org on 2023/04/25 13:16:39 UTC

[ozone] branch master updated: HDDS-7995. Support incremental snapshot on OM (#4294)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cc1d2b3984 HDDS-7995. Support incremental snapshot on OM (#4294)
cc1d2b3984 is described below

commit cc1d2b3984194d22027fc119c05329e1c9e73c5f
Author: NibiruXu <ax...@qq.com>
AuthorDate: Tue Apr 25 21:16:33 2023 +0800

    HDDS-7995. Support incremental snapshot on OM (#4294)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   6 +
 .../hadoop/hdds/utils/DBCheckpointMetrics.java     |  21 ++
 .../hadoop/hdds/utils/DBCheckpointServlet.java     |  49 +++-
 .../apache/hadoop/hdds/utils/FaultInjector.java    |  43 ++++
 .../java/org/apache/hadoop/hdds/utils/HAUtils.java |  49 +++-
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |  26 +-
 .../hadoop/hdds/utils/RDBSnapshotProvider.java     | 233 +++++++++++++++++
 .../hadoop/hdds/utils/TestRDBSnapshotProvider.java | 234 +++++++++++++++++
 .../apache/hadoop/hdds/utils/db/TestRDBStore.java  |  71 +++++-
 .../hdds/scm/ha/SCMDBCheckpointProvider.java       |   4 +-
 .../hadoop/ozone/om/helpers/OMNodeDetails.java     |  43 ++--
 .../hdds/scm/TestSCMDbCheckpointServlet.java       |   2 +-
 .../hadoop/ozone/om/TestOMDbCheckpointServlet.java |  50 +++-
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      | 277 ++++++++++++++++++++-
 .../snapshot/TestOzoneManagerSnapshotProvider.java |   2 +-
 .../hadoop/ozone/om/OMDBCheckpointServlet.java     |  22 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  49 ++--
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |  26 ++
 .../om/ratis_snapshot/OmRatisSnapshotProvider.java |  96 +++----
 19 files changed, 1161 insertions(+), 142 deletions(-)
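
For context, the patch below makes a follower advertise the SST files it
already holds via a new "toExcludeSST" HTTP parameter, and the leader then
omits those files from the checkpoint tarball. A minimal sketch of how such a
request URL could be assembled (plain JDK only; the host, port, endpoint path
and directory are placeholders, while the parameter names are the constants
added in this patch):

    import java.io.File;
    import java.util.StringJoiner;

    public class IncrementalCheckpointRequestSketch {
      public static void main(String[] args) {
        // SST files already present in the follower's candidate directory
        // (placeholder path).
        File candidateDir = new File("/var/lib/ozone/om/snapshot/om.db.candidate");
        String[] existing = candidateDir.list((d, n) -> n.endsWith(".sst"));

        StringJoiner query = new StringJoiner("&");
        query.add("flushBeforeCheckpoint=true");
        query.add("includeSnapshotData=true");
        if (existing != null) {
          for (String sst : existing) {
            // One toExcludeSST entry per file the follower already has.
            query.add("toExcludeSST=" + sst);
          }
        }
        // Placeholder leader address and assumed servlet path; the real
        // endpoint is the OM DB checkpoint servlet.
        System.out.println("http://om-leader:9874/dbCheckpoint?" + query);
      }
    }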

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 9ba770c214..99108fc766 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -143,6 +143,8 @@ public final class OzoneConsts {
       "flushBeforeCheckpoint";
   public static final String OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA =
       "includeSnapshotData";
+  public static final String OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST =
+      "toExcludeSST";
 
   public static final String RANGER_OZONE_SERVICE_VERSION_KEY =
       "#RANGEROZONESERVICEVERSION";
@@ -458,6 +460,10 @@ public final class OzoneConsts {
 
   public static final long DEFAULT_OM_UPDATE_ID = -1L;
 
+  // RocksDB snapshot
+  public static final String SNAPSHOT_CANDIDATE_DIR = ".candidate";
+  public static final String ROCKSDB_SST_SUFFIX = ".sst";
+
   // SCM default service Id and node Id in non-HA where config is not defined
   // in non-HA style.
   public static final String SCM_DUMMY_NODEID = "scmNodeId";
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointMetrics.java
index d3f3cd1d8f..52b6e1ef53 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointMetrics.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointMetrics.java
@@ -40,8 +40,11 @@ public class DBCheckpointMetrics {
   // Metrics to track checkpoint statistics from last run.
   private @Metric MutableGaugeLong lastCheckpointCreationTimeTaken;
   private @Metric MutableGaugeLong lastCheckpointStreamingTimeTaken;
+  private @Metric MutableGaugeLong lastCheckpointStreamingNumSSTExcluded;
+  // NOTE: numCheckpoints includes numIncrementalCheckpoints
   private @Metric MutableCounterLong numCheckpoints;
   private @Metric MutableCounterLong numCheckpointFails;
+  private @Metric MutableCounterLong numIncrementalCheckpoints;
 
   public DBCheckpointMetrics() {
   }
@@ -68,6 +71,10 @@ public class DBCheckpointMetrics {
     this.lastCheckpointStreamingTimeTaken.set(val);
   }
 
+  public void setLastCheckpointStreamingNumSSTExcluded(long val) {
+    this.lastCheckpointStreamingNumSSTExcluded.set(val);
+  }
+
   @VisibleForTesting
   public void incNumCheckpoints() {
     numCheckpoints.incr();
@@ -78,6 +85,11 @@ public class DBCheckpointMetrics {
     numCheckpointFails.incr();
   }
 
+  @VisibleForTesting
+  public void incNumIncrementalCheckpoint() {
+    numIncrementalCheckpoints.incr();
+  }
+
   @VisibleForTesting
   public long getLastCheckpointCreationTimeTaken() {
     return lastCheckpointCreationTimeTaken.value();
@@ -88,6 +100,11 @@ public class DBCheckpointMetrics {
     return numCheckpoints.value();
   }
 
+  @VisibleForTesting
+  public long getNumIncrementalCheckpoints() {
+    return numIncrementalCheckpoints.value();
+  }
+
   @VisibleForTesting
   public long getNumCheckpointFails() {
     return numCheckpointFails.value();
@@ -97,4 +114,8 @@ public class DBCheckpointMetrics {
   public long getLastCheckpointStreamingTimeTaken() {
     return lastCheckpointStreamingTimeTaken.value();
   }
+
+  public long getLastCheckpointStreamingNumSSTExcluded() {
+    return lastCheckpointStreamingNumSSTExcluded.value();
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
index 21b7c7cfde..291dd3b27d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
@@ -26,7 +26,12 @@ import java.io.OutputStream;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.server.OzoneAdmins;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -36,6 +41,8 @@ import org.apache.commons.lang3.StringUtils;
 
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.writeDBCheckpointToStream;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
 
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -135,12 +142,24 @@ public class DBCheckpointServlet extends HttpServlet {
 
     DBCheckpoint checkpoint = null;
     try {
-
       boolean flush = false;
       String flushParam =
           request.getParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH);
       if (StringUtils.isNotEmpty(flushParam)) {
-        flush = Boolean.valueOf(flushParam);
+        flush = Boolean.parseBoolean(flushParam);
+      }
+
+      List<String> receivedSstList = new ArrayList<>();
+      List<String> excludedSstList = new ArrayList<>();
+      String[] sstParam = request.getParameterValues(
+          OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST);
+      if (sstParam != null) {
+        receivedSstList.addAll(
+            Arrays.stream(sstParam)
+            .filter(s -> s.endsWith(ROCKSDB_SST_SUFFIX))
+            .distinct()
+            .collect(Collectors.toList()));
+        LOG.info("Received excluding SST {}", receivedSstList);
       }
 
       checkpoint = dbStore.getCheckpoint(flush);
@@ -164,12 +183,20 @@ public class DBCheckpointServlet extends HttpServlet {
 
       Instant start = Instant.now();
       writeDbDataToStream(checkpoint, request,
-          response.getOutputStream());
+          response.getOutputStream(), receivedSstList, excludedSstList);
       Instant end = Instant.now();
 
       long duration = Duration.between(start, end).toMillis();
       LOG.info("Time taken to write the checkpoint to response output " +
           "stream: {} milliseconds", duration);
+
+      LOG.info("Excluded SST {} from the latest checkpoint.",
+          excludedSstList);
+      if (!excludedSstList.isEmpty()) {
+        dbMetrics.incNumIncrementalCheckpoint();
+      }
+      dbMetrics.setLastCheckpointStreamingNumSSTExcluded(
+          excludedSstList.size());
       dbMetrics.setLastCheckpointStreamingTimeTaken(duration);
       dbMetrics.incNumCheckpoints();
     } catch (Exception e) {
@@ -196,12 +223,20 @@ public class DBCheckpointServlet extends HttpServlet {
    * @param ignoredRequest The httpRequest which generated this checkpoint.
    *        (Parameter is ignored in this class but used in child classes).
    * @param destination The stream to write to.
+   * @param toExcludeList the files to be excluded
+   * @param excludedList  the files that were actually excluded
+   *
    */
   public void writeDbDataToStream(DBCheckpoint checkpoint,
-                                  HttpServletRequest ignoredRequest,
-                                  OutputStream destination)
+      HttpServletRequest ignoredRequest,
+      OutputStream destination,
+      List<String> toExcludeList,
+      List<String> excludedList)
       throws IOException, InterruptedException {
-    writeDBCheckpointToStream(checkpoint, destination);
-  }
+    Objects.requireNonNull(toExcludeList);
+    Objects.requireNonNull(excludedList);
 
+    writeDBCheckpointToStream(checkpoint, destination,
+        toExcludeList, excludedList);
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
new file mode 100644
index 0000000000..27957d162a
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+
+/**
+ * Used to inject certain faults for testing.
+ */
+public abstract class FaultInjector {
+
+  @VisibleForTesting
+  public void init() {
+  }
+
+  @VisibleForTesting
+  public void pause() throws IOException {
+  }
+
+  @VisibleForTesting
+  public void resume() throws IOException {
+  }
+
+  @VisibleForTesting
+  public void reset() throws IOException {
+  }
+}
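
The patch's tests (see TestOMRatisSnapshots below) rely on such an injector to
pause a follower between downloading and installing a checkpoint. The concrete
injector used there is not shown in this hunk; a minimal latch-based sketch of
what a pausing injector could look like (an illustration, not the
implementation shipped in the patch):

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;

    import org.apache.hadoop.hdds.utils.FaultInjector;

    // Blocks the instrumented thread in pause() until a test calls resume().
    public class PausingFaultInjector extends FaultInjector {
      private CountDownLatch latch;

      public PausingFaultInjector() {
        init();
      }

      @Override
      public void init() {
        latch = new CountDownLatch(1);
      }

      @Override
      public void pause() throws IOException {
        try {
          latch.await();          // hold the snapshot-install thread here
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException(e);
        }
      }

      @Override
      public void resume() throws IOException {
        latch.countDown();        // release the paused thread
      }

      @Override
      public void reset() throws IOException {
        init();                   // re-arm for the next pause
      }
    }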
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
index df368e5319..055b904d02 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hdds.utils;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -53,6 +54,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -71,6 +73,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURAT
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INFO_WAIT_DURATION_DEFAULT;
 import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
 import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER;
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
 
 /**
@@ -168,6 +171,7 @@ public final class HAUtils {
 
   /**
    * Replace the current DB with the new DB checkpoint.
+   * (the checkpoint at checkpointPath is not deleted here)
    *
    * @param lastAppliedIndex the last applied index in the current SCM DB.
    * @param checkpointPath   path to the new DB checkpoint
@@ -201,13 +205,16 @@ public final class HAUtils {
       // an inconsistent state and this marker file will fail it from
       // starting up.
       Files.createFile(markerFile);
-      FileUtils.moveDirectory(checkpointPath, oldDB.toPath());
+      // Copy the candidate DB to real DB
+      org.apache.commons.io.FileUtils.copyDirectory(checkpointPath.toFile(),
+          oldDB);
       Files.deleteIfExists(markerFile);
     } catch (IOException e) {
       LOG.error("Failed to move downloaded DB checkpoint {} to metadata "
               + "directory {}. Resetting to original DB.", checkpointPath,
           oldDB.toPath());
       try {
+        FileUtil.fullyDelete(oldDB);
         Files.move(dbBackup.toPath(), oldDB.toPath());
         Files.deleteIfExists(markerFile);
       } catch (IOException ex) {
@@ -225,7 +232,7 @@ public final class HAUtils {
    */
   public static TransactionInfo getTrxnInfoFromCheckpoint(
       OzoneConfiguration conf, Path dbPath, DBDefinition definition)
-      throws Exception {
+      throws IOException {
 
     if (dbPath != null) {
       Path dbDir = dbPath.getParent();
@@ -245,12 +252,12 @@ public final class HAUtils {
    * @param tempConfig
    * @param dbDir path to DB
    * @return TransactionInfo
-   * @throws Exception
+   * @throws IOException
    */
   private static TransactionInfo getTransactionInfoFromDB(
       OzoneConfiguration tempConfig, Path dbDir, String dbName,
       DBDefinition definition)
-      throws Exception {
+      throws IOException {
 
     try (DBStore dbStore = loadDB(tempConfig, dbDir.toFile(),
         dbName, definition)) {
@@ -297,7 +304,7 @@ public final class HAUtils {
       long lastAppliedIndex, String leaderId, Path newDBlocation,
       Logger logger) {
     if (transactionInfo.getTransactionIndex() <= lastAppliedIndex) {
-      logger.error("Failed to install checkpoint from SCM leader: {}"
+      logger.error("Failed to install checkpoint from the leader: {}"
               + ". The last applied index: {} is greater than or equal to the "
               + "checkpoint's applied index: {}. Deleting the downloaded "
               + "checkpoint {}", leaderId, lastAppliedIndex,
@@ -319,7 +326,8 @@ public final class HAUtils {
         configuration.getObject(RocksDBConfiguration.class);
     DBStoreBuilder dbStoreBuilder =
         DBStoreBuilder.newBuilder(configuration, rocksDBConfiguration)
-            .setName(dbName).setPath(Paths.get(metaDir.getPath()));
+            .setName(dbName)
+            .setPath(Paths.get(metaDir.getPath()));
     // Add column family names and codecs.
     for (DBColumnFamilyDefinition columnFamily : definition
         .getColumnFamilies()) {
@@ -347,6 +355,35 @@ public final class HAUtils {
     }
     return metadataDir;
   }
+
+  /**
+   * Scan the DB dir and return the existing SST files.
+   * The returned list can be used to avoid repeated downloads.
+   *
+   * @param db the file representing the DB dir to be scanned
+   * @return the list of SST file names; empty if the DB does not exist
+   */
+  public static List<String> getExistingSstFiles(File db) {
+    List<String> sstList = new ArrayList<>();
+    if (!db.exists()) {
+      return sstList;
+    }
+    FilenameFilter filter = new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.endsWith(ROCKSDB_SST_SUFFIX);
+      }
+    };
+    String[] tempArray = db.list(filter);
+    if (tempArray != null) {
+      sstList = Arrays.asList(tempArray);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Scanned SST files {} in {}.", sstList, db.getAbsolutePath());
+      }
+    }
+    return sstList;
+  }
+
   /**
    * Build CA list which need to be passed to client.
    *
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 201a70e42b..f120ffad33 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -26,6 +26,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.concurrent.TimeUnit;
@@ -525,12 +526,17 @@ public final class HddsServerUtil {
   /**
    * Write DB Checkpoint to an output stream as a compressed file (tar).
    *
-   * @param checkpoint  checkpoint file
-   * @param destination destination output stream.
+   * @param checkpoint    checkpoint file
+   * @param destination   destination output stream.
+   * @param toExcludeList the files to be excluded
+   * @param excludedList  the files that were actually excluded
    * @throws IOException
    */
-  public static void writeDBCheckpointToStream(DBCheckpoint checkpoint,
-      OutputStream destination)
+  public static void writeDBCheckpointToStream(
+      DBCheckpoint checkpoint,
+      OutputStream destination,
+      List<String> toExcludeList,
+      List<String> excludedList)
       throws IOException {
     try (TarArchiveOutputStream archiveOutputStream =
             new TarArchiveOutputStream(destination);
@@ -540,10 +546,14 @@ public final class HddsServerUtil {
           TarArchiveOutputStream.BIGNUMBER_POSIX);
       for (Path path : files.collect(Collectors.toList())) {
         if (path != null) {
-          Path fileName = path.getFileName();
-          if (fileName != null) {
-            includeFile(path.toFile(), fileName.toString(),
-                archiveOutputStream);
+          Path fileNamePath = path.getFileName();
+          if (fileNamePath != null) {
+            String fileName = fileNamePath.toString();
+            if (!toExcludeList.contains(fileName)) {
+              includeFile(path.toFile(), fileName, archiveOutputStream);
+            } else {
+              excludedList.add(fileName);
+            }
           }
         }
       }
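
As a compact illustration of the new signature (assuming a DBCheckpoint taken
beforehand, e.g. via dbStore.getCheckpoint(true); names here are illustrative,
and the integration tests further below exercise the same call):

    import org.apache.hadoop.hdds.utils.HddsServerUtil;
    import org.apache.hadoop.hdds.utils.db.DBCheckpoint;

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.ArrayList;
    import java.util.List;

    final class CheckpointStreamingSketch {
      private CheckpointStreamingSketch() { }

      // Streams the checkpoint as a tar, skipping the SSTs the follower
      // already holds; the returned list reports what was actually skipped.
      static List<String> streamIncremental(DBCheckpoint checkpoint,
          File tarFile, List<String> followerSsts) throws IOException {
        List<String> excluded = new ArrayList<>();
        try (OutputStream out = new FileOutputStream(tarFile)) {
          HddsServerUtil.writeDBCheckpointToStream(
              checkpoint, out, followerSsts, excluded);
        }
        return excluded;
      }
    }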
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
new file mode 100644
index 0000000000..c934e7d2b9
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SNAPSHOT_CANDIDATE_DIR;
+
+/**
+ * The RocksDB-specific snapshot provider.
+ * Supports both incremental and full snapshots.
+ *
+ * The process is as follows:
+ * 1. Download the snapshot file from the leader
+ * 2. Untar the snapshot file into the candidate dir
+ * 3. Return the candidate dir as a DBCheckpoint
+ *
+ * The difference between an incremental and a full snapshot is whether the
+ * existing SST file list is sent to the leader.
+ *
+ */
+public abstract class RDBSnapshotProvider implements Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RDBSnapshotProvider.class);
+
+  private final File snapshotDir;
+  private final File candidateDir;
+  private final String dbName;
+  private final AtomicReference<String> lastLeaderRef;
+  private final AtomicLong numDownloaded;
+  private FaultInjector injector;
+
+  public RDBSnapshotProvider(File snapshotDir, String dbName) {
+    this.snapshotDir = snapshotDir;
+    this.candidateDir = new File(snapshotDir, dbName + SNAPSHOT_CANDIDATE_DIR);
+    this.dbName = dbName;
+    this.injector = null;
+    this.lastLeaderRef = new AtomicReference<>(null);
+    this.numDownloaded = new AtomicLong();
+    init();
+  }
+
+  /**
+   * Initialize or reinitialize the RDB snapshot provider.
+   */
+  public synchronized void init() {
+    // check parent snapshot dir
+    if (!snapshotDir.exists()) {
+      HddsUtils.createDir(snapshotDir.toString());
+    }
+
+    LOG.info("Cleaning up the candidate dir: {}", candidateDir);
+    // cleanup candidate dir
+    if (candidateDir.exists()) {
+      FileUtil.fullyDeleteContents(candidateDir);
+    } else {
+      // create candidate dir
+      HddsUtils.createDir(candidateDir.toString());
+    }
+
+    // reset leader info
+    lastLeaderRef.set(null);
+  }
+
+  /**
+   * Download the latest DB snapshot(checkpoint) from the Leader.
+   *
+   * @param leaderNodeID the ID of leader node
+   * @return {@link DBCheckpoint}
+   * @throws IOException
+   */
+  public DBCheckpoint downloadDBSnapshotFromLeader(String leaderNodeID)
+      throws IOException {
+    LOG.info("Prepare to download the snapshot from leader OM {} and " +
+        "reloading state from the snapshot.", leaderNodeID);
+    checkLeaderConsistent(leaderNodeID);
+
+    String snapshotFileName = getSnapshotFileName(leaderNodeID);
+    File targetFile = new File(snapshotDir, snapshotFileName);
+    downloadSnapshot(leaderNodeID, targetFile);
+    LOG.info("Successfully download the latest snapshot {} from leader OM: {}",
+        targetFile, leaderNodeID);
+
+    RocksDBCheckpoint checkpoint = getCheckpointFromSnapshotFile(targetFile,
+        candidateDir, true);
+    LOG.info("Successfully untar the downloaded snapshot {} at {}.", targetFile,
+        checkpoint.getCheckpointLocation());
+
+    numDownloaded.incrementAndGet();
+    injectPause();
+    return checkpoint;
+  }
+
+  /**
+   * Clean up the candidate DB in the following cases:
+   * 1. The leader changes while an incremental snapshot is being installed.
+   *    Note: this prevents downloading a wrong incremental checkpoint from
+   *    the new leader; a full snapshot is requested instead.
+   * 2. A full snapshot is about to be downloaded.
+   *
+   * @param currentLeader the ID of leader node
+   */
+  private void checkLeaderConsistent(String currentLeader) {
+    String lastLeader = lastLeaderRef.get();
+    if (lastLeader != null) {
+      if (!lastLeader.equals(currentLeader)) {
+        LOG.info("Last leader for install snapshot is {}, but current leader " +
+            "is {}. ", lastLeader, currentLeader);
+        init();
+        lastLeaderRef.set(currentLeader);
+      }
+      return;
+    }
+
+    List<String> files = HAUtils.getExistingSstFiles(candidateDir);
+    if (!files.isEmpty()) {
+      LOG.warn("Candidate DB directory {} is not empty when last leader is " +
+          "null.", candidateDir);
+      init();
+    }
+    lastLeaderRef.set(currentLeader);
+  }
+
+  /**
+   * Get the snapshot file name.
+   *
+   * @param leaderNodeID the ID of leader node
+   * @return snapshot file name
+   */
+  public String getSnapshotFileName(String leaderNodeID) {
+    String snapshotTime = Long.toString(System.currentTimeMillis());
+    return dbName + "-" + leaderNodeID + "-" + snapshotTime + ".tar";
+  }
+
+  /**
+   * Untar the downloaded snapshot and convert to {@link RocksDBCheckpoint}.
+   *
+   * @param snapshot the downloaded snapshot tar file
+   * @param untarDir the directory to place the untarred files
+   * @param deleteSnapshot whether to delete the downloaded snapshot tar file
+   * @return {@link RocksDBCheckpoint}
+   * @throws IOException
+   */
+  public RocksDBCheckpoint getCheckpointFromSnapshotFile(File snapshot,
+      File untarDir, boolean deleteSnapshot) throws IOException {
+    // Untar the checkpoint file.
+    Path untarredDbDir = untarDir.toPath();
+    FileUtil.unTar(snapshot, untarredDbDir.toFile());
+
+    if (deleteSnapshot) {
+      FileUtil.fullyDelete(snapshot);
+    }
+    return new RocksDBCheckpoint(untarredDbDir);
+  }
+
+  /**
+   * The abstract method to download the snapshot.
+   * Could be implemented over HTTP, gRPC, etc.
+   *
+   * @param leaderNodeID the ID of leader node
+   * @param targetFile   the file to download the snapshot into
+   * @throws IOException
+   */
+  public abstract void downloadSnapshot(String leaderNodeID, File targetFile)
+      throws IOException;
+
+  /**
+   * Inject pause for test only.
+   *
+   * @throws IOException
+   */
+  private void injectPause() throws IOException {
+    if (injector != null) {
+      injector.pause();
+    }
+  }
+
+  @VisibleForTesting
+  public File getSnapshotDir() {
+    return snapshotDir;
+  }
+
+  @VisibleForTesting
+  public File getCandidateDir() {
+    return candidateDir;
+  }
+
+  @VisibleForTesting
+  public FaultInjector getInjector() {
+    return injector;
+  }
+
+  @VisibleForTesting
+  public void setInjector(FaultInjector injector) {
+    this.injector = injector;
+  }
+
+  @VisibleForTesting
+  public long getNumDownloaded() {
+    return numDownloaded.get();
+  }
+}
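
OmRatisSnapshotProvider (also changed by this patch, see the diffstat above)
extends this class to download over HTTP. A rough, simplified sketch of such a
subclass follows; the shipped implementation additionally handles https and
security, and the peer lookup, DB name and error handling here are assumptions
for illustration only:

    import org.apache.hadoop.hdds.utils.HAUtils;
    import org.apache.hadoop.hdds.utils.RDBSnapshotProvider;
    import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;

    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;
    import java.util.List;
    import java.util.function.Function;

    public class HttpSnapshotProviderSketch extends RDBSnapshotProvider {

      private final Function<String, OMNodeDetails> peers;

      public HttpSnapshotProviderSketch(File snapshotDir,
          Function<String, OMNodeDetails> peers) {
        super(snapshotDir, "om.db");
        this.peers = peers;
      }

      @Override
      public void downloadSnapshot(String leaderNodeID, File targetFile)
          throws IOException {
        // Ask the leader to skip the SST files already in the candidate dir.
        List<String> existingSsts =
            HAUtils.getExistingSstFiles(getCandidateDir());
        URL url = peers.apply(leaderNodeID)
            .getOMDBCheckpointEndpointUrl(true, true, existingSsts);

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (InputStream in = conn.getInputStream()) {
          Files.copy(in, targetFile.toPath(),
              StandardCopyOption.REPLACE_EXISTING);
        } finally {
          conn.disconnect();
        }
      }

      @Override
      public void close() {
        // nothing to release in this sketch
      }
    }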
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
new file mode 100644
index 0000000000..3fcb53da14
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestRDBSnapshotProvider.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableConfig;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.rocksdb.RocksDB;
+import org.rocksdb.Statistics;
+import org.rocksdb.StatsLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.writeDBCheckpointToStream;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test Common RocksDB's snapshot provider service.
+ */
+public class TestRDBSnapshotProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestRDBSnapshotProvider.class);
+
+  private final List<String> families =
+      Arrays.asList(StringUtils.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
+          "First", "Second", "Third");
+  public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80;
+
+  private RDBStore rdbStore = null;
+  private ManagedDBOptions options = null;
+  private Set<TableConfig> configSet;
+  private RDBSnapshotProvider rdbSnapshotProvider;
+  private File testDir;
+  private final int numUsedCF = 3;
+  private final String leaderId = "leaderNode-1";
+  private final AtomicReference<DBCheckpoint> latestCK =
+      new AtomicReference<>(null);
+
+  @BeforeEach
+  public void init(@TempDir File tempDir) throws Exception {
+    options = getNewDBOptions();
+    configSet = new HashSet<>();
+    for (String name : families) {
+      TableConfig newConfig = new TableConfig(name,
+          new ManagedColumnFamilyOptions());
+      configSet.add(newConfig);
+    }
+    testDir = tempDir;
+    rdbStore = new RDBStore(tempDir, options, configSet,
+        MAX_DB_UPDATES_SIZE_THRESHOLD);
+    rdbSnapshotProvider = new RDBSnapshotProvider(testDir, "test.db") {
+      @Override
+      public void close() throws IOException {
+      }
+
+      @Override
+      public void downloadSnapshot(String leaderNodeID, File targetFile)
+          throws IOException {
+        for (int i = 0; i < 10; i++) {
+          insertDataToDB(numUsedCF);
+        }
+        DBCheckpoint dbCheckpoint = rdbStore.getCheckpoint(true);
+        latestCK.set(dbCheckpoint);
+        File[] files = dbCheckpoint.getCheckpointLocation().toFile().
+            listFiles();
+        assertNotNull(files);
+        LOG.info("Db files: {}", Arrays.stream(files)
+            .map(a -> "".concat(a.getName()).concat(" length: ").
+                concat(String.valueOf(a.length())))
+            .collect(Collectors.toList()));
+        try (OutputStream outputStream = new FileOutputStream(targetFile)) {
+          writeDBCheckpointToStream(dbCheckpoint, outputStream,
+              HAUtils.getExistingSstFiles(
+                  rdbSnapshotProvider.getCandidateDir()), new ArrayList<>());
+        }
+      }
+    };
+  }
+
+  @AfterEach
+  public void down() throws Exception {
+    if (rdbStore != null) {
+      rdbStore.close();
+    }
+    if (testDir.exists()) {
+      FileUtil.fullyDelete(testDir);
+    }
+  }
+
+  @Test
+  public void testDownloadDBSnapshotFromLeader() throws Exception {
+    File candidateDir = rdbSnapshotProvider.getCandidateDir();
+    assertTrue(candidateDir.exists());
+
+    DBCheckpoint checkpoint;
+    int before = HAUtils.getExistingSstFiles(
+        rdbSnapshotProvider.getCandidateDir()).size();
+    assertEquals(0, before);
+
+    // Get first snapshot
+    checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(leaderId);
+    File checkpointDir = checkpoint.getCheckpointLocation().toFile();
+    assertEquals(candidateDir, checkpointDir);
+    int first = HAUtils.getExistingSstFiles(
+        rdbSnapshotProvider.getCandidateDir()).size();
+
+    // Get second snapshot
+    checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(leaderId);
+    int second = HAUtils.getExistingSstFiles(
+        rdbSnapshotProvider.getCandidateDir()).size();
+    assertTrue(second > first, "The second snapshot should" +
+        " have more SST files");
+    DBCheckpoint latestCheckpoint = latestCK.get();
+    compareDB(latestCheckpoint.getCheckpointLocation().toFile(),
+        checkpoint.getCheckpointLocation().toFile(), numUsedCF);
+
+    // Get third snapshot
+    checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(leaderId);
+    int third = HAUtils.getExistingSstFiles(
+        rdbSnapshotProvider.getCandidateDir()).size();
+    assertTrue(third > second, "The third snapshot should" +
+        " have more SST files");
+    compareDB(latestCK.get().getCheckpointLocation().toFile(),
+        checkpoint.getCheckpointLocation().toFile(), numUsedCF);
+
+    // Test cleanup candidateDB
+    rdbSnapshotProvider.init();
+    assertEquals(0, HAUtils.getExistingSstFiles(
+        rdbSnapshotProvider.getCandidateDir()).size());
+  }
+
+  public void compareDB(File db1, File db2, int columnFamilyUsed)
+      throws Exception {
+    try (RDBStore rdbStore1 = new RDBStore(db1, getNewDBOptions(),
+             configSet, MAX_DB_UPDATES_SIZE_THRESHOLD);
+         RDBStore rdbStore2 = new RDBStore(db2, getNewDBOptions(),
+             configSet, MAX_DB_UPDATES_SIZE_THRESHOLD)) {
+      // all entries should be the same in both DBs
+      for (int i = 0; i < columnFamilyUsed; i++) {
+        try (TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> iterator
+                 = rdbStore1.getTable(families.get(i)).iterator()) {
+          while (iterator.hasNext()) {
+            KeyValue<byte[], byte[]> keyValue = iterator.next();
+            byte[] key = keyValue.getKey();
+            byte[] value1 = keyValue.getValue();
+            byte[] value2 = rdbStore2.getTable(families.get(i))
+                .getIfExist(key);
+            assertArrayEquals(value1, value2);
+          }
+        }
+      }
+    }
+  }
+
+  private void insertDataToDB(int columnFamilyUsed) throws IOException {
+    for (int i = 0; i < columnFamilyUsed; i++) {
+      insertRandomData(rdbStore, i);
+    }
+  }
+
+  public ManagedDBOptions getNewDBOptions() {
+    ManagedDBOptions managedOptions = new ManagedDBOptions();
+    managedOptions.setCreateIfMissing(true);
+    managedOptions.setCreateMissingColumnFamilies(true);
+
+    Statistics statistics = new Statistics();
+    statistics.setStatsLevel(StatsLevel.ALL);
+    managedOptions.setStatistics(statistics);
+    return managedOptions;
+  }
+
+  public void insertRandomData(RDBStore dbStore, int familyIndex)
+      throws IOException {
+    try (Table<byte[], byte[]> firstTable = dbStore.getTable(families.
+        get(familyIndex))) {
+      Assertions.assertNotNull(firstTable, "Table cannot be null");
+      for (int x = 0; x < 100; x++) {
+        byte[] key =
+            RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
+        byte[] value =
+            RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
+        firstTable.put(key, value);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
index c7ca903be0..0a9e220597 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java
@@ -20,7 +20,10 @@
 package org.apache.hadoop.hdds.utils.db;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.Arrays;
@@ -44,6 +47,8 @@ import org.rocksdb.RocksDB;
 import org.rocksdb.Statistics;
 import org.rocksdb.StatsLevel;
 
+import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
+
 /**
  * RDBStore Tests.
  */
@@ -83,9 +88,11 @@ public class TestRDBStore {
       rdbStore.close();
     }
   }
-  private void insertRandomData(RDBStore dbStore, int familyIndex)
-      throws Exception {
-    try (Table firstTable = dbStore.getTable(families.get(familyIndex))) {
+
+  public void insertRandomData(RDBStore dbStore, int familyIndex)
+      throws IOException {
+    try (Table<byte[], byte[]> firstTable = dbStore.getTable(families.
+        get(familyIndex))) {
       Assertions.assertNotNull(firstTable, "Table cannot be null");
       for (int x = 0; x < 100; x++) {
         byte[] key =
@@ -94,6 +101,8 @@ public class TestRDBStore {
           RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
         firstTable.put(key, value);
       }
+    } catch (Exception e) {
+      throw new IOException(e);
     }
   }
 
@@ -359,4 +368,60 @@ public class TestRDBStore {
     }
   }
 
+  @Test
+  public void testSstConsistency() throws IOException {
+    for (int i = 0; i < 10; i++) {
+      insertRandomData(rdbStore, 0);
+      insertRandomData(rdbStore, 1);
+      insertRandomData(rdbStore, 2);
+    }
+    DBCheckpoint dbCheckpoint1 = rdbStore.getCheckpoint(true);
+
+    for (int i = 0; i < 10; i++) {
+      insertRandomData(rdbStore, 0);
+      insertRandomData(rdbStore, 1);
+      insertRandomData(rdbStore, 2);
+    }
+    DBCheckpoint dbCheckpoint2 = rdbStore.getCheckpoint(true);
+    compareSstWithSameName(dbCheckpoint1.getCheckpointLocation().toFile(),
+        dbCheckpoint2.getCheckpointLocation().toFile());
+
+    for (int i = 0; i < 10; i++) {
+      insertRandomData(rdbStore, 0);
+      insertRandomData(rdbStore, 1);
+      insertRandomData(rdbStore, 2);
+    }
+    DBCheckpoint dbCheckpoint3 = rdbStore.getCheckpoint(true);
+    compareSstWithSameName(dbCheckpoint2.getCheckpointLocation().toFile(),
+        dbCheckpoint3.getCheckpointLocation().toFile());
+  }
+
+  private void compareSstWithSameName(File checkpoint1, File checkpoint2)
+      throws IOException {
+    FilenameFilter filter = (dir, name) -> name.endsWith(ROCKSDB_SST_SUFFIX);
+    String[] files1 = checkpoint1.list(filter);
+    String[] files2 = checkpoint2.list(filter);
+    assert files1 != null;
+    assert files2 != null;
+    // Keep only the file names present in both checkpoints
+    List<String> result = Arrays.asList(files1);
+    result.retainAll(Arrays.asList(files2));
+
+    for (String name: result) {
+      File fileInCk1 = new File(checkpoint1.getAbsoluteFile(), name);
+      File fileInCk2 = new File(checkpoint2.getAbsoluteFile(), name);
+      long length1 = fileInCk1.length();
+      long length2 = fileInCk2.length();
+      Assertions.assertEquals(length1, length2, name);
+
+      try (InputStream fileStream1 = new FileInputStream(fileInCk1);
+           InputStream fileStream2 = new FileInputStream(fileInCk2)) {
+        byte[] content1 = new byte[fileStream1.available()];
+        byte[] content2 = new byte[fileStream2.available()];
+        fileStream1.read(content1);
+        fileStream2.read(content2);
+        Assertions.assertArrayEquals(content1, content2);
+      }
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java
index 7a0becdf19..afea097198 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java
@@ -28,6 +28,7 @@ import java.io.OutputStream;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.ArrayList;
 
 // TODO: define a generic interface for this
 /**
@@ -66,7 +67,8 @@ public class SCMDBCheckpointProvider {
       }
 
       Instant start = Instant.now();
-      HddsServerUtil.writeDBCheckpointToStream(checkpoint, stream);
+      HddsServerUtil.writeDBCheckpointToStream(checkpoint, stream,
+          new ArrayList<>(), new ArrayList<>());
       Instant end = Instant.now();
 
       long duration = Duration.between(start, end).toMillis();
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
index ee680ef854..b061c14b1f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
@@ -17,8 +17,6 @@
 
 package org.apache.hadoop.ozone.om.helpers;
 
-import org.apache.commons.lang3.StringUtils;
-
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OmUtils;
@@ -26,13 +24,19 @@ import org.apache.hadoop.ozone.ha.ConfUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OMNodeInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.NodeState;
 import org.apache.hadoop.hdds.NodeDetails;
+import org.apache.http.client.utils.URIBuilder;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
@@ -160,23 +164,28 @@ public final class OMNodeDetails extends NodeDetails {
     }
   }
 
-  public String getOMDBCheckpointEnpointUrl(boolean isHttpPolicy) {
-    if (isHttpPolicy) {
-      if (StringUtils.isNotEmpty(getHttpAddress())) {
-        return "http://" + getHttpAddress() +
-            OZONE_DB_CHECKPOINT_HTTP_ENDPOINT +
-            "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true&" +
-            OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA + "=true";
-      }
-    } else {
-      if (StringUtils.isNotEmpty(getHttpsAddress())) {
-        return "https://" + getHttpsAddress() +
-            OZONE_DB_CHECKPOINT_HTTP_ENDPOINT +
-            "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true&" +
-            OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA + "=true";
+  public URL getOMDBCheckpointEndpointUrl(boolean isHttp, boolean flush,
+      List<String> sstList) throws IOException {
+    URL url;
+    try {
+      URIBuilder urlBuilder = new URIBuilder().
+          setScheme(isHttp ? "http" : "https").
+          setHost(isHttp ? getHttpAddress() : getHttpsAddress()).
+          setPath(OZONE_DB_CHECKPOINT_HTTP_ENDPOINT).
+          addParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA, "true").
+          addParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH,
+              flush ? "true" : "false");
+      if (sstList != null && !sstList.isEmpty()) {
+        for (String s: sstList) {
+          urlBuilder.addParameter(
+              OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST, s);
+        }
       }
+      url = urlBuilder.build().toURL();
+    } catch (URISyntaxException | MalformedURLException e) {
+      throw new IOException("Could not get OM DB Checkpoint Endpoint Url", e);
     }
-    return null;
+    return url;
   }
 
   public String getOMPrintInfo() {
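
The URL built above carries one toExcludeSST query parameter per SST file the
follower already has. A hypothetical call (the peer and SST names are
illustrative; the path comes from OZONE_DB_CHECKPOINT_HTTP_ENDPOINT):

    import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;

    import java.io.IOException;
    import java.net.URL;
    import java.util.Arrays;

    final class CheckpointUrlExample {
      private CheckpointUrlExample() { }

      // peer: the leader's OMNodeDetails; the SST names would normally come
      // from HAUtils.getExistingSstFiles() on the follower's candidate dir.
      static URL leaderCheckpointUrl(OMNodeDetails peer) throws IOException {
        return peer.getOMDBCheckpointEndpointUrl(
            true,                                    // plain http
            true,                                    // flush before checkpoint
            Arrays.asList("000123.sst", "000456.sst"));
        // e.g. http://<om-http-address><OZONE_DB_CHECKPOINT_HTTP_ENDPOINT>
        //        ?includeSnapshotData=true&flushBeforeCheckpoint=true
        //        &toExcludeSST=000123.sst&toExcludeSST=000456.sst
      }
    }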
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
index 8c47a197d6..28184a8ff8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
@@ -120,7 +120,7 @@ public class TestSCMDbCheckpointServlet {
           Collections.emptyList(),
           false);
       doCallRealMethod().when(scmDbCheckpointServletMock)
-         .writeDbDataToStream(any(), any(), any());
+         .writeDbDataToStream(any(), any(), any(), any(), any());
 
       HttpServletRequest requestMock = mock(HttpServletRequest.class);
       HttpServletResponse responseMock = mock(HttpServletResponse.class);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
index 316130c2e2..637b402acd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
@@ -26,6 +26,7 @@ import javax.servlet.http.HttpServletResponse;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -193,7 +194,7 @@ public class TestOMDbCheckpointServlet {
         responseMock);
 
     doCallRealMethod().when(omDbCheckpointServletMock)
-        .writeDbDataToStream(any(), any(), any());
+        .writeDbDataToStream(any(), any(), any(), any(), any());
   }
 
   @Test
@@ -300,7 +301,7 @@ public class TestOMDbCheckpointServlet {
     // Get the tarball.
     try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
       omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
-          fileOutputStream);
+          fileOutputStream, new ArrayList<>(), new ArrayList<>());
     }
 
     // Untar the file into a temp folder to be examined.
@@ -380,7 +381,7 @@ public class TestOMDbCheckpointServlet {
     // Get the tarball.
     try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
       omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
-          fileOutputStream);
+          fileOutputStream, new ArrayList<>(), new ArrayList<>());
     }
 
     // Untar the file into a temp folder to be examined.
@@ -399,6 +400,49 @@ public class TestOMDbCheckpointServlet {
     Assert.assertEquals(initialCheckpointSet, finalCheckpointSet);
   }
 
+  @Test
+  public void testWriteDbDataWithToExcludeFileList()
+      throws Exception {
+    prepSnapshotData();
+
+    File dummyFile = new File(dbCheckpoint.getCheckpointLocation().toString(),
+        "dummy.sst");
+    try (OutputStreamWriter writer = new OutputStreamWriter(
+        new FileOutputStream(dummyFile), StandardCharsets.UTF_8)) {
+      writer.write("Dummy data.");
+    }
+    Assert.assertTrue(dummyFile.exists());
+    List<String> toExcludeList = new ArrayList<>();
+    List<String> excludedList = new ArrayList<>();
+    toExcludeList.add(dummyFile.getName());
+
+    // Set http param to exclude snapshot data.
+    when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA))
+        .thenReturn(null);
+
+    // Get the tarball.
+    try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
+      omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
+          fileOutputStream, toExcludeList, excludedList);
+    }
+
+    // Untar the file into a temp folder to be examined.
+    String testDirName = folder.newFolder().getAbsolutePath();
+    int testDirLength = testDirName.length() + 1;
+    FileUtil.unTar(tempFile, new File(testDirName));
+
+    // Confirm the checkpoint directories match.
+    Path checkpointLocation = dbCheckpoint.getCheckpointLocation();
+    Set<String> initialCheckpointSet = getFiles(checkpointLocation,
+        checkpointLocation.toString().length() + 1);
+    Path finalCheckpointLocation = Paths.get(testDirName);
+    Set<String> finalCheckpointSet = getFiles(finalCheckpointLocation,
+        testDirLength);
+
+    initialCheckpointSet.removeAll(finalCheckpointSet);
+    Assert.assertTrue(initialCheckpointSet.contains(dummyFile.getName()));
+  }
+
   private void prepSnapshotData() throws Exception {
     setupCluster();
     metaDir = OMStorage.getOmDbDir(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index dd124be723..1850ba0b5a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -20,6 +20,9 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.ExitManager;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.DBCheckpointMetrics;
+import org.apache.hadoop.hdds.utils.FaultInjector;
+import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -61,8 +64,10 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -73,7 +78,9 @@ import java.util.stream.Stream;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
 import static org.apache.hadoop.ozone.om.TestOzoneManagerHAWithData.createKey;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 /**
  * Tests the Ratis snapshots feature in OM.
@@ -305,6 +312,236 @@ public class TestOMRatisSnapshots {
     Assertions.assertTrue(hardLinkCount > 0, "No hard links were found");
   }
 
+  @Test
+  @Timeout(300)
+  public void testInstallIncrementalSnapshot() throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+    OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Set fault injector to pause before install
+    FaultInjector faultInjector = new SnapshotPauseInjector();
+    followerOM.getOmSnapshotProvider().setInjector(faultInjector);
+
+    // Do some transactions so that the log index increases
+    List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+        80);
+
+    // Start the inactive OM. Checkpoint installation will happen spontaneously.
+    cluster.startInactiveOM(followerNodeId);
+
+    // Wait for the follower to download the snapshot and pause at the injector
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmSnapshotProvider().getNumDownloaded() == 1;
+    }, 1000, 10000);
+
+    // Do some transactions, let leader OM take a new snapshot and purge the
+    // old logs, so that follower must download the new snapshot again.
+    List<String> secondKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+        160);
+
+    // Resume the follower thread; it will download the incremental snapshot.
+    faultInjector.resume();
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // The recently started OM should be lagging behind the leader OM.
+    // Wait for the follower to apply transactions up to the leader's
+    // snapshot index. Time out if the follower does not catch up within 10s.
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+          >= leaderOMSnapshotIndex - 1;
+    }, 1000, 10000);
+
+    assertEquals(2, followerOM.getOmSnapshotProvider().getNumDownloaded());
+
+    // Verify that the follower OM's DB contains the transactions which were
+    // made while it was inactive.
+    OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+    assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+        followerOMMetaMngr.getVolumeKey(volumeName)));
+    assertNotNull(followerOMMetaMngr.getBucketTable().get(
+        followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+
+    for (String key : firstKeys) {
+      assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+    for (String key : secondKeys) {
+      assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+
+    // Verify the metrics recording the incremental checkpoint at leader side
+    DBCheckpointMetrics dbMetrics = leaderOM.getMetrics().
+        getDBCheckpointMetrics();
+    Assertions.assertTrue(
+        dbMetrics.getLastCheckpointStreamingNumSSTExcluded() > 0);
+    assertEquals(1, dbMetrics.getNumIncrementalCheckpoints());
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.isOmRpcServerRunning();
+    }, 100, 5000);
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+    assertNotNull(followerOMMetaMngr.getKeyTable(
+        TEST_BUCKET_LAYOUT).get(followerOMMetaMngr.getOzoneKey(
+        volumeName, bucketName, newKeys.get(0))));
+
+    // Verify that the follower candidate directory gets cleaned
+    String[] filesInCandidate = followerOM.getOmSnapshotProvider().
+        getCandidateDir().list();
+    assertNotNull(filesInCandidate);
+    assertEquals(0, filesInCandidate.length);
+  }
+
+  @Test
+  @Timeout(300)
+  public void testInstallIncrementalSnapshotWithFailure() throws Exception {
+    // Get the leader OM
+    String leaderOMNodeId = OmFailoverProxyUtil
+        .getFailoverProxyProvider(objectStore.getClientProxy())
+        .getCurrentProxyOMNodeId();
+
+    OzoneManager leaderOM = cluster.getOzoneManager(leaderOMNodeId);
+    OzoneManagerRatisServer leaderRatisServer = leaderOM.getOmRatisServer();
+
+    // Find the inactive OM
+    String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
+    if (cluster.isOMActive(followerNodeId)) {
+      followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
+    }
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Set fault injector to pause before install
+    FaultInjector faultInjector = new SnapshotPauseInjector();
+    followerOM.getOmSnapshotProvider().setInjector(faultInjector);
+
+    // Do some transactions so that the log index increases
+    List<String> firstKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+        80);
+
+    // Start the inactive OM. Checkpoint installation will be triggered automatically.
+    cluster.startInactiveOM(followerNodeId);
+
+    // Wait for the follower to download the snapshot, but get stuck at the injector
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmSnapshotProvider().getNumDownloaded() == 1;
+    }, 1000, 10000);
+
+    // Do some transactions so that the leader OM takes a new snapshot and
+    // purges the old logs, forcing the follower to download a snapshot again.
+    List<String> secondKeys = writeKeysToIncreaseLogIndex(leaderRatisServer,
+        160);
+
+    // Resume the follower thread; it will download the incremental snapshot.
+    faultInjector.resume();
+
+    // Pause the follower thread again to block the second install
+    faultInjector.reset();
+
+    // Wait for the follower to download the incremental snapshot, but get
+    // stuck at the injector
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmSnapshotProvider().getNumDownloaded() == 2;
+    }, 1000, 10000);
+
+    // Corrupt the merged (full + incremental) checkpoint in the candidate DB dir
+    File followerCandidateDir = followerOM.getOmSnapshotProvider().
+        getCandidateDir();
+    List<String> sstList = HAUtils.getExistingSstFiles(followerCandidateDir);
+    Assertions.assertTrue(sstList.size() > 0);
+    Collections.shuffle(sstList);
+    List<String> victimSstList = sstList.subList(0, sstList.size() / 3);
+    for (String sst: victimSstList) {
+      File victimSst = new File(followerCandidateDir, sst);
+      Assertions.assertTrue(victimSst.delete());
+    }
+
+    // Resume the follower thread; it will download the full snapshot again
+    // because the installation fails once the corruption is detected.
+    faultInjector.resume();
+
+    // Get the latest db checkpoint from the leader OM.
+    TransactionInfo transactionInfo =
+        TransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
+    TermIndex leaderOMTermIndex =
+        TermIndex.valueOf(transactionInfo.getTerm(),
+            transactionInfo.getTransactionIndex());
+    long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();
+
+    // Wait for the follower to catch up to the leader's snapshot index;
+    // time out if it does not do so within 10s.
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.getOmRatisServer().getLastAppliedTermIndex().getIndex()
+          >= leaderOMSnapshotIndex - 1;
+    }, 1000, 10000);
+
+    // Verify that the follower OM's DB contains the transactions which were
+    // made while it was inactive.
+    OMMetadataManager followerOMMetaMngr = followerOM.getMetadataManager();
+    assertNotNull(followerOMMetaMngr.getVolumeTable().get(
+        followerOMMetaMngr.getVolumeKey(volumeName)));
+    assertNotNull(followerOMMetaMngr.getBucketTable().get(
+        followerOMMetaMngr.getBucketKey(volumeName, bucketName)));
+
+    // Verify that the keys written before and during the snapshot downloads
+    // are present in the follower OM's DB.
+    for (String key : firstKeys) {
+      assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+    for (String key : secondKeys) {
+      assertNotNull(followerOMMetaMngr.getKeyTable(TEST_BUCKET_LAYOUT)
+          .get(followerOMMetaMngr.getOzoneKey(volumeName, bucketName, key)));
+    }
+
+    // Verify the metrics
+    DBCheckpointMetrics dbMetrics = leaderOM.getMetrics().
+        getDBCheckpointMetrics();
+    assertEquals(0, dbMetrics.getLastCheckpointStreamingNumSSTExcluded());
+    assertTrue(dbMetrics.getNumIncrementalCheckpoints() >= 1);
+    assertTrue(dbMetrics.getNumCheckpoints() >= 3);
+
+    // Verify RPC server is running
+    GenericTestUtils.waitFor(() -> {
+      return followerOM.isOmRpcServerRunning();
+    }, 100, 5000);
+
+    // Read & Write after snapshot installed.
+    List<String> newKeys = writeKeys(1);
+    readKeys(newKeys);
+    assertNotNull(followerOMMetaMngr.getKeyTable(
+        TEST_BUCKET_LAYOUT).get(followerOMMetaMngr.getOzoneKey(
+        volumeName, bucketName, newKeys.get(0))));
+
+    // Verify that the follower's candidate directory gets cleaned
+    String[] filesInCandidate = followerOM.getOmSnapshotProvider().
+        getCandidateDir().list();
+    assertNotNull(filesInCandidate);
+    assertEquals(0, filesInCandidate.length);
+  }
+
   @Ignore("Enable this unit test after RATIS-1481 used")
   public void testInstallSnapshotWithClientWrite() throws Exception {
     // Get the leader OM
@@ -642,8 +879,7 @@ public class TestOMRatisSnapshots {
     return keys;
   }
 
-  private List<String> writeKeys(long keyCount) throws IOException,
-      InterruptedException {
+  private List<String> writeKeys(long keyCount) throws IOException {
     List<String> keys = new ArrayList<>();
     long index = 0;
     while (index < keyCount) {
@@ -687,4 +923,39 @@ public class TestOMRatisSnapshots {
       log.error("System Exit: " + message, throwable);
     }
   }
+
+  private static class SnapshotPauseInjector extends FaultInjector {
+    private CountDownLatch ready;
+    private CountDownLatch wait;
+
+    SnapshotPauseInjector() {
+      init();
+    }
+
+    @Override
+    public void init() {
+      this.ready = new CountDownLatch(1);
+      this.wait = new CountDownLatch(1);
+    }
+
+    @Override
+    public void pause() throws IOException {
+      ready.countDown();
+      try {
+        wait.await();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public void resume() throws IOException {
+      wait.countDown();
+    }
+
+    @Override
+    public void reset() throws IOException {
+      init();
+    }
+  }
 }
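
For context on the latch mechanics used by SnapshotPauseInjector above: the code under test is expected to call pause() at the injection point and block there until the test calls resume(). A minimal standalone sketch of that coordination (illustrative only, not part of the patch; the class and the printed messages are invented):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Two-thread demo of the pause/resume handshake: the worker parks itself on
    // one latch and the driver releases it through the other, mirroring the
    // injector contract exercised by the tests above.
    public final class PauseResumeDemo {
      public static void main(String[] args) throws InterruptedException {
        CountDownLatch ready = new CountDownLatch(1);
        CountDownLatch resume = new CountDownLatch(1);

        Thread downloader = new Thread(() -> {
          System.out.println("downloaded checkpoint, pausing before install");
          ready.countDown();                // signal "I am parked"
          try {
            resume.await();                 // the FaultInjector#pause() step
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
          System.out.println("resumed, installing checkpoint");
        });
        downloader.start();

        // Driver side: wait until the worker is parked, do other work
        // (e.g. write more keys), then release it - the resume() call.
        ready.await(10, TimeUnit.SECONDS);
        resume.countDown();
        downloader.join();
      }
    }
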
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
index 32146a8269..6b39c5d0fc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
@@ -123,7 +123,7 @@ public class TestOzoneManagerSnapshotProvider {
 
     // Download latest checkpoint from leader OM to follower OM
     DBCheckpoint omSnapshot = followerOM.getOmSnapshotProvider()
-        .getOzoneManagerDBSnapshot(leaderOMNodeId);
+        .downloadDBSnapshotFromLeader(leaderOMNodeId);
 
     long leaderSnapshotIndex = leaderOM.getRatisSnapshotIndex();
     long downloadedSnapshotIndex = getDownloadedSnapshotIndex(omSnapshot);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index 0f8e8b7edb..15423a16da 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -47,7 +47,9 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -115,8 +117,13 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
   @Override
   public void writeDbDataToStream(DBCheckpoint checkpoint,
                                   HttpServletRequest request,
-                                  OutputStream destination)
+                                  OutputStream destination,
+                                  List<String> toExcludeList,
+                                  List<String> excludedList)
       throws IOException, InterruptedException {
+    Objects.requireNonNull(toExcludeList);
+    Objects.requireNonNull(excludedList);
+
     // Map of inodes to path.
     Map<Object, Path> copyFiles = new HashMap<>();
     // Map of link to path.
@@ -125,13 +132,24 @@ public class OMDBCheckpointServlet extends DBCheckpointServlet {
     getFilesForArchive(checkpoint, copyFiles, hardLinkFiles,
         includeSnapshotData(request));
 
+    // Exclude file
+    Map<Object, Path> finalCopyFiles = new HashMap<>();
+    copyFiles.forEach((o, path) -> {
+      String fName = path.getFileName().toString();
+      if (!toExcludeList.contains(fName)) {
+        finalCopyFiles.put(o, path);
+      } else {
+        excludedList.add(fName);
+      }
+    });
+
     try (TarArchiveOutputStream archiveOutputStream =
             new TarArchiveOutputStream(destination)) {
       archiveOutputStream
           .setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
       archiveOutputStream
           .setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
-      writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream);
+      writeFilesToArchive(finalCopyFiles, hardLinkFiles, archiveOutputStream);
     }
   }
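
The toExcludeList consumed above originates on the follower, which reports the SST files it already holds in its candidate directory. A rough sketch of how such a request could be assembled (illustrative only; the real URL construction lives in OMNodeDetails.getOMDBCheckpointEndpointUrl, which is not part of this hunk, and the servlet path and parameter layout shown here are assumptions):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    // Sketch: list the .sst files already present locally and append one
    // toExcludeSST query parameter per file so the leader can skip them.
    public final class CheckpointRequestSketch {
      public static String buildUrl(String leaderHttpAddress, File candidateDir) {
        List<String> existingSst = new ArrayList<>();
        File[] files = candidateDir.listFiles();
        if (files != null) {
          for (File f : files) {
            if (f.getName().endsWith(".sst")) {      // ROCKSDB_SST_SUFFIX
              existingSst.add(f.getName());          // e.g. "000123.sst"
            }
          }
        }
        String exclusions = existingSst.stream()
            .map(name -> "&toExcludeSST=" + name)    // names are plain, no encoding needed
            .collect(Collectors.joining());
        return "http://" + leaderHttpAddress + "/dbCheckpoint"
            + "?includeSnapshotData=true" + exclusions;
      }
    }
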
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index bc4162177f..14a7a0c815 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -2199,7 +2200,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         jvmPauseMonitor.stop();
       }
       if (omRatisSnapshotProvider != null) {
-        omRatisSnapshotProvider.stop();
+        omRatisSnapshotProvider.close();
       }
       OMPerformanceMetrics.unregister();
       RatisDropwizardExports.clear(ratisMetricsMap, ratisReporterList);
@@ -3536,12 +3537,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       return null;
     }
 
-    DBCheckpoint omDBCheckpoint = getDBCheckpointFromLeader(leaderId);
-    if (omDBCheckpoint == null) {
+    DBCheckpoint omDBCheckpoint;
+    try {
+      omDBCheckpoint = omRatisSnapshotProvider.
+          downloadDBSnapshotFromLeader(leaderId);
+    } catch (IOException ex) {
+      LOG.error("Failed to download snapshot from Leader {}.", leaderId,  ex);
       return null;
     }
-    LOG.info("Downloaded checkpoint from Leader {} to the location {}",
-        leaderId, omDBCheckpoint.getCheckpointLocation());
 
     TermIndex termIndex = null;
     try {
@@ -3553,7 +3556,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   /**
-   * Install checkpoint. If the checkpoints snapshot index is greater than
+   * Install checkpoint. If the checkpoint snapshot index is greater than
    * OM's last applied transaction index, then re-initialize the OM
    * state via this checkpoint. Before re-initializing OM state, the OM Ratis
    * server should be stopped so that no new transactions can be applied.
@@ -3633,8 +3636,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       }
       try {
         time = Time.monotonicNow();
-        dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex, oldDBLocation,
-            checkpointLocation);
+        dbBackup = replaceOMDBWithCheckpoint(lastAppliedIndex,
+            oldDBLocation, checkpointLocation);
         term = checkpointTrxnInfo.getTerm();
         lastAppliedIndex = checkpointTrxnInfo.getTransactionIndex();
         LOG.info("Replaced DB with checkpoint from OM: {}, term: {}, " +
@@ -3642,7 +3645,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
             Time.monotonicNow() - time);
       } catch (Exception e) {
         LOG.error("Failed to install Snapshot from {} as OM failed to replace" +
-            " DB with downloaded checkpoint. Reloading old OM state.", e);
+            " DB with downloaded checkpoint. Reloading old OM state.",
+            leaderId, e);
       }
     } else {
       LOG.warn("Cannot proceed with InstallSnapshot as OM is at TermIndex {} " +
@@ -3701,7 +3705,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         FileUtils.deleteFully(dbBackup);
       }
     } catch (Exception e) {
-      LOG.error("Failed to delete the backup of the original DB {}", dbBackup);
+      LOG.error("Failed to delete the backup of the original DB {}",
+          dbBackup, e);
     }
 
     if (lastAppliedIndex != checkpointTrxnInfo.getTransactionIndex()) {
@@ -3719,25 +3724,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return newTermIndex;
   }
 
-
-  /**
-   * Download the latest OM DB checkpoint from the leader OM.
-   *
-   * @param leaderId OMNodeID of the leader OM node.
-   * @return latest DB checkpoint from leader OM.
-   */
-  private DBCheckpoint getDBCheckpointFromLeader(String leaderId) {
-    LOG.info("Downloading checkpoint from leader OM {} and reloading state " +
-        "from the checkpoint.", leaderId);
-
-    try {
-      return omRatisSnapshotProvider.getOzoneManagerDBSnapshot(leaderId);
-    } catch (IOException e) {
-      LOG.error("Failed to download checkpoint from OM leader {}", leaderId, e);
-    }
-    return null;
-  }
-
   private void stopTrashEmptier() {
     if (this.emptier != null) {
       emptier.interrupt();
@@ -3793,7 +3779,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       // an inconsistent state and this marker file will fail OM from
       // starting up.
       Files.createFile(markerFile);
-      FileUtils.moveDirectory(checkpointPath, oldDB.toPath());
+      // Copy the candidate DB to real DB
+      org.apache.commons.io.FileUtils.copyDirectory(checkpointPath.toFile(),
+          oldDB);
       moveOmSnapshotData(oldDB.toPath(), dbSnapshotsDir.toPath());
       Files.deleteIfExists(markerFile);
     } catch (IOException e) {
@@ -3801,6 +3789,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
               "directory {}. Resetting to original DB.", checkpointPath,
           oldDB.toPath());
       try {
+        FileUtil.fullyDelete(oldDB);
         Files.move(dbBackup.toPath(), oldDB.toPath());
         if (dbSnapshotsBackup.exists()) {
           Files.move(dbSnapshotsBackup.toPath(), dbSnapshotsDir.toPath());
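
The hunks above change the checkpoint swap so that the candidate DB is copied (not moved) into place and the half-copied DB is fully deleted before the backup is restored on failure. A self-contained sketch of that swap-with-rollback pattern (illustrative only; the real code uses Hadoop's FileUtil.fullyDelete and an OM-specific marker file name):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;

    import org.apache.commons.io.FileUtils;

    // Sketch of swapping a live DB dir for a downloaded candidate while keeping
    // a backup to fall back on if the copy fails part-way.
    public final class DbSwapSketch {
      public static void swapWithRollback(File candidateDb, File liveDb,
          File backupDir) throws IOException {
        // 1. Move the live DB aside as a backup.
        FileUtils.moveDirectory(liveDb, backupDir);
        File marker = new File(liveDb.getParentFile(), "dbInconsistentMarker");
        try {
          // 2. The marker guards against a crash leaving a half-built DB behind.
          Files.createFile(marker.toPath());
          // 3. Copy (not move) so the candidate dir stays reusable for a later
          //    incremental download.
          FileUtils.copyDirectory(candidateDb, liveDb);
          Files.deleteIfExists(marker.toPath());
        } catch (IOException e) {
          // 4. Roll back: drop the partial copy and restore the backup.
          FileUtils.deleteQuietly(liveDb);
          FileUtils.moveDirectory(backupDir, liveDb);
          Files.deleteIfExists(marker.toPath());
          throw e;
        }
      }
    }
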
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index a7775f5bc0..12343524c4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -55,6 +55,7 @@ import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
@@ -208,6 +209,31 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
     ozoneManager.updatePeerList(newPeerIds);
   }
 
+  /**
+   * Called to notify the state machine about the snapshot install result,
+   * and to trigger cleanup of the candidate DB dir.
+   * @param result InstallSnapshotResult
+   * @param snapshotIndex the index of the installed snapshot
+   * @param peer the peer which finished installing the snapshot
+   */
+  @Override
+  public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult result,
+                                      long snapshotIndex, RaftPeer peer) {
+    LOG.info("Received notifySnapshotInstalled event {} for the peer: {}" +
+        " snapshotIndex: {}.", result, peer.getId(), snapshotIndex);
+    switch (result) {
+    case SUCCESS:
+    case SNAPSHOT_UNAVAILABLE:
+      // Currently, only trigger cleanup on the peer that installed the snapshot
+      if (ozoneManager.getOmRatisServer().getServer().getPeer().equals(peer)) {
+        ozoneManager.getOmSnapshotProvider().init();
+      }
+      break;
+    default:
+      break;
+    }
+  }
+
   /**
    * Validate/pre-process the incoming update request in the state machine.
    * @return the content to be written to the log entry. Null means the request
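
The "cleanup of candidate DB dir" mentioned in the javadoc is delegated to the snapshot provider's init(), which belongs to the new RDBSnapshotProvider base class and is not shown in this hunk. Conceptually it boils down to something like the following (an assumption, not the actual implementation):

    import java.io.File;
    import java.io.IOException;

    import org.apache.commons.io.FileUtils;

    // Sketch: ensure the candidate directory exists and is empty so the next
    // bootstrap or re-install starts from a clean slate.
    public final class CandidateDirInitSketch {
      public static void init(File candidateDir) throws IOException {
        if (candidateDir.exists()) {
          FileUtils.cleanDirectory(candidateDir);  // drop leftover SST/checkpoint files
        } else if (!candidateDir.mkdirs()) {
          throw new IOException("Cannot create candidate dir " + candidateDir);
        }
      }
    }
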
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
index e5b2fdf963..b924a2294f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis_snapshot/OmRatisSnapshotProvider.java
@@ -23,18 +23,15 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.conf.MutableConfigurationSource;
 import org.apache.hadoop.hdds.server.http.HttpConfig;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.hdds.utils.RDBSnapshotProvider;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
-import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
-import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
 
@@ -71,24 +68,21 @@ import org.slf4j.LoggerFactory;
  * bootstrap.  The follower needs these copies to respond the users
  * snapshot requests when it becomes the leader.
  */
-public class OmRatisSnapshotProvider {
+public class OmRatisSnapshotProvider extends RDBSnapshotProvider {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OmRatisSnapshotProvider.class);
 
-  private final File omSnapshotDir;
-  private Map<String, OMNodeDetails> peerNodesMap;
+  private final Map<String, OMNodeDetails> peerNodesMap;
   private final HttpConfig.Policy httpPolicy;
   private final boolean spnegoEnabled;
   private final URLConnectionFactory connectionFactory;
 
   public OmRatisSnapshotProvider(MutableConfigurationSource conf,
       File omRatisSnapshotDir, Map<String, OMNodeDetails> peerNodeDetails) {
-
+    super(omRatisSnapshotDir, OM_DB_NAME);
     LOG.info("Initializing OM Snapshot Provider");
-    this.omSnapshotDir = omRatisSnapshotDir;
-
-    this.peerNodesMap = new HashMap<>();
+    this.peerNodesMap = new ConcurrentHashMap<>();
     peerNodesMap.putAll(peerNodeDetails);
 
     this.httpPolicy = HttpConfig.getHttpPolicy(conf);
@@ -115,78 +109,60 @@ public class OmRatisSnapshotProvider {
   }
 
   /**
-   * Download the latest checkpoint from OM Leader via HTTP.
-   * @param leaderOMNodeID leader OM Node ID.
-   * @return the DB checkpoint (including the ratis snapshot index)
+   * When a new OM is bootstrapped, add it to the peerNode map.
    */
-  public DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
-      throws IOException {
-    String snapshotTime = Long.toString(System.currentTimeMillis());
-    String snapshotFileName = OM_DB_NAME + "-" + leaderOMNodeID
-        + "-" + snapshotTime;
-    String snapshotFilePath = Paths.get(omSnapshotDir.getAbsolutePath(),
-        snapshotFileName).toFile().getAbsolutePath();
-    File targetFile = new File(snapshotFilePath + ".tar");
+  public void addNewPeerNode(OMNodeDetails newOMNode) {
+    peerNodesMap.put(newOMNode.getNodeId(), newOMNode);
+  }
 
-    String omCheckpointUrl = peerNodesMap.get(leaderOMNodeID)
-        .getOMDBCheckpointEnpointUrl(httpPolicy.isHttpEnabled());
+  /**
+   * When an OM is decommissioned, remove it from the peerNode map.
+   */
+  public void removeDecommissionedPeerNode(String decommNodeId) {
+    peerNodesMap.remove(decommNodeId);
+  }
 
+  @Override
+  public void downloadSnapshot(String leaderNodeID, File targetFile)
+      throws IOException {
+    OMNodeDetails leader = peerNodesMap.get(leaderNodeID);
+    URL omCheckpointUrl = leader.getOMDBCheckpointEndpointUrl(
+        httpPolicy.isHttpEnabled(), true,
+        HAUtils.getExistingSstFiles(getCandidateDir()));
     LOG.info("Downloading latest checkpoint from Leader OM {}. Checkpoint " +
-        "URL: {}", leaderOMNodeID, omCheckpointUrl);
+        "URL: {}", leaderNodeID, omCheckpointUrl);
     SecurityUtil.doAsCurrentUser(() -> {
-      HttpURLConnection httpURLConnection = (HttpURLConnection)
-          connectionFactory.openConnection(new URL(omCheckpointUrl),
-              spnegoEnabled);
-      httpURLConnection.connect();
-      int errorCode = httpURLConnection.getResponseCode();
+      HttpURLConnection connection = (HttpURLConnection)
+          connectionFactory.openConnection(omCheckpointUrl, spnegoEnabled);
+      connection.setRequestMethod("GET");
+      connection.connect();
+      int errorCode = connection.getResponseCode();
       if ((errorCode != HTTP_OK) && (errorCode != HTTP_CREATED)) {
         throw new IOException("Unexpected exception when trying to reach " +
             "OM to download latest checkpoint. Checkpoint URL: " +
             omCheckpointUrl + ". ErrorCode: " + errorCode);
       }
 
-      try (InputStream inputStream = httpURLConnection.getInputStream()) {
+      try (InputStream inputStream = connection.getInputStream()) {
         FileUtils.copyInputStreamToFile(inputStream, targetFile);
       } catch (IOException ex) {
-        LOG.error("OM snapshot {} cannot be downloaded.", targetFile, ex);
         boolean deleted = FileUtils.deleteQuietly(targetFile);
         if (!deleted) {
           LOG.error("OM snapshot which failed to download {} cannot be deleted",
               targetFile);
         }
+        throw ex;
+      } finally {
+        connection.disconnect();
       }
       return null;
     });
-
-    // Untar the checkpoint file.
-    Path untarredDbDir = Paths.get(snapshotFilePath);
-    FileUtil.unTar(targetFile, untarredDbDir.toFile());
-    FileUtils.deleteQuietly(targetFile);
-
-    LOG.info("Successfully downloaded latest checkpoint from leader OM: {}",
-        leaderOMNodeID);
-
-    RocksDBCheckpoint omCheckpoint = new RocksDBCheckpoint(untarredDbDir);
-    return omCheckpoint;
   }
 
-  public void stop() {
+  @Override
+  public void close() throws IOException {
     if (connectionFactory != null) {
       connectionFactory.destroy();
     }
   }
-
-  /**
-   * When a new OM is bootstrapped, add it to the peerNode map.
-   */
-  public void addNewPeerNode(OMNodeDetails newOMNode) {
-    peerNodesMap.put(newOMNode.getNodeId(), newOMNode);
-  }
-
-  /**
-   * When an OM is decommissioned, remove it from the peerNode map.
-   */
-  public void removeDecommissionedPeerNode(String decommNodeId) {
-    peerNodesMap.remove(decommNodeId);
-  }
 }
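
Taken together with the servlet-side exclusion shown earlier, the follower-side flow driven by RDBSnapshotProvider (the new base class, not reproduced in this excerpt) is roughly: list the SST files already sitting in the candidate directory, request a checkpoint tarball that excludes them, untar the result into the same candidate directory, and hand the merged contents to checkpoint installation. A heavily simplified sketch under those assumptions (method names here are invented):

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the incremental download loop; the real provider also counts
    // downloads, supports fault injection for tests, and returns a DBCheckpoint.
    public abstract class IncrementalDownloadSketch {

      /** Fetch a tarball from the leader, excluding the named SST files. */
      protected abstract void downloadTarball(String leaderId,
          List<String> excludeSst, File targetTar) throws IOException;

      /** Unpack the tarball into the candidate directory. */
      protected abstract void untarInto(File tarball, File candidateDir)
          throws IOException;

      public File downloadDBSnapshotFromLeader(String leaderId, File candidateDir)
          throws IOException {
        // 1. SST files kept from a previous attempt need not be sent again.
        List<String> alreadyHave = new ArrayList<>();
        File[] files = candidateDir.listFiles();
        if (files != null) {
          for (File f : files) {
            if (f.getName().endsWith(".sst")) {
              alreadyHave.add(f.getName());
            }
          }
        }
        // 2. Ask the leader for everything else.
        File tarball = new File(candidateDir.getParent(), "om.db-checkpoint.tar");
        downloadTarball(leaderId, alreadyHave, tarball);
        // 3. Merge the new files into the candidate dir; together with the
        //    retained SSTs they form a complete checkpoint to install.
        untarInto(tarball, candidateDir);
        return candidateDir;
      }
    }
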


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org