Posted to commits@ozone.apache.org by sh...@apache.org on 2021/02/18 23:52:29 UTC

[ozone] branch HDDS-2823 updated: HDDS-4773. Add functionality to transfer Rocks db checkpoint from leader to follower (#1870)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new 1cbd6e3  HDDS-4773. Add functionality to transfer Rocks db checkpoint from leader to follower (#1870)
1cbd6e3 is described below

commit 1cbd6e3c5229b398c98e4fe8fc5a39114283e3be
Author: bshashikant <sh...@apache.org>
AuthorDate: Fri Feb 19 05:22:15 2021 +0530

    HDDS-4773. Add functionality to transfer Rocks db checkpoint from leader to follower (#1870)
---
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |   9 ++
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   7 +-
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   4 +
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |  59 +++++++
 .../dev-support/findbugsExcludeFile.xml            |   3 +
 hadoop-hdds/interface-client/pom.xml               |  13 ++
 .../src/main/proto/InterSCMProtocol.proto          |  46 ++++++
 .../hadoop/hdds/scm/ha/InterSCMGrpcClient.java     | 172 +++++++++++++++++++++
 .../hdds/scm/ha/InterSCMGrpcProtocolService.java   |  85 ++++++++++
 .../hadoop/hdds/scm/ha/InterSCMGrpcService.java    |  66 ++++++++
 .../hadoop/hdds/scm/ha/MockSCMHAManager.java       |   5 +
 .../hdds/scm/ha/SCMDBCheckpointProvider.java       |  88 +++++++++++
 .../hadoop/hdds/scm/ha/SCMGrpcOutputStream.java    | 131 ++++++++++++++++
 .../hadoop/hdds/scm/ha/SCMHAConfiguration.java     |  41 +++++
 .../apache/hadoop/hdds/scm/ha/SCMHAManager.java    |   5 +
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |  76 +++++++++
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java  |  39 +++++
 ...CMHAManager.java => SCMSnapshotDownloader.java} |  37 ++---
 .../hadoop/hdds/scm/ha/SCMSnapshotProvider.java    | 143 +++++++++++++++++
 .../hadoop/hdds/scm/TestSCMInstallSnapshot.java    | 100 ++++++++++++
 20 files changed, 1106 insertions(+), 23 deletions(-)
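
The new classes form a small download pipeline: the follower opens an InterSCMGrpcClient to the leader, streams the leader's RocksDB checkpoint to a local tar.gz, untars it, and wraps the result in a RocksDBCheckpoint. The real code path is SCMSnapshotProvider.getSCMDBSnapshot further down in this patch; the following is only a minimal illustrative sketch of that flow (not part of the commit), where the leader host and local paths are placeholder assumptions.

    import java.io.File;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ha.InterSCMGrpcClient;
    import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
    import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;

    public final class CheckpointDownloadSketch {
      public static void main(String[] args) throws Exception {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Placeholder leader host; in the patch this comes from the peer
        // node map kept by SCMSnapshotProvider.
        String leaderHost = "scm-leader.example.com";
        Path tarball = Paths.get("/tmp/scm.db-leader.tar.gz");

        // Stream the leader's checkpoint tarball to the local file.
        InterSCMGrpcClient client = new InterSCMGrpcClient(leaderHost, conf);
        try {
          // The returned future completes once the whole stream is written.
          client.download(tarball).get();
        } finally {
          client.close();
        }

        // Untar and hand the directory to RocksDBCheckpoint, mirroring
        // SCMSnapshotProvider.getSCMDBSnapshot.
        File untarredDir = new File("/tmp/scm.db-leader");
        FileUtil.unTar(tarball.toFile(), untarredDir);
        DBCheckpoint checkpoint = new RocksDBCheckpoint(untarredDir.toPath());
        System.out.println("Checkpoint at " + checkpoint.getCheckpointLocation());
      }
    }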

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index ae8b00b..d8b6439 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds;
 
 import javax.management.ObjectName;
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -490,6 +491,14 @@ public final class HddsUtils {
         "Path should be a descendant of %s", ancestor);
   }
 
+  public static File createDir(String dirPath) {
+    File dirFile = new File(dirPath);
+    if (!dirFile.mkdirs() && !dirFile.exists()) {
+      throw new IllegalArgumentException("Unable to create path: " + dirFile);
+    }
+    return dirFile;
+  }
+
   /**
    * Leverages the Configuration.getPassword method to attempt to get
    * passwords from the CredentialProvider API before falling back to
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index b96ab64..17fb7a0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -195,6 +195,10 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_HTTPS_ADDRESS_KEY =
       "ozone.scm.https-address";
 
+  public static final String OZONE_SCM_ADDRESS_KEY =
+      "ozone.scm.address";
+  public static final String OZONE_SCM_BIND_HOST_DEFAULT =
+      "0.0.0.0";
   public static final String OZONE_SCM_HTTP_BIND_HOST_DEFAULT = "0.0.0.0";
   public static final int OZONE_SCM_HTTP_BIND_PORT_DEFAULT = 9876;
   public static final int OZONE_SCM_HTTPS_BIND_PORT_DEFAULT = 9877;
@@ -283,9 +287,6 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_NODE_ID_KEY =
       "ozone.scm.node.id";
 
-  public static final String OZONE_SCM_ADDRESS_KEY =
-      "ozone.scm.address";
-
   public static final int OZONE_SCM_DEFAULT_PORT =
       OZONE_SCM_DATANODE_PORT_DEFAULT;
   // The path where datanode ID is to be written to.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 6e05136..1fd54e5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -125,6 +125,7 @@ public final class OzoneConsts {
   public static final String PIPELINE_DB_SUFFIX = "pipeline.db";
   public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
   public static final String OM_DB_NAME = "om.db";
+  public static final String SCM_DB_NAME = "scm.db";
   public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
 
   public static final String STORAGE_DIR_CHUNKS = "chunks";
@@ -398,7 +399,10 @@ public final class OzoneConsts {
   public static final String DB_TRANSIENT_MARKER = "dbInconsistentMarker";
 
 
+  // TODO : rename this to OZONE_RATIS_SNAPSHOT_DIR and use it in both
+  // SCM and OM
   public static final String OM_RATIS_SNAPSHOT_DIR = "snapshot";
+  public static final String SCM_RATIS_SNAPSHOT_DIR = "snapshot";
 
   public static final long DEFAULT_OM_UPDATE_ID = -1L;  
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 13e08a1..6188262 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -18,15 +18,28 @@
 package org.apache.hadoop.hdds.utils;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.protobuf.BlockingService;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.compress.utils.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -39,6 +52,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client;
@@ -487,4 +501,49 @@ public final class HddsServerUtil {
     }
     return metricsSystem;
   }
+
+  /**
+   * Write DB Checkpoint to an output stream as a compressed file (tgz).
+   *
+   * @param checkpoint  DB checkpoint to archive.
+   * @param destination destination output stream.
+   * @throws IOException if the checkpoint cannot be written to the stream.
+   */
+  public static void writeDBCheckpointToStream(DBCheckpoint checkpoint,
+      OutputStream destination)
+      throws IOException {
+    try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
+        .createCompressorOutputStream(CompressorStreamFactory.GZIP,
+            destination);
+        ArchiveOutputStream archiveOutputStream =
+            new TarArchiveOutputStream(gzippedOut);
+        Stream<Path> files =
+            Files.list(checkpoint.getCheckpointLocation())) {
+      for (Path path : files.collect(Collectors.toList())) {
+        if (path != null) {
+          Path fileName = path.getFileName();
+          if (fileName != null) {
+            includeFile(path.toFile(), fileName.toString(),
+                archiveOutputStream);
+          }
+        }
+      }
+    } catch (CompressorException e) {
+      throw new IOException(
+          "Can't compress the checkpoint: " +
+              checkpoint.getCheckpointLocation(), e);
+    }
+  }
+
+  private static void includeFile(File file, String entryName,
+      ArchiveOutputStream archiveOutputStream)
+      throws IOException {
+    ArchiveEntry archiveEntry =
+        archiveOutputStream.createArchiveEntry(file, entryName);
+    archiveOutputStream.putArchiveEntry(archiveEntry);
+    try (FileInputStream fis = new FileInputStream(file)) {
+      IOUtils.copy(fis, archiveOutputStream);
+    }
+    archiveOutputStream.closeArchiveEntry();
+  }
 }
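
A sketch of how the new helper is meant to be driven (illustrative only, not part of the patch): take a checkpoint from an open DBStore, archive it to a tar.gz, and clean the checkpoint up afterwards. The class and method names below are placeholders.

    import java.io.FileOutputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.hdds.utils.HddsServerUtil;
    import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
    import org.apache.hadoop.hdds.utils.db.DBStore;

    public final class CheckpointArchiveSketch {
      // Flushes RocksDB, takes a checkpoint and archives it as a tar.gz file.
      public static void archive(DBStore store, String tarGzPath)
          throws Exception {
        DBCheckpoint checkpoint = store.getCheckpoint(true);
        try (OutputStream out = new FileOutputStream(tarGzPath)) {
          HddsServerUtil.writeDBCheckpointToStream(checkpoint, out);
        } finally {
          checkpoint.cleanupCheckpoint();
        }
      }
    }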
diff --git a/hadoop-hdds/interface-client/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/interface-client/dev-support/findbugsExcludeFile.xml
index ba54a4f..18cdb6c 100644
--- a/hadoop-hdds/interface-client/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdds/interface-client/dev-support/findbugsExcludeFile.xml
@@ -21,4 +21,7 @@
   <Match>
     <Package name="org.apache.hadoop.hdds.protocol.proto"/>
   </Match>
+  <Match>
+    <Package name="org.apache.hadoop.hdds.protocol.scm.proto"/>
+  </Match>
 </FindBugsFilter>
diff --git a/hadoop-hdds/interface-client/pom.xml b/hadoop-hdds/interface-client/pom.xml
index 31a3960..dccb84b 100644
--- a/hadoop-hdds/interface-client/pom.xml
+++ b/hadoop-hdds/interface-client/pom.xml
@@ -71,6 +71,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
               <protoSourceRoot>${basedir}/src/main/proto/</protoSourceRoot>
               <includes>
                 <include>DatanodeClientProtocol.proto</include>
+                <include>InterSCMProtocol.proto</include>
               </includes>
               <outputDirectory>target/generated-sources/java</outputDirectory>
               <clearOutputDirectory>false</clearOutputDirectory>
@@ -119,6 +120,18 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
                          value="org.apache.ratis.thirdparty.com.google.common"
                          dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
                 </replace>
+                <replace token="com.google.protobuf"
+                         value="org.apache.ratis.thirdparty.com.google.protobuf"
+                         dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/scm/proto">
+                </replace>
+                <replace token="io.grpc"
+                         value="org.apache.ratis.thirdparty.io.grpc"
+                         dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/scm/proto">
+                </replace>
+                <replace token="com.google.common"
+                         value="org.apache.ratis.thirdparty.com.google.common"
+                         dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/scm/proto">
+                </replace>
               </tasks>
             </configuration>
             <goals>
diff --git a/hadoop-hdds/interface-client/src/main/proto/InterSCMProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/InterSCMProtocol.proto
new file mode 100644
index 0000000..8c726c6
--- /dev/null
+++ b/hadoop-hdds/interface-client/src/main/proto/InterSCMProtocol.proto
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for an *unstable* .proto interface.
+ */
+syntax = "proto2";
+option java_package = "org.apache.hadoop.hdds.protocol.scm.proto";
+option java_outer_classname = "InterSCMProtocolProtos";
+option java_generate_equals_and_hash = true;
+
+
+message CopyDBCheckpointRequestProto {
+  required bool flush = 1;
+}
+
+message CopyDBCheckpointResponseProto {
+  required string clusterId = 1;
+  required uint64 len = 2;
+  required bool eof = 3;
+  required bytes data = 4;
+  required uint64 readOffset = 6;
+  optional int64 checksum = 7;
+}
+
+service InterSCMProtocolService {
+  // An inter-SCM service to copy the SCM DB checkpoint from the leader to a follower
+  rpc download (CopyDBCheckpointRequestProto) returns (stream CopyDBCheckpointResponseProto);
+}
\ No newline at end of file
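
Each streamed CopyDBCheckpointResponseProto carries one slice of the checkpoint tarball: data/len hold the chunk, readOffset is the number of bytes already sent before this chunk, and eof marks the final chunk. A hedged sketch of building one such chunk, the way SCMGrpcOutputStream.flushBuffer does further down in this patch (class and parameter names here are illustrative only):

    import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
    import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

    public final class CheckpointChunkSketch {
      // Builds one streamed response chunk: 'data' is the next slice of the
      // tar.gz stream, 'readOffset' the bytes already sent before this chunk.
      public static CopyDBCheckpointResponseProto chunk(String clusterId,
          byte[] data, long readOffset, boolean lastChunk) {
        return CopyDBCheckpointResponseProto.newBuilder()
            .setClusterId(clusterId)
            .setData(ByteString.copyFrom(data))
            .setLen(data.length)
            .setReadOffset(readOffset)
            .setEof(lastChunk)
            .build();
      }
    }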
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
new file mode 100644
index 0000000..fc347e5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos;
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * gRPC client to download a RocksDB checkpoint from the leader node
+ * in an SCM HA ring.
+ */
+public class InterSCMGrpcClient implements SCMSnapshotDownloader {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(InterSCMGrpcClient.class);
+
+  private final ManagedChannel channel;
+
+  private final InterSCMProtocolServiceGrpc.InterSCMProtocolServiceStub
+      client;
+
+  private final long timeout;
+
+  public InterSCMGrpcClient(final String host, final ConfigurationSource conf) {
+    Preconditions.checkNotNull(conf);
+    int port = conf.getObject(SCMHAConfiguration.class).getGrpcBindPort();
+    timeout =
+        conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval();
+    NettyChannelBuilder channelBuilder =
+        NettyChannelBuilder.forAddress(host, port).usePlaintext()
+            .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+    channel = channelBuilder.build();
+    client = InterSCMProtocolServiceGrpc.newStub(channel).
+        withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
+  }
+
+
+  @Override
+  public CompletableFuture<Path> download(final Path outputPath) {
+    // By default on every checkpoint, the rocks db will be flushed
+    InterSCMProtocolProtos.CopyDBCheckpointRequestProto request =
+        InterSCMProtocolProtos.CopyDBCheckpointRequestProto.newBuilder()
+            .setFlush(true)
+            .build();
+    CompletableFuture<Path> response = new CompletableFuture<>();
+
+
+    client.download(request,
+        new StreamDownloader(response, outputPath));
+
+    return response;
+  }
+
+  public void shutdown() {
+    channel.shutdown();
+    try {
+      channel.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("failed to shutdown replication channel", e);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    shutdown();
+  }
+
+  /**
+   * gRPC stream observer to CompletableFuture adapter.
+   */
+  public static class StreamDownloader
+      implements StreamObserver<CopyDBCheckpointResponseProto> {
+
+    private final CompletableFuture<Path> response;
+    private final OutputStream stream;
+    private final Path outputPath;
+
+    public StreamDownloader(CompletableFuture<Path> response,
+        Path outputPath) {
+      this.response = response;
+      this.outputPath = outputPath;
+      try {
+        Preconditions.checkNotNull(outputPath, "Output path cannot be null");
+        stream = new FileOutputStream(outputPath.toFile());
+      } catch (IOException e) {
+        throw new UncheckedIOException(
+            "Output path can't be used: " + outputPath, e);
+      }
+    }
+
+    @Override
+    public void onNext(CopyDBCheckpointResponseProto checkPoint) {
+      try {
+        checkPoint.getData().writeTo(stream);
+      } catch (IOException e) {
+        onError(e);
+      }
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      try {
+        LOG.error("Download of checkpoint {} was unsuccessful",
+            outputPath, throwable);
+        stream.close();
+        deleteOutputOnFailure();
+        response.completeExceptionally(throwable);
+      } catch (IOException e) {
+        LOG.error("Failed to close {}}",
+            outputPath, e);
+        response.completeExceptionally(e);
+      }
+    }
+
+    @Override
+    public void onCompleted() {
+      try {
+        stream.close();
+        LOG.info("Checkpoint is downloaded to {}", outputPath);
+        response.complete(outputPath);
+      } catch (IOException e) {
+        LOG.error("Downloaded checkpoint OK, but failed to close {}",
+            outputPath, e);
+        response.completeExceptionally(e);
+      }
+
+    }
+
+    private void deleteOutputOnFailure() {
+      try {
+        Files.delete(outputPath);
+      } catch (IOException ex) {
+        LOG.error("Failed to delete destination {} for " +
+                "unsuccessful download",
+            outputPath, ex);
+      }
+    }
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
new file mode 100644
index 0000000..9967530
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service that serves SCM DB checkpoints for SCM HA.
+ * Ideally this should run only on the Ratis leader.
+ */
+public class InterSCMGrpcProtocolService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(InterSCMGrpcProtocolService.class);
+
+  private final int port;
+  private Server server;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+  public InterSCMGrpcProtocolService(final ConfigurationSource conf,
+      final StorageContainerManager scm) {
+    Preconditions.checkNotNull(conf);
+    this.port = conf.getObject(SCMHAConfiguration.class).getGrpcBindPort();
+
+    NettyServerBuilder nettyServerBuilder =
+        ((NettyServerBuilder) ServerBuilder.forPort(port))
+            .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+
+    InterSCMGrpcService service = new InterSCMGrpcService(scm);
+    ServerBuilder b = nettyServerBuilder.addService(service);
+    Preconditions.checkNotNull(b);
+    server = nettyServerBuilder.build();
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public void start() throws IOException {
+    if (!isStarted.compareAndSet(false, true)) {
+      LOG.info("Ignore. already started.");
+      return;
+    } else {
+      server.start();
+    }
+  }
+
+  public void stop() {
+    if (isStarted.get()) {
+      server.shutdown();
+      try {
+        server.awaitTermination(5, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOG.error("failed to shutdown XceiverServerGrpc", e);
+      }
+      isStarted.set(false);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcService.java
new file mode 100644
index 0000000..726f566
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointRequestProto;
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service that handles RocksDB checkpoint download requests.
+ */
+public class InterSCMGrpcService extends
+    InterSCMProtocolServiceGrpc.InterSCMProtocolServiceImplBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(InterSCMGrpcService.class);
+
+  private static final int BUFFER_SIZE = 1024 * 1024;
+
+  private final String clusterId;
+  private final SCMDBCheckpointProvider provider;
+
+  public InterSCMGrpcService(final StorageContainerManager scm) {
+    Preconditions.checkNotNull(scm);
+    this.clusterId = scm.getClusterId();
+    provider =
+        new SCMDBCheckpointProvider(scm.getScmMetadataStore().getStore());
+  }
+
+  @Override
+  public void download(CopyDBCheckpointRequestProto request,
+      StreamObserver<CopyDBCheckpointResponseProto> responseObserver) {
+    try {
+      SCMGrpcOutputStream outputStream =
+          new SCMGrpcOutputStream(responseObserver, clusterId, BUFFER_SIZE);
+      provider.writeDBCheckPointToStream(outputStream, request.getFlush());
+
+    } catch (IOException e) {
+      LOG.error("Error streaming SCM DB checkpoint", e);
+      responseObserver.onError(e);
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index 116994e..55818c5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -97,6 +97,11 @@ public final class MockSCMHAManager implements SCMHAManager {
     return transactionBuffer;
   }
 
+  @Override
+  public SCMSnapshotProvider getSCMSnapshotProvider() {
+    return null;
+  }
+
   /**
    * {@inheritDoc}
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java
new file mode 100644
index 0000000..d588bb9
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMDBCheckpointProvider.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+
+
+// TODO: define a generic interface for this
+public class SCMDBCheckpointProvider {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMDBCheckpointProvider.class);
+  private transient DBStore scmDbStore;
+
+  public SCMDBCheckpointProvider(DBStore scmDbStore) {
+    this.scmDbStore = scmDbStore;
+  }
+
+  public void writeDBCheckPointToStream(OutputStream stream, boolean flush)
+      throws IOException {
+    LOG.info("Received request to obtain OM DB checkpoint snapshot");
+    if (scmDbStore == null) {
+      LOG.error("Unable to process checkpointing request. DB Store is null");
+      return;
+    }
+
+    DBCheckpoint checkpoint = null;
+    try {
+
+      checkpoint = scmDbStore.getCheckpoint(flush);
+      if (checkpoint == null || checkpoint.getCheckpointLocation() == null) {
+        throw new IOException("Unable to process metadata snapshot request. "
+            + "Checkpoint request returned null.");
+      }
+
+      Path file = checkpoint.getCheckpointLocation().getFileName();
+      if (file == null) {
+        return;
+      }
+
+      Instant start = Instant.now();
+      HddsServerUtil.writeDBCheckpointToStream(checkpoint, stream);
+      Instant end = Instant.now();
+
+      long duration = Duration.between(start, end).toMillis();
+      LOG.info("Time taken to write the checkpoint to response output " +
+          "stream: {} milliseconds", duration);
+
+    } catch (IOException ioe) {
+      LOG.error("Unable to process metadata snapshot request. ", ioe);
+      throw ioe;
+    } finally {
+      if (checkpoint != null) {
+        try {
+          checkpoint.cleanupCheckpoint();
+        } catch (IOException e) {
+          LOG.error("Error trying to clean checkpoint at {} .",
+              checkpoint.getCheckpointLocation().toString());
+        }
+      }
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMGrpcOutputStream.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMGrpcOutputStream.java
new file mode 100644
index 0000000..1194a52
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMGrpcOutputStream.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Stream to which the tar db checkpoint will be transferred over to the
+ * destination over grpc.
+ * TODO: Make it a generic utility to be used both during container replication
+ * as well as SCM checkpoint transfer
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Adapter from {@code OutputStream} to gRPC {@code StreamObserver}.
+ * Data is buffered in a limited buffer of the specified size.
+ */
+class SCMGrpcOutputStream extends OutputStream {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMGrpcOutputStream.class);
+
+  private final StreamObserver<InterSCMProtocolProtos.
+      CopyDBCheckpointResponseProto> responseObserver;
+
+  private final ByteString.Output buffer;
+
+  private final String clusterId;
+
+  private final int bufferSize;
+
+  private long writtenBytes;
+
+  SCMGrpcOutputStream(
+      StreamObserver<InterSCMProtocolProtos.
+          CopyDBCheckpointResponseProto> responseObserver,
+      String clusterId, int bufferSize) {
+    this.responseObserver = responseObserver;
+    this.clusterId = clusterId;
+    this.bufferSize = bufferSize;
+    buffer = ByteString.newOutput(bufferSize);
+  }
+
+  @Override public void write(int b) {
+    try {
+      buffer.write(b);
+      if (buffer.size() >= bufferSize) {
+        flushBuffer(false);
+      }
+    } catch (Exception ex) {
+      responseObserver.onError(ex);
+    }
+  }
+
+  @Override public void write(@Nonnull byte[] data, int offset, int length) {
+    if ((offset < 0) || (offset > data.length) || (length < 0) || (
+        (offset + length) > data.length) || ((offset + length) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (length == 0) {
+      return;
+    }
+
+    try {
+      if (buffer.size() >= bufferSize) {
+        flushBuffer(false);
+      }
+
+      int remaining = length;
+      int off = offset;
+      int len = Math.min(remaining, bufferSize - buffer.size());
+      while (remaining > 0) {
+        buffer.write(data, off, len);
+        if (buffer.size() >= bufferSize) {
+          flushBuffer(false);
+        }
+        off += len;
+        remaining -= len;
+        len = Math.min(bufferSize, remaining);
+      }
+    } catch (Exception ex) {
+      responseObserver.onError(ex);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    flushBuffer(true);
+    LOG.info("Sent {} bytes for cluster {}", writtenBytes, clusterId);
+    responseObserver.onCompleted();
+    buffer.close();
+  }
+
+  private void flushBuffer(boolean eof) {
+    int length = buffer.size();
+    if (length > 0) {
+      ByteString data = buffer.toByteString();
+      LOG.debug("Sending {} bytes (of type {})", length,
+          data.getClass().getSimpleName());
+      InterSCMProtocolProtos.CopyDBCheckpointResponseProto response =
+          InterSCMProtocolProtos.CopyDBCheckpointResponseProto.newBuilder()
+              .setClusterId(clusterId).setData(data).setEof(eof)
+              .setReadOffset(writtenBytes).setLen(length).build();
+      responseObserver.onNext(response);
+      writtenBytes += length;
+      buffer.reset();
+    }
+  }
+}
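
SCMGrpcOutputStream is what lets the checkpoint tarball be produced directly into the gRPC response stream: HddsServerUtil.writeDBCheckpointToStream writes into it, and closing it flushes the last chunk and completes the observer. A minimal server-side sketch of that wiring (illustrative only, mirroring InterSCMGrpcService above; since the class is package-private, the sketch assumes the same org.apache.hadoop.hdds.scm.ha package):

    package org.apache.hadoop.hdds.scm.ha;

    import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
    import org.apache.hadoop.hdds.utils.HddsServerUtil;
    import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
    import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;

    final class CheckpointStreamingSketch {
      // Streams an already-taken DB checkpoint to the follower in 1 MB chunks;
      // closing the stream sends the eof chunk and calls onCompleted().
      static void stream(DBCheckpoint checkpoint, String clusterId,
          StreamObserver<CopyDBCheckpointResponseProto> observer)
          throws Exception {
        try (SCMGrpcOutputStream out =
            new SCMGrpcOutputStream(observer, clusterId, 1024 * 1024)) {
          HddsServerUtil.writeDBCheckpointToStream(checkpoint, out);
        }
      }
    }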
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
index 867e8d0..f093e45 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
@@ -177,10 +177,47 @@ public class SCMHAConfiguration {
   )
   private long ratisRoleCheckerInterval = 15 * 1000L;
 
+  @Config(key = "ratis.snapshot.dir",
+      type = ConfigType.STRING,
+      defaultValue = "",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "The ratis snapshot dir location"
+  )
+  private String ratisSnapshotDir;
+
+  @Config(key = "grpc.bind.port",
+      type = ConfigType.INT,
+      defaultValue = "9899",
+      tags = {OZONE, SCM, HA, RATIS},
+      description = "Port used by SCM for Grpc Server."
+  )
+  // TODO: fix the default grpc port
+  private int grpcBindPort = 9899;
+
+  @Config(key = "grpc.deadline.interval",
+      type = ConfigType.TIME,
+      defaultValue = "30m",
+      tags = {OZONE, SCM, HA, RATIS},
+      description = "Deadline for SCM DB checkpoint interval."
+  )
+  private long grpcDeadlineInterval = 30 * 60 * 1000L;
+
+  public long getGrpcDeadlineInterval() {
+    return grpcDeadlineInterval;
+  }
+
+  public int getGrpcBindPort() {
+    return grpcBindPort;
+  }
+
   public String getRatisStorageDir() {
     return ratisStorageDir;
   }
 
+  public String getRatisSnapshotDir() {
+    return ratisSnapshotDir;
+  }
+
   public void setRatisStorageDir(String dir) {
     this.ratisStorageDir = dir;
   }
@@ -217,6 +254,10 @@ public class SCMHAConfiguration {
     this.raftLogPurgeEnabled = enabled;
   }
 
+  public void setGrpcBindPort(int port) {
+    this.grpcBindPort = port;
+  }
+
   public int getRaftLogPurgeGap() {
     return raftLogPurgeGap;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index c8b3ff7..4eb2eed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -35,6 +35,11 @@ public interface SCMHAManager {
   SCMRatisServer getRatisServer();
 
   /**
+   * Returns SCM snapshot provider.
+   */
+  SCMSnapshotProvider getSCMSnapshotProvider();
+
+  /**
    * Returns DB transaction buffer.
    */
   DBTransactionBuffer getDBTransactionBuffer();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 1880363..b49ba74 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.ha;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +42,10 @@ public class SCMHAManagerImpl implements SCMHAManager {
   private final SCMRatisServer ratisServer;
   private final ConfigurationSource conf;
   private final SCMDBTransactionBuffer transactionBuffer;
+  private final SCMSnapshotProvider scmSnapshotProvider;
+
+  // this should ideally be started only on the Ratis leader
+  private final InterSCMGrpcProtocolService grpcServer;
 
   /**
    * Creates SCMHAManager instance.
@@ -51,6 +57,10 @@ public class SCMHAManagerImpl implements SCMHAManager {
         new SCMDBTransactionBuffer(scm);
     this.ratisServer = new SCMRatisServerImpl(
         conf.getObject(SCMHAConfiguration.class), conf, scm, transactionBuffer);
+    this.scmSnapshotProvider = new SCMSnapshotProvider(conf,
+        scm.getSCMHANodeDetails().getPeerNodeDetails());
+    grpcServer = new InterSCMGrpcProtocolService(conf, scm);
+
   }
 
   /**
@@ -59,6 +69,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
   @Override
   public void start() throws IOException {
     ratisServer.start();
+    grpcServer.start();
   }
 
   public SCMRatisServer getRatisServer() {
@@ -70,11 +81,76 @@ public class SCMHAManagerImpl implements SCMHAManager {
     return transactionBuffer;
   }
 
+  @Override
+  public SCMSnapshotProvider getSCMSnapshotProvider() {
+    return scmSnapshotProvider;
+  }
+
+  /**
+   * Download and install the latest checkpoint from the leader SCM.
+   *
+   * @param leaderId peerNodeID of the leader SCM
+   * @return If checkpoint is installed successfully, return the
+   *         corresponding termIndex. Otherwise, return null.
+   */
+  public TermIndex installSnapshotFromLeader(String leaderId) {
+    if (scmSnapshotProvider == null) {
+      LOG.error("SCM Snapshot Provider is not configured as there are no " +
+          "peer nodes.");
+      return null;
+    }
+
+    DBCheckpoint scmDBCheckpoint = getDBCheckpointFromLeader(leaderId);
+    LOG.info("Downloaded checkpoint from leader SCM {} to the location {}",
+        leaderId, scmDBCheckpoint.getCheckpointLocation());
+
+    TermIndex termIndex = null;
+    try {
+      termIndex = installCheckpoint(leaderId, scmDBCheckpoint);
+    } catch (Exception ex) {
+      LOG.error("Failed to install snapshot from Leader OM.", ex);
+    }
+    return termIndex;
+  }
+
+  /**
+   * Install checkpoint. If the checkpoint's snapshot index is greater than
+   * SCM's last applied transaction index, then re-initialize the SCM
+   * state via this checkpoint. Before re-initializing SCM state, the SCM
+   * Ratis server should be stopped so that no new transactions can be applied.
+   */
+  TermIndex installCheckpoint(String leaderId, DBCheckpoint scmDBCheckpoint)
+      throws Exception {
+    // TODO : implement install checkpoint
+    return null;
+  }
+
+
+  /**
+   * Download the latest SCM DB checkpoint from the leader SCM.
+   *
+   * @param leaderId NodeID of the leader SCM node.
+   * @return latest DB checkpoint from the leader SCM.
+   */
+  private DBCheckpoint getDBCheckpointFromLeader(String leaderId) {
+    LOG.info("Downloading checkpoint from leader SCM {} and reloading state " +
+        "from the checkpoint.", leaderId);
+
+    try {
+      return scmSnapshotProvider.getSCMDBSnapshot(leaderId);
+    } catch (IOException e) {
+      LOG.error("Failed to download checkpoint from OM leader {}", leaderId, e);
+    }
+    return null;
+  }
+
+
   /**
    * {@inheritDoc}
    */
   @Override
   public void shutdown() throws IOException {
     ratisServer.stop();
+    grpcServer.stop();
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
index 48946b4..b61023f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -18,16 +18,27 @@
 
 package org.apache.hadoop.hdds.scm.ha;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.nio.file.Paths;
 import java.util.Collection;
 
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_RATIS_SNAPSHOT_DIR;
+
 /**
  * Utility class used by SCM HA.
  */
 public final class SCMHAUtils {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMHAUtils.class);
   private SCMHAUtils() {
     // not used
   }
@@ -62,4 +73,32 @@ public final class SCMHAUtils {
         "suffix '" + suffix + "' should not already have '.' prepended.";
     return key + "." + suffix;
   }
+
+  /**
+   * Get the local directory where ratis logs will be stored.
+   */
+  public static String getSCMRatisDirectory(ConfigurationSource conf) {
+    String scmRatisDirectory =
+        conf.getObject(SCMHAConfiguration.class).getRatisStorageDir();
+
+    if (Strings.isNullOrEmpty(scmRatisDirectory)) {
+      scmRatisDirectory = ServerUtils.getDefaultRatisDirectory(conf);
+    }
+    return scmRatisDirectory;
+  }
+
+  public static String getSCMRatisSnapshotDirectory(ConfigurationSource conf) {
+    String snapshotDir =
+        conf.getObject(SCMHAConfiguration.class).getRatisSnapshotDir();
+
+    // If ratis snapshot directory is not set, fall back to ozone.metadata.dir.
+    if (Strings.isNullOrEmpty(snapshotDir)) {
+      LOG.warn("SCM snapshot dir is not configured. Falling back to {} config",
+          OZONE_METADATA_DIRS);
+      File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+      snapshotDir =
+          Paths.get(metaDirPath.getPath(), SCM_RATIS_SNAPSHOT_DIR).toString();
+    }
+    return snapshotDir;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
similarity index 61%
copy from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
copy to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
index c8b3ff7..7d5d3eb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java
@@ -14,33 +14,30 @@
  * License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.hadoop.hdds.scm.ha;
 
 import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
 
 /**
- * SCMHAManager provides HA service for SCM.
+ * Contract to download an SCM snapshot from a remote server.
+ * <p>
+ * The underlying implementation is expected to download the SCM
+ * snapshot via a chosen protocol (for now it is gRPC) and write it
+ * to the given destination path.
+ *
  */
-public interface SCMHAManager {
-
-  /**
-   * Starts HA service.
-   */
-  void start() throws IOException;
+public interface SCMSnapshotDownloader {
 
   /**
-   * Returns RatisServer instance associated with the SCM instance.
+   * Downloads the contents to the target file path.
+   *
+   * @param destination target file path for the downloaded snapshot.
+   * @return future task that tracks the download progress.
+   * @throws IOException if the download cannot be started.
    */
-  SCMRatisServer getRatisServer();
+  CompletableFuture<Path> download(Path destination) throws IOException;
 
-  /**
-   * Returns DB transaction buffer.
-   */
-  DBTransactionBuffer getDBTransactionBuffer();
-
-  /**
-   * Stops the HA service.
-   */
-  void shutdown() throws IOException;
-}
+  void close() throws Exception;
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java
new file mode 100644
index 0000000..c0e1db3
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SCMSnapshotProvider downloads the latest checkpoint from the
+ * leader SCM and loads the checkpoint into State Machine.
+ */
+public class SCMSnapshotProvider {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMSnapshotProvider.class);
+
+  private final File scmSnapshotDir;
+
+
+  private final ConfigurationSource conf;
+
+  private SCMSnapshotDownloader client;
+
+  private Map<String, SCMNodeDetails> peerNodesMap;
+
+  public SCMSnapshotProvider(ConfigurationSource conf,
+      List<SCMNodeDetails> peerNodes) {
+    LOG.info("Initializing SCM Snapshot Provider");
+    this.conf = conf;
+    // Create Ratis storage dir
+    String scmRatisDirectory = SCMHAUtils.getSCMRatisDirectory(conf);
+
+    if (scmRatisDirectory == null || scmRatisDirectory.isEmpty()) {
+      throw new IllegalArgumentException(HddsConfigKeys.OZONE_METADATA_DIRS +
+          " must be defined.");
+    }
+    HddsUtils.createDir(scmRatisDirectory);
+
+    // Create Ratis snapshot dir
+    scmSnapshotDir = HddsUtils.createDir(
+        SCMHAUtils.getSCMRatisSnapshotDirectory(conf));
+    if (peerNodes != null) {
+      this.peerNodesMap = new HashMap<>();
+      for (SCMNodeDetails peerNode : peerNodes) {
+        this.peerNodesMap.put(peerNode.getNodeId(), peerNode);
+      }
+    }
+    this.client = null;
+  }
+
+  @VisibleForTesting
+  public void setPeerNodesMap(Map<String, SCMNodeDetails> peerNodesMap) {
+    this.peerNodesMap = peerNodesMap;
+  }
+  /**
+   * Download the latest checkpoint from the SCM leader.
+   * @param leaderSCMNodeID leader SCM Node ID.
+   * @return the DB checkpoint (including the ratis snapshot index)
+   */
+  public DBCheckpoint getSCMDBSnapshot(String leaderSCMNodeID)
+      throws IOException {
+    String snapshotTime = Long.toString(System.currentTimeMillis());
+    String snapshotFileName =
+        OzoneConsts.SCM_DB_NAME + "-" + leaderSCMNodeID + "-" + snapshotTime;
+    String snapshotFilePath =
+        Paths.get(scmSnapshotDir.getAbsolutePath(), snapshotFileName).toFile()
+            .getAbsolutePath();
+    File targetFile = new File(snapshotFilePath + ".tar.gz");
+
+    // The client instance will be initialized only when the first install
+    // snapshot notification from the Ratis leader is received.
+    if (client == null) {
+      client = new InterSCMGrpcClient(
+          peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(),
+          conf);
+    }
+    try {
+      client.download(targetFile.toPath()).get();
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error("Rocks DB checkpoint downloading failed", e);
+      throw new IOException(e);
+    }
+
+
+    // Untar the checkpoint file.
+    Path untarredDbDir = Paths.get(snapshotFilePath);
+    FileUtil.unTar(targetFile, untarredDbDir.toFile());
+    FileUtils.deleteQuietly(targetFile);
+
+    LOG.info("Successfully downloaded latest checkpoint from leader SCM: {}",
+        leaderSCMNodeID);
+
+    RocksDBCheckpoint scmCheckpoint = new RocksDBCheckpoint(untarredDbDir);
+    return scmCheckpoint;
+  }
+
+  @VisibleForTesting
+  public File getScmSnapshotDir() {
+    return scmSnapshotDir;
+  }
+
+  public void stop() throws Exception {
+    if (client != null) {
+      client.close();
+    }
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
new file mode 100644
index 0000000..3768d95
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMSnapshotProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+
+/**
+ * Class to test install snapshot feature for SCM HA.
+ */
+public class TestSCMInstallSnapshot {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    conf = new OzoneConfiguration();
+    SCMHAConfiguration scmhaConfiguration = conf.getObject(
+        SCMHAConfiguration.class);
+    scmhaConfiguration.setRatisSnapshotThreshold(1L);
+    conf.setFromObject(scmhaConfiguration);
+    cluster = MiniOzoneCluster
+        .newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testInstallSnapshot() throws Exception {
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    ContainerManagerV2 containerManager = scm.getContainerManager();
+    PipelineManager pipelineManager = scm.getPipelineManager();
+    Pipeline ratisPipeline1 = pipelineManager.getPipeline(
+        containerManager.allocateContainer(
+            RATIS, THREE, "Owner1").getPipelineID());
+    pipelineManager.openPipeline(ratisPipeline1.getId());
+    Pipeline ratisPipeline2 = pipelineManager.getPipeline(
+        containerManager.allocateContainer(
+            RATIS, ONE, "Owner2").getPipelineID());
+    pipelineManager.openPipeline(ratisPipeline2.getId());
+    SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder()
+        .setRpcAddress(new InetSocketAddress("0.0.0.0", 0)).setSCMNodeId("scm1")
+        .build();
+    Map<String, SCMNodeDetails> peerMap = new HashMap<>();
+    peerMap.put(scmNodeDetails.getNodeId(), scmNodeDetails);
+    SCMSnapshotProvider provider =
+        scm.getScmHAManager().getSCMSnapshotProvider();
+    provider.setPeerNodesMap(peerMap);
+    provider.getSCMDBSnapshot(scmNodeDetails.getNodeId());
+    final File[] files = FileUtil.listFiles(provider.getScmSnapshotDir());
+    Assert.assertEquals(1, files.length);
+    Assert.assertTrue(files[0].getName().startsWith(
+        OzoneConsts.SCM_DB_NAME + "-" + scmNodeDetails.getNodeId()));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org