Posted to commits@ozone.apache.org by sh...@apache.org on 2021/02/28 22:45:32 UTC

[ozone] branch HDDS-2823 updated: HDDS-4718. Bootstrap new SCM node (#1953)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new 920381c  HDDS-4718. Bootstrap new SCM node (#1953)
920381c is described below

commit 920381c56412a275d9138726b2b930adad8e18d3
Author: bshashikant <sh...@apache.org>
AuthorDate: Mon Mar 1 04:15:08 2021 +0530

    HDDS-4718. Bootstrap new SCM node (#1953)
---
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |   4 +-
 .../org/apache/hadoop/hdds/scm/AddSCMRequest.java  | 113 ++++
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   6 +-
 .../hadoop/hdds/scm/ha/SCMHAConfiguration.java     |  20 +-
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java  |  32 +
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   3 +-
 .../org/apache/hadoop/ozone/audit/SCMAction.java   |   3 +-
 .../apache/hadoop/ozone/common/StorageInfo.java    |   8 +-
 .../common/src/main/resources/ozone-default.xml    |  11 +-
 .../transport/server/ratis/XceiverServerRatis.java |   3 +
 .../scm/protocol/ScmBlockLocationProtocol.java     |   6 +
 ...lockLocationProtocolClientSideTranslatorPB.java |  21 +
 .../SCMBlockLocationFailoverProxyProvider.java     |   2 +-
 .../java/org/apache/hadoop/hdds/utils/HAUtils.java |  25 +-
 .../interface-client/src/main/proto/hdds.proto     |  10 +
 .../src/main/proto/ScmServerProtocol.proto         |   3 +
 .../hadoop/hdds/scm/ha/InterSCMGrpcClient.java     |   4 +-
 .../hdds/scm/ha/InterSCMGrpcProtocolService.java   |   4 +-
 .../hadoop/hdds/scm/ha/MockSCMHAManager.java       |  11 +
 .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java   |  10 +-
 .../apache/hadoop/hdds/scm/ha/SCMHAManager.java    |   4 +
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |  40 +-
 .../hadoop/hdds/scm/ha/SCMHANodeDetails.java       |  24 +-
 .../apache/hadoop/hdds/scm/ha/SCMNodeDetails.java  |  20 +-
 .../apache/hadoop/hdds/scm/ha/SCMRatisServer.java  |   3 +
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     | 327 ++++------
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |  79 ++-
 ...lockLocationProtocolServerSideTranslatorPB.java |  14 +
 .../hdds/scm/server/SCMBlockProtocolServer.java    |  28 +
 .../hadoop/hdds/scm/server/SCMStorageConfig.java   |  33 +-
 .../hdds/scm/server/StorageContainerManager.java   |  47 +-
 .../hadoop/hdds/scm/TestHddsServerUtils.java       |  13 -
 .../hdds/scm/ha/TestReplicationAnnotation.java     |   7 +
 .../hadoop/ozone/client/TestHddsClientUtils.java   |  22 -
 .../org/apache/hadoop/ozone/ha/NodeDetails.java    |   9 +
 .../apache/hadoop/ozone/MiniOzoneChaosCluster.java |  26 +-
 .../apache/hadoop/hdds/scm/TestSCMSnapshot.java    |   4 +
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |  34 +-
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  89 ++-
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       | 718 +++++++++++++++++++++
 .../hadoop/ozone/MiniOzoneOMHAClusterImpl.java     | 353 +---------
 .../hadoop/ozone/TestSecureOzoneCluster.java       |   7 +
 .../hadoop/ozone/TestStorageContainerManager.java  |  21 +-
 .../client/rpc/TestContainerStateMachine.java      |   1 +
 .../ozone/scm/TestStorageContainerManagerHA.java   | 211 ++++++
 .../ozone/om/ScmBlockLocationTestingClient.java    |   6 +
 46 files changed, 1706 insertions(+), 733 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 6c7bff7..bcf7f30 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -311,8 +311,8 @@ public final class HddsUtils {
   public static InetSocketAddress getSingleSCMAddress(
       ConfigurationSource conf) {
     Collection<InetSocketAddress> singleton = getSCMAddresses(conf);
-    Preconditions.checkArgument(singleton.size() == 1,
-        MULTIPLE_SCM_NOT_YET_SUPPORTED);
+   // Preconditions.checkArgument(singleton.size() == 1,
+    //    MULTIPLE_SCM_NOT_YET_SUPPORTED);
     return singleton.iterator().next();
   }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/AddSCMRequest.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/AddSCMRequest.java
new file mode 100644
index 0000000..8133c57
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/AddSCMRequest.java
@@ -0,0 +1,113 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * Class for ADD SCM request to be sent by Bootstrapping SCM to existing
+ * leader SCM.
+ */
+public class AddSCMRequest {
+
+  private final String clusterId;
+  private final String scmId;
+  private final String ratisAddr;
+
+  public AddSCMRequest(String clusterId, String scmId, String addr) {
+    this.clusterId = clusterId;
+    this.scmId = scmId;
+    this.ratisAddr = addr;
+  }
+
+  public static AddSCMRequest getFromProtobuf(
+      HddsProtos.AddScmRequestProto proto) {
+    return new Builder().setClusterId(proto.getClusterId())
+        .setScmId(proto.getScmId()).setRatisAddr(proto.getRatisAddr()).build();
+  }
+
+  public HddsProtos.AddScmRequestProto getProtobuf() {
+    return HddsProtos.AddScmRequestProto.newBuilder().setClusterId(clusterId)
+        .setScmId(scmId).setRatisAddr(ratisAddr).build();
+  }
+  /**
+   * Builder for AddSCMRequest.
+   */
+  public static class Builder {
+    private String clusterId;
+    private String scmId;
+    private String ratisAddr;
+
+
+    /**
+     * sets the cluster id.
+     * @param cid clusterId to be set
+     * @return Builder for AddSCMRequest
+     */
+    public AddSCMRequest.Builder setClusterId(String cid) {
+      this.clusterId = cid;
+      return this;
+    }
+
+    /**
+     * sets the scmId.
+     * @param id scmId
+     * @return Builder for AddSCMRequest
+     */
+    public AddSCMRequest.Builder setScmId(String id) {
+      this.scmId = id;
+      return this;
+    }
+
+    /**
+     * Set ratis address in Scm HA.
+     * @param   addr  address in the format of [ip|hostname]:port
+     * @return  Builder for AddSCMRequest
+     */
+    public AddSCMRequest.Builder setRatisAddr(String addr) {
+      this.ratisAddr = addr;
+      return this;
+    }
+
+    public AddSCMRequest build() {
+      return new AddSCMRequest(clusterId, scmId, ratisAddr);
+    }
+  }
+
+  /**
+   * Gets the clusterId from the Version file.
+   * @return ClusterId
+   */
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  /**
+   * Gets the SCM Id from the Version file.
+   * @return SCM Id
+   */
+  public String getScmId() {
+    return scmId;
+  }
+
+  public String getRatisAddr() {
+    return ratisAddr;
+  }
+
+}
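
Note: a minimal usage sketch of the new AddSCMRequest builder, based only on the class above; the cluster id, scm id and address values are made up for illustration:

    // Hypothetical values, for illustration only.
    AddSCMRequest request = new AddSCMRequest.Builder()
        .setClusterId("CID-1df51ed9-19f1-4283-8f61-5d90a84c196c")
        .setScmId("scm3")
        .setRatisAddr("scm3.example.com:9865")   // [ip|hostname]:port, per the Builder javadoc
        .build();
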
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 5c77cd9..1ffcda1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -384,7 +384,11 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_RATIS_PORT_KEY
       = "ozone.scm.ratis.port";
   public static final int OZONE_SCM_RATIS_PORT_DEFAULT
-      = 9864;
+      = 9865;
+  public static final String OZONE_SCM_GRPC_PORT_KEY
+      = "ozone.scm.grpc.port";
+  public static final int OZONE_SCM_GRPC_PORT_DEFAULT
+      = 9866;
   public static final String OZONE_SCM_RATIS_RPC_TYPE_KEY
       = "ozone.scm.ratis.rpc.type";
   public static final String OZONE_SCM_RATIS_RPC_TYPE_DEFAULT
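
Note: the new gRPC key is read the same way as the Ratis port key; a short sketch mirroring the lookups added to InterSCMGrpcClient and InterSCMGrpcProtocolService further down (conf here stands for any ConfigurationSource, e.g. an OzoneConfiguration):

    int grpcPort = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY,
        ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT);   // 9866 unless overridden
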
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
index f093e45..ddf008c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
@@ -149,12 +149,12 @@ public class SCMHAConfiguration {
 
   @Config(key = "ratis.leader.election.timeout",
       type = ConfigType.TIME,
-      defaultValue = "1s",
+      defaultValue = "5s",
       tags = {SCM, OZONE, HA, RATIS},
       description = "The minimum timeout duration for SCM ratis leader" +
           " election. Default is 1s."
   )
-  private long ratisLeaderElectionTimeout = 1 * 1000L;
+  private long ratisLeaderElectionTimeout = 5 * 1000L;
 
   @Config(key = "ratis.server.failure.timeout.duration",
       type = ConfigType.TIME,
@@ -185,15 +185,6 @@ public class SCMHAConfiguration {
   )
   private String ratisSnapshotDir;
 
-  @Config(key = "grpc.bind.port",
-      type = ConfigType.INT,
-      defaultValue = "9899",
-      tags = {OZONE, SCM, HA, RATIS},
-      description = "Port used by SCM for Grpc Server."
-  )
-  // TODO: fix the default grpc port
-  private int grpcBindPort = 9899;
-
   @Config(key = "grpc.deadline.interval",
       type = ConfigType.TIME,
       defaultValue = "30m",
@@ -206,9 +197,6 @@ public class SCMHAConfiguration {
     return grpcDeadlineInterval;
   }
 
-  public int getGrpcBindPort() {
-    return grpcBindPort;
-  }
 
   public String getRatisStorageDir() {
     return ratisStorageDir;
@@ -254,10 +242,6 @@ public class SCMHAConfiguration {
     this.raftLogPurgeEnabled = enabled;
   }
 
-  public void setGrpcBindPort(int port) {
-    this.grpcBindPort = port;
-  }
-
   public int getRaftLogPurgeGap() {
     return raftLogPurgeGap;
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
index 28b3dcd..45c0401 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -22,14 +22,19 @@ package org.apache.hadoop.hdds.scm.ha;
 import com.google.common.base.Strings;
 import org.apache.hadoop.hdds.conf.ConfigurationException;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.ozone.ha.ConfUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_DIR;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID;
@@ -131,4 +136,31 @@ public final class SCMHAUtils {
     }
     return localScmServiceId;
   }
+
+  /**
+   * Removes the self node from the list of nodes in the
+   * configuration.
+   * @param configuration OzoneConfiguration
+   * @return Updated OzoneConfiguration
+   */
+
+  public static OzoneConfiguration removeSelfId(
+      OzoneConfiguration configuration) {
+    final OzoneConfiguration conf = new OzoneConfiguration(configuration);
+    String scmNodes = conf.get(ConfUtils
+        .addKeySuffixes(ScmConfigKeys.OZONE_SCM_NODES_KEY,
+            getScmServiceId(conf)));
+    if (scmNodes != null) {
+      final String selfId = conf.get(ScmConfigKeys.OZONE_SCM_NODE_ID_KEY);
+      String[] parts = scmNodes.split(",");
+      List<String> partsLeft = new ArrayList<>();
+      for (String part : parts) {
+        if (!part.equals(selfId)) {
+          partsLeft.add(part);
+        }
+      }
+      conf.set(ScmConfigKeys.OZONE_SCM_NODES_KEY, String.join(",", partsLeft));
+    }
+    return conf;
+  }
 }
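
Note: a small sketch of what removeSelfId does, with hypothetical node ids; it is used below by HAUtils.addSCM so the bootstrap request is sent to an existing SCM rather than back to the local node:

    // Assume ozone.scm.nodes.<serviceId> = "scm1,scm2,scm3" and the local
    // ozone.scm.node.id = "scm3" (hypothetical values).
    OzoneConfiguration pruned = SCMHAUtils.removeSelfId(conf);
    // pruned has ozone.scm.nodes set to "scm1,scm2"; the original conf is
    // left untouched because removeSelfId works on a copy.
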
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index a5d3c7d..9d82444 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -38,8 +38,7 @@ public final class OzoneConsts {
 
   public static final String STORAGE_DIR = "scm";
   public static final String SCM_ID = "scmUuid";
-  public static final String SCM_NODES_INFO = "scmNodesInfo";
-  public static final String SCM_NODES_SEPARATOR = "-";
+  public static final String CLUSTER_ID_PREFIX = "CID-";
 
   public static final String OZONE_SIMPLE_ROOT_USER = "root";
   public static final String OZONE_SIMPLE_HDFS_USER = "hdfs";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index 102d47a..a4ae55e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -44,7 +44,8 @@ public enum SCMAction implements AuditAction {
   START_REPLICATION_MANAGER,
   STOP_REPLICATION_MANAGER,
   GET_REPLICATION_MANAGER_STATUS,
-  GET_CONTAINER_WITH_PIPELINE_BATCH;
+  GET_CONTAINER_WITH_PIPELINE_BATCH,
+  ADD_SCM;
 
   @Override
   public String getAction() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
index c88aaa9..492931d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.common;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
+import org.apache.hadoop.ozone.OzoneConsts;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -27,6 +28,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.Properties;
+import java.util.UUID;
 
 /**
  * Common class for storage information. This class defines the common
@@ -197,11 +199,7 @@ public class StorageInfo {
    * @return new clusterID
    */
   public static String newClusterID() {
-    // TODO:
-    //  Please check https://issues.apache.org/jira/browse/HDDS-4538
-    //  hard code clusterID and scmUuid on HDDS-2823,
-    //  so that multi SCMs won't cause chaos in Datanode side.
-    return "CID-1df51ed9-19f1-4283-8f61-5d90a84c196c";
+    return OzoneConsts.CLUSTER_ID_PREFIX + UUID.randomUUID().toString();
   }
 
 }
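
Note: with the hard-coded id gone, newClusterID() now yields a fresh id per cluster; an illustrative value (not a real id):

    String clusterId = StorageInfo.newClusterID();
    // e.g. "CID-3b5c0f7a-90c2-4e54-a1ab-7d3e2f1c9d40":
    // OzoneConsts.CLUSTER_ID_PREFIX ("CID-") followed by a random UUID.
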
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 8fac16d..d608720 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1890,7 +1890,7 @@
 
   <property>
     <name>ozone.scm.ratis.port</name>
-    <value>9872</value>
+    <value>9865</value>
     <tag>OZONE, SCM, HA, RATIS</tag>
     <description>
       The port number of the SCM's Ratis server.
@@ -1898,6 +1898,15 @@
   </property>
 
   <property>
+    <name>ozone.scm.grpc.port</name>
+    <value>9866</value>
+    <tag>OZONE, SCM, HA, RATIS</tag>
+    <description>
+      The port number of the SCM's grpc server.
+    </description>
+  </property>
+
+  <property>
     <name>ozone.scm.ratis.rpc.type</name>
     <value>GRPC</value>
     <tag>OZONE, SCM, HA, RATIS</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index dec60f4..f2c4e7e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -765,12 +765,15 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         clientId, server.getId(), nextCallId(), group);
 
     RaftClientReply reply;
+    LOG.debug("Received addGroup request for pipeline {}", pipelineID);
+
     try {
       reply = server.groupManagement(request);
     } catch (Exception e) {
       throw new IOException(e.getMessage(), e);
     }
     processReply(reply);
+    LOG.info("Created group {}", pipelineID);
   }
 
   @Override
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
index 9c9ba50..9137c6e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.protocol;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.security.KerberosInfo;
@@ -78,6 +79,11 @@ public interface ScmBlockLocationProtocol extends Closeable {
   ScmInfo getScmInfo() throws IOException;
 
   /**
+   * Request to add SCM instance to HA group.
+   */
+  boolean addSCM(AddSCMRequest request) throws IOException;
+
+  /**
    * Sort datanodes with distance to client.
    * @param nodes list of network name of each node.
    * @param clientMachine client address, depends, can be hostname or ipaddress.
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index a9d974c..0662a81 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
     .SortDatanodesRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
     .SortDatanodesResponseProto;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -248,6 +249,26 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
   }
 
   /**
+   * Request to add SCM to existing SCM HA group.
+   * @return status
+   * @throws IOException
+   */
+  @Override
+  public boolean addSCM(AddSCMRequest request) throws IOException {
+    HddsProtos.AddScmRequestProto requestProto =
+        request.getProtobuf();
+    HddsProtos.AddScmResponseProto resp;
+    SCMBlockLocationRequest wrapper = createSCMBlockRequest(
+        Type.AddScm)
+        .setAddScmRequestProto(requestProto)
+        .build();
+
+    final SCMBlockLocationResponse wrappedResponse =
+        handleError(submitRequest(wrapper));
+    resp = wrappedResponse.getAddScmResponse();
+    return resp.getSuccess();
+  }
+  /**
    * Sort the datanodes based on distance from client.
    * @return List<DatanodeDetails></>
    * @throws IOException
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
index eb90b5a..d982cf5 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java
@@ -143,7 +143,7 @@ public class SCMBlockLocationFailoverProxyProvider implements
       nextProxyIndex();
     } else {
       if (!assignLeaderToNode(newLeader)) {
-        LOG.debug("Failing over OM proxy to nodeId: {}", newLeader);
+        LOG.debug("Failing over SCM proxy to nodeId: {}", newLeader);
         nextProxyIndex();
       }
     }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
index 1e50364..7d662f6 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
@@ -17,17 +17,15 @@
 package org.apache.hadoop.hdds.utils;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
-import org.apache.hadoop.io.retry.RetryPolicy;
 
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
 
 /**
  * utility class used by SCM and OM for HA.
@@ -40,12 +38,7 @@ public final class HAUtils {
   public static ScmInfo getScmInfo(OzoneConfiguration conf)
       throws IOException {
     try {
-      RetryPolicy retryPolicy = retryUpToMaximumCountWithFixedSleep(
-          10, 5, TimeUnit.SECONDS);
-      RetriableTask<ScmInfo> retriable = new RetriableTask<>(
-          retryPolicy, "getScmInfo",
-          () -> getScmBlockClient(conf).getScmInfo());
-      return retriable.call();
+      return getScmBlockClient(conf).getScmInfo();
     } catch (IOException e) {
       throw e;
     } catch (Exception e) {
@@ -53,6 +46,18 @@ public final class HAUtils {
     }
   }
 
+  public static boolean addSCM(OzoneConfiguration conf, AddSCMRequest request)
+      throws IOException {
+    OzoneConfiguration config = SCMHAUtils.removeSelfId(conf);
+    try {
+      return getScmBlockClient(config).addSCM(request);
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Failed to add SCM", e);
+    }
+  }
+
   /**
    * Create a scm block client.
    *
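
Note: a hedged sketch of how a bootstrapping SCM is expected to invoke this helper; it mirrors the SCMHAManagerImpl.start() change further down (scm and conf here stand for the local StorageContainerManager and its configuration):

    AddSCMRequest request = new AddSCMRequest.Builder()
        .setClusterId(scm.getClusterId())
        .setScmId(scm.getScmId())
        .setRatisAddr(scm.getSCMHANodeDetails().getLocalNodeDetails()
            .getRatisHostPortStr())
        .build();
    // addSCM() strips the local node via SCMHAUtils.removeSelfId() before
    // creating the block client, so the call goes to an existing SCM.
    boolean added = HAUtils.addSCM(OzoneConfiguration.of(conf), request);
    if (!added) {
      throw new IOException("Adding SCM to existing HA group failed");
    }
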
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 1636328..9b540ad 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -229,6 +229,16 @@ message GetScmInfoResponseProto {
     repeated string peerRoles = 3;
 }
 
+message AddScmRequestProto {
+    required string clusterId = 1;
+    required string scmId = 2;
+    required string ratisAddr = 3;
+}
+
+message AddScmResponseProto {
+    required bool success = 1;
+    optional string scmId = 2;
+}
 
 enum ReplicationType {
     RATIS = 1;
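
Note: assuming the usual protobuf-generated Java API for these messages, the request travels over the wire roughly like this (a sketch, not code from the patch):

    HddsProtos.AddScmRequestProto requestProto =
        HddsProtos.AddScmRequestProto.newBuilder()
            .setClusterId(clusterId)      // all three fields are required
            .setScmId(scmId)
            .setRatisAddr(ratisAddr)
            .build();
    byte[] wire = requestProto.toByteArray();
    // parseFrom throws InvalidProtocolBufferException on malformed input.
    HddsProtos.AddScmRequestProto parsed =
        HddsProtos.AddScmRequestProto.parseFrom(wire);
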
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 51f2eef..1b73d6a 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -38,6 +38,7 @@ enum Type {
   DeleteScmKeyBlocks = 12;
   GetScmInfo         = 13;
   SortDatanodes      = 14;
+  AddScm             = 15;
 }
 
 message SCMBlockLocationRequest {
@@ -54,6 +55,7 @@ message SCMBlockLocationRequest {
   optional DeleteScmKeyBlocksRequestProto     deleteScmKeyBlocksRequest = 12;
   optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest         = 13;
   optional SortDatanodesRequestProto          sortDatanodesRequest      = 14;
+  optional hadoop.hdds.AddScmRequestProto     addScmRequestProto       = 15;
 }
 
 message SCMBlockLocationResponse {
@@ -77,6 +79,7 @@ message SCMBlockLocationResponse {
   optional DeleteScmKeyBlocksResponseProto     deleteScmKeyBlocksResponse = 12;
   optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse         = 13;
   optional SortDatanodesResponseProto          sortDatanodesResponse      = 14;
+  optional hadoop.hdds.AddScmResponseProto     addScmResponse        = 15;
 }
 
 /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
index fc347e5..2c4b98f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos;
 import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
 import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
@@ -55,7 +56,8 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{
 
   public InterSCMGrpcClient(final String host, final ConfigurationSource conf) {
     Preconditions.checkNotNull(conf);
-    int port = conf.getObject(SCMHAConfiguration.class).getGrpcBindPort();
+    int port = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY,
+        ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT);
     timeout =
         conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval();
     NettyChannelBuilder channelBuilder =
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
index 9967530..d92220a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.ratis.thirdparty.io.grpc.Server;
@@ -46,7 +47,8 @@ public class InterSCMGrpcProtocolService {
   public InterSCMGrpcProtocolService(final ConfigurationSource conf,
       final StorageContainerManager scm) {
     Preconditions.checkNotNull(conf);
-    this.port = conf.getObject(SCMHAConfiguration.class).getGrpcBindPort();
+    this.port = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY,
+        ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT);
 
     NettyServerBuilder nettyServerBuilder =
         ((NettyServerBuilder) ServerBuilder.forPort(port))
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index 55818c5..76d8c14 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -110,6 +111,11 @@ public final class MockSCMHAManager implements SCMHAManager {
     ratisServer.stop();
   }
 
+  @Override
+  public boolean addSCM(AddSCMRequest request) throws IOException {
+    return false;
+  }
+
   private class MockRatisServer implements SCMRatisServer {
 
     private Map<RequestType, Object> handlers =
@@ -221,5 +227,10 @@ public final class MockSCMHAManager implements SCMHAManager {
           RaftPeerId.valueOf("peer"), RaftGroupId.randomId()),
           null, new ArrayList<>());
     }
+
+    @Override
+    public boolean addSCM(AddSCMRequest request) throws IOException {
+      return false;
+    }
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
index db5ecf4..75db7bf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.ha;
 
 import com.google.common.base.Strings;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
@@ -57,7 +58,7 @@ public final class RatisUtil {
     // TODO: Check the default values.
     final RaftProperties properties = new RaftProperties();
     setRaftStorageDir(properties, haConf, conf);
-    setRaftRpcProperties(properties, haConf);
+    setRaftRpcProperties(properties, haConf, conf);
     setRaftLogProperties(properties, haConf);
     setRaftRetryCacheProperties(properties, haConf);
     setRaftSnapshotProperties(properties, haConf);
@@ -95,11 +96,12 @@ public final class RatisUtil {
    * @param conf SCMHAConfiguration
    */
   private static void setRaftRpcProperties(final RaftProperties properties,
-                                           final SCMHAConfiguration conf) {
+      final SCMHAConfiguration conf, ConfigurationSource ozoneConf) {
     RaftConfigKeys.Rpc.setType(properties,
         RpcType.valueOf(conf.getRatisRpcType()));
-    GrpcConfigKeys.Server.setPort(properties,
-        conf.getRatisBindAddress().getPort());
+    GrpcConfigKeys.Server.setPort(properties, ozoneConf
+        .getInt(ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
+            ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT));
     GrpcConfigKeys.setMessageSizeMax(properties,
         SizeInBytes.valueOf("32m"));
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
index 4eb2eed..937f0e3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.hdds.scm.ha;
 
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
+
 import java.io.IOException;
 
 /**
@@ -48,4 +50,6 @@ public interface SCMHAManager {
    * Stops the HA service.
    */
   void shutdown() throws IOException;
+
+  boolean addSCM(AddSCMRequest request) throws IOException;
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index b49ba74..3ca648e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -17,8 +17,12 @@
 
 package org.apache.hadoop.hdds.scm.ha;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.slf4j.Logger;
@@ -43,6 +47,7 @@ public class SCMHAManagerImpl implements SCMHAManager {
   private final ConfigurationSource conf;
   private final SCMDBTransactionBuffer transactionBuffer;
   private final SCMSnapshotProvider scmSnapshotProvider;
+  private final StorageContainerManager scm;
 
   // this should ideally be started only in a ratis leader
   private final InterSCMGrpcProtocolService grpcServer;
@@ -53,10 +58,10 @@ public class SCMHAManagerImpl implements SCMHAManager {
   public SCMHAManagerImpl(final ConfigurationSource conf,
       final StorageContainerManager scm) throws IOException {
     this.conf = conf;
+    this.scm = scm;
     this.transactionBuffer =
         new SCMDBTransactionBuffer(scm);
-    this.ratisServer = new SCMRatisServerImpl(
-        conf.getObject(SCMHAConfiguration.class), conf, scm, transactionBuffer);
+    this.ratisServer = new SCMRatisServerImpl(conf, scm, transactionBuffer);
     this.scmSnapshotProvider = new SCMSnapshotProvider(conf,
         scm.getSCMHANodeDetails().getPeerNodeDetails());
     grpcServer = new InterSCMGrpcProtocolService(conf, scm);
@@ -69,6 +74,23 @@ public class SCMHAManagerImpl implements SCMHAManager {
   @Override
   public void start() throws IOException {
     ratisServer.start();
+    if (ratisServer.getDivision().getGroup().getPeers().isEmpty()) {
+      // this is a bootstrapped node
+      // It will first try to add itself to existing ring
+      boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf),
+          new AddSCMRequest.Builder().setClusterId(scm.getClusterId())
+              .setScmId(scm.getScmId()).setRatisAddr(
+              scm.getSCMHANodeDetails().getLocalNodeDetails()
+                  // TODO : Should we use IP instead of hostname??
+                  .getRatisHostPortStr()).build());
+      if (!success) {
+        throw new IOException("Adding SCM to existing HA group failed");
+      }
+    } else {
+      LOG.info(" scm role is {} peers {}",
+          ratisServer.getDivision().getInfo().getCurrentRole(),
+          ratisServer.getDivision().getGroup().getPeers());
+    }
     grpcServer.start();
   }
 
@@ -153,4 +175,18 @@ public class SCMHAManagerImpl implements SCMHAManager {
     ratisServer.stop();
     grpcServer.stop();
   }
+
+  @Override
+  public boolean addSCM(AddSCMRequest request) throws IOException {
+    String clusterId = scm.getClusterId();
+    if (!request.getClusterId().equals(scm.getClusterId())) {
+      throw new IOException(
+          "SCM " + request.getScmId() + " with addr " + request.getRatisAddr()
+              + " has cluster Id " + request.getClusterId()
+              + " but leader SCM cluster id is " + clusterId);
+    }
+    Preconditions.checkNotNull(
+        getRatisServer().getDivision().getGroup().getGroupId());
+    return getRatisServer().addSCM(request);
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHANodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHANodeDetails.java
index 499f2d5..b4d83e0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHANodeDetails.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHANodeDetails.java
@@ -54,6 +54,8 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_BIND_HOST_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SECURITY_SERVICE_PORT_KEY;
@@ -80,6 +82,7 @@ public class SCMHANodeDetails {
       OZONE_SCM_SECURITY_SERVICE_PORT_KEY,
       OZONE_SCM_SECURITY_SERVICE_BIND_HOST_KEY,
       OZONE_SCM_RATIS_PORT_KEY,
+      OZONE_SCM_GRPC_PORT_KEY,
       OZONE_SCM_HTTP_BIND_HOST_KEY,
       OZONE_SCM_HTTPS_BIND_HOST_KEY,
       OZONE_SCM_HTTP_ADDRESS_KEY,
@@ -107,10 +110,14 @@ public class SCMHANodeDetails {
     int ratisPort = conf.getInt(
         ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
         ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
+    int grpcPort = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY,
+        ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT);
     InetSocketAddress rpcAddress = new InetSocketAddress(
         InetAddress.getLocalHost(), 0);
     SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder()
         .setRatisPort(ratisPort)
+        .setGrpcPort(grpcPort)
         .setRpcAddress(rpcAddress)
         .setDatanodeProtocolServerAddress(
             HddsServerUtil.getScmDataNodeBindAddress(conf))
@@ -133,6 +140,7 @@ public class SCMHANodeDetails {
     String localScmServiceId = null;
     String localScmNodeId = null;
     int localRatisPort = 0;
+    int localGrpcPort = 0;
 
     Collection<String> scmServiceIds;
 
@@ -195,6 +203,11 @@ public class SCMHANodeDetails {
             serviceId, nodeId);
         int ratisPort = conf.getInt(ratisPortKey, OZONE_SCM_RATIS_PORT_DEFAULT);
 
+        String grpcPortKey = ConfUtils
+            .addKeySuffixes(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, serviceId,
+                nodeId);
+        int grpcPort = conf.getInt(grpcPortKey, OZONE_SCM_GRPC_PORT_DEFAULT);
+
         InetSocketAddress addr = null;
         try {
           addr = NetUtils.createSocketAddr(rpcAddrStr, ratisPort);
@@ -215,10 +228,11 @@ public class SCMHANodeDetails {
           localScmServiceId = serviceId;
           localScmNodeId = nodeId;
           localRatisPort = ratisPort;
+          localGrpcPort = grpcPort;
           found++;
         } else {
           peerNodesList.add(getHASCMNodeDetails(conf, serviceId,
-              nodeId, addr, ratisPort));
+              nodeId, addr, ratisPort, grpcPort));
         }
       }
 
@@ -232,8 +246,9 @@ public class SCMHANodeDetails {
         ConfUtils.setNodeSpecificConfigs(nodeSpecificConfigKeys, conf,
             localScmServiceId, localScmNodeId, LOG);
 
-        return new SCMHANodeDetails(getHASCMNodeDetails(conf, localScmServiceId,
-            localScmNodeId, localRpcAddress, localRatisPort), peerNodesList);
+        return new SCMHANodeDetails(
+            getHASCMNodeDetails(conf, localScmServiceId, localScmNodeId,
+                localRpcAddress, localRatisPort, localGrpcPort), peerNodesList);
 
       } else if (found > 1) {
         throwConfException("Configuration has multiple %s addresses that " +
@@ -253,7 +268,7 @@ public class SCMHANodeDetails {
 
   public static SCMNodeDetails getHASCMNodeDetails(OzoneConfiguration conf,
       String localScmServiceId, String localScmNodeId,
-      InetSocketAddress rpcAddress, int ratisPort) {
+      InetSocketAddress rpcAddress, int ratisPort, int grpcPort) {
     Preconditions.checkNotNull(localScmServiceId);
     Preconditions.checkNotNull(localScmNodeId);
 
@@ -261,6 +276,7 @@ public class SCMHANodeDetails {
     builder
         .setRpcAddress(rpcAddress)
         .setRatisPort(ratisPort)
+        .setGrpcPort(grpcPort)
         .setSCMServiceId(localScmServiceId)
         .setSCMNodeId(localScmNodeId)
         .setBlockProtocolServerAddress(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
index 2da4146..3f56e6a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
@@ -36,7 +36,7 @@ public final class SCMNodeDetails extends NodeDetails {
   private String clientProtocolServerAddressKey;
   private InetSocketAddress datanodeProtocolServerAddress;
   private String datanodeAddressKey;
-
+  private int grpcPort;
   public static final Logger LOG =
       LoggerFactory.getLogger(SCMNodeDetails.class);
 
@@ -45,15 +45,17 @@ public final class SCMNodeDetails extends NodeDetails {
    */
   @SuppressWarnings("checkstyle:ParameterNumber")
   private SCMNodeDetails(String serviceId, String nodeId,
-      InetSocketAddress rpcAddr, int ratisPort, String httpAddress,
-      String httpsAddress, InetSocketAddress blockProtocolServerAddress,
+      InetSocketAddress rpcAddr, int ratisPort, int grpcPort,
+      String httpAddress, String httpsAddress,
+      InetSocketAddress blockProtocolServerAddress,
       InetSocketAddress clientProtocolServerAddress,
-      InetSocketAddress datanodeProtocolServerAddress,
-      RaftGroup group, RaftPeerId selfPeerId, String datanodeAddressKey,
+      InetSocketAddress datanodeProtocolServerAddress, RaftGroup group,
+      RaftPeerId selfPeerId, String datanodeAddressKey,
       String blockProtocolServerAddressKey,
       String clientProtocolServerAddressAddressKey) {
     super(serviceId, nodeId, rpcAddr, ratisPort,
         httpAddress, httpsAddress);
+    this.grpcPort = grpcPort;
     this.blockProtocolServerAddress = blockProtocolServerAddress;
     this.clientProtocolServerAddress = clientProtocolServerAddress;
     this.datanodeProtocolServerAddress = datanodeProtocolServerAddress;
@@ -85,6 +87,7 @@ public final class SCMNodeDetails extends NodeDetails {
     private String scmNodeId;
     private InetSocketAddress rpcAddress;
     private int ratisPort;
+    private int grpcPort;
     private String httpAddr;
     private String httpsAddr;
     private InetSocketAddress blockProtocolServerAddress;
@@ -146,6 +149,11 @@ public final class SCMNodeDetails extends NodeDetails {
       return this;
     }
 
+    public Builder setGrpcPort(int port) {
+      this.grpcPort = port;
+      return this;
+    }
+
     public Builder setSCMServiceId(String serviceId) {
       this.scmServiceId = serviceId;
       return this;
@@ -168,7 +176,7 @@ public final class SCMNodeDetails extends NodeDetails {
 
     public SCMNodeDetails build() {
       return new SCMNodeDetails(scmServiceId, scmNodeId, rpcAddress,
-          ratisPort, httpAddr, httpsAddr, blockProtocolServerAddress,
+          ratisPort, grpcPort, httpAddr, httpsAddr, blockProtocolServerAddress,
           clientProtocolServerAddress, datanodeProtocolServerAddress,
           raftGroup, selfPeerId, datanodeAddressKey,
           blockProtocolServerAddressKey, clientProtocolServerAddressKey);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
index d8a78be..c5a4bae 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.ha;
 
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.server.RaftServer;
 
@@ -50,4 +51,6 @@ public interface SCMRatisServer {
    * Returns NotLeaderException with useful info.
    */
   NotLeaderException triggerNotLeaderException();
+
+  boolean addSCM(AddSCMRequest request) throws IOException;
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 505e152..4157046 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -19,10 +19,6 @@ package org.apache.hadoop.hdds.scm.ha;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -36,12 +32,13 @@ import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -51,6 +48,7 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.slf4j.Logger;
@@ -63,52 +61,100 @@ public class SCMRatisServerImpl implements SCMRatisServer {
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMRatisServerImpl.class);
 
-  private final RaftServer.Division division;
+  private final RaftServer server;
+  private final SCMStateMachine stateMachine;
   private final StorageContainerManager scm;
-  private final InetSocketAddress address;
   private final ClientId clientId = ClientId.randomId();
   private final AtomicLong callId = new AtomicLong();
+  private final RaftServer.Division division;
 
   // TODO: Refactor and remove ConfigurationSource and use only
   //  SCMHAConfiguration.
-  SCMRatisServerImpl(final SCMHAConfiguration haConf,
-      final ConfigurationSource conf, final StorageContainerManager scm,
-      final DBTransactionBuffer buffer) throws IOException {
+  SCMRatisServerImpl(final ConfigurationSource conf,
+      final StorageContainerManager scm, final DBTransactionBuffer buffer)
+      throws IOException {
     this.scm = scm;
-    this.address = haConf.getRatisBindAddress();
-    RaftServer server = newRaftServer(scm.getClusterId(), scm.getScmId(),
-        scm.getSCMHANodeDetails(), conf)
-        .setStateMachine(new SCMStateMachine(scm, this, buffer)).build();
+    this.stateMachine = new SCMStateMachine(scm, this, buffer);
+    final RaftGroupId groupId = buildRaftGroupId(scm.getClusterId());
+    LOG.info("starting Raft server for scm:{}", scm.getScmId());
+    // During SCM startup, the bootstrapped node will be started just with
+    // groupId information, so that it won't trigger any leader election
+    // as it doesn't have any peer info.
+
+    // The primary SCM node which is initialized using scm --init command,
+    // will initialize the raft server with the peer info and it will be
+    // persisted in the raft log post leader election. Now, when the primary
+    // scm boots up, it has peer info embedded in the raft log and will
+    // trigger leader election.
+    this.server =
+        newRaftServer(scm.getScmId(), conf).setStateMachine(stateMachine)
+            .setGroup(RaftGroup.valueOf(groupId)).build();
+    this.division = server.getDivision(groupId);
+  }
+
+  public static void initialize(String clusterId, String scmId,
+      SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
+    final RaftGroup group = buildRaftGroup(details, scmId, clusterId);
+    RaftServer server = newRaftServer(scmId, conf)
+        .setGroup(group).build();
+    server.start();
+    // TODO: Timeout and sleep interval should be made configurable
+    waitforLeaderToBeReady(server, 60000, group);
+    server.close();
+  }
+
+  private static void waitforLeaderToBeReady(RaftServer server, long timeout,
+      RaftGroup group)
+      throws IOException {
+    boolean ready;
+    long st = Time.monotonicNow();
+    do {
+      ready = server.getDivision(group.getGroupId()).getInfo().isLeaderReady();
+      if (!ready) {
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    } while (!ready && Time.monotonicNow() - st < timeout);
 
-    this.division =
-        server.getDivision(server.getGroups().iterator().next().getGroupId());
+    if (!ready) {
+      throw new IOException(String
+          .format("Ratis group %s is not ready in %d ms", group.getGroupId(),
+              timeout));
+    }
   }
 
-  public static RaftServer.Builder newRaftServer(final String clusterId,
-      final String scmId, final SCMHANodeDetails haDetails,
-      final ConfigurationSource conf) throws IOException {
-    final String scmNodeId = haDetails.getLocalNodeDetails().getNodeId();
+  private static RaftServer.Builder newRaftServer(final String scmId,
+      final ConfigurationSource conf) {
     final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
-    SCMHAGroupBuilder haGrpBuilder = scmNodeId != null ?
-        new SCMHAGroupBuilder(haDetails, clusterId, scmId) :
-        new SCMHAGroupBuilder(haConf, conf, clusterId);
     final RaftProperties serverProperties =
         RatisUtil.newRaftProperties(haConf, conf);
-    return RaftServer.newBuilder().setServerId(haGrpBuilder.getPeerId())
-        .setGroup(haGrpBuilder.getRaftGroup()).setProperties(serverProperties)
-        .setStateMachine(new SCMStateMachine());
+    return RaftServer.newBuilder().setServerId(RaftPeerId.getRaftPeerId(scmId))
+        .setProperties(serverProperties)
+        .setStateMachine(new SCMStateMachine(false));
   }
 
   @Override
   public void start() throws IOException {
-    division.getRaftServer().start();
+    server.start();
+  }
+
+  @Override
+  public RaftServer.Division getDivision() {
+    return division;
+  }
+
+  @VisibleForTesting
+  public SCMStateMachine getStateMachine() {
+    return stateMachine;
   }
 
   @Override
   public void registerStateMachineHandler(final RequestType handlerType,
                                           final Object handler) {
-    ((SCMStateMachine) division.getStateMachine())
-        .registerHandler(handlerType, handler);
+    stateMachine.registerHandler(handlerType, handler);
   }
 
   @Override
@@ -116,16 +162,17 @@ public class SCMRatisServerImpl implements SCMRatisServer {
       throws IOException, ExecutionException, InterruptedException {
     final RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
         .setClientId(clientId)
-        .setServerId(division.getId())
-        .setGroupId(division.getGroup().getGroupId())
+        .setServerId(getDivision().getId())
+        .setGroupId(getDivision().getGroup().getGroupId())
         .setCallId(nextCallId())
         .setMessage(request.encode())
         .setType(RaftClientRequest.writeRequestType())
         .build();
     final RaftClientReply raftClientReply =
-        division.getRaftServer()
-            .submitClientRequestAsync(raftClientRequest)
-            .get();
+        server.submitClientRequestAsync(raftClientRequest).get();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("request {} Reply {}", raftClientRequest, raftClientReply);
+    }
     return SCMRatisResponse.decode(raftClientReply);
   }
 
@@ -135,12 +182,7 @@ public class SCMRatisServerImpl implements SCMRatisServer {
 
   @Override
   public void stop() throws IOException {
-    division.getRaftServer().close();
-  }
-
-  @Override
-  public RaftServer.Division getDivision() {
-    return division;
+    server.close();
   }
 
   @Override
@@ -159,13 +201,47 @@ public class SCMRatisServerImpl implements SCMRatisServer {
         division.getMemberId(), null, division.getGroup().getPeers());
   }
 
+  @Override
+  public boolean addSCM(AddSCMRequest request) throws IOException {
+    List<RaftPeer> newRaftPeerList =
+        new ArrayList<>(getDivision().getGroup().getPeers());
+    // add the SCM node to be added to the raft peer list
+
+    RaftPeer raftPeer = RaftPeer.newBuilder().setId(request.getScmId())
+        .setAddress(request.getRatisAddr()).build();
+    newRaftPeerList.add(raftPeer);
+
+    LOG.info("{}: Submitting SetConfiguration request to Ratis server with" +
+            " new SCM peers list: {}", scm.getScmId(),
+        newRaftPeerList);
+    SetConfigurationRequest configRequest =
+        new SetConfigurationRequest(clientId, division.getPeer().getId(),
+            division.getGroup().getGroupId(), nextCallId(), newRaftPeerList);
+
+    try {
+      RaftClientReply raftClientReply =
+          division.getRaftServer().setConfiguration(configRequest);
+      if (raftClientReply.isSuccess()) {
+        LOG.info("Successfully added new SCM: {}.", request.getScmId());
+      } else {
+        LOG.error("Failed to add new SCM: {}. Ratis reply: {}" +
+            request.getScmId(), raftClientReply);
+        throw new IOException(raftClientReply.getException());
+      }
+      return raftClientReply.isSuccess();
+    } catch (IOException e) {
+      LOG.error("Failed to update Ratis configuration and add new peer. " +
+          "Cannot add new SCM: {}.", scm.getScmId(), e);
+      throw e;
+    }
+  }
+
   @VisibleForTesting
   public static void validateRatisGroupExists(OzoneConfiguration conf,
       String clusterId) throws IOException {
     final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
     final RaftProperties properties = RatisUtil.newRaftProperties(haConf, conf);
-    final RaftGroupId raftGroupId =
-        RaftGroupId.valueOf(SCMHAGroupBuilder.buildRaftGroupId(clusterId));
+    final RaftGroupId raftGroupId = buildRaftGroupId(clusterId);
     final AtomicBoolean found = new AtomicBoolean(false);
     RaftServerConfigKeys.storageDir(properties).parallelStream().forEach(
         (dir) -> Optional.ofNullable(dir.listFiles()).map(Arrays::stream)
@@ -199,150 +275,29 @@ public class SCMRatisServerImpl implements SCMRatisServer {
           "Could not find any ratis group with id " + raftGroupId);
     }
   }
-  /**
-   * If the SCM group starts from {@link ScmConfigKeys#OZONE_SCM_NAMES},
-   * its raft peers should locate on different nodes, and use the same port
-   * to communicate with each other.
-   *
-   * Each of the raft peer figures out its {@link RaftPeerId} by computing
-   * its position in {@link ScmConfigKeys#OZONE_SCM_NAMES}.
-   *
-   * Assume {@link ScmConfigKeys#OZONE_SCM_NAMES} is "ip0,ip1,ip2",
-   * scm with ip0 identifies its {@link RaftPeerId} as scm0,
-   * scm with ip1 identifies its {@link RaftPeerId} as scm1,
-   * scm with ip2 identifies its {@link RaftPeerId} as scm2.
-   *
-   * After startup, they will form a {@link RaftGroup} with groupID
-   * "SCM-HA-Service", and communicate with each other via
-   * ozone.scm.ha.ratis.bind.port.
-   */
-  private static class SCMHAGroupBuilder {
-    private static final String SCM_SERVICE_ID = "SCM-HA-Service";
 
-    private final RaftGroupId raftGroupId;
-    private final RaftGroup raftGroup;
-    private RaftPeerId selfPeerId;
-
-    /**
-     * @return raft group
-     */
-    public RaftGroup getRaftGroup() {
-      return raftGroup;
-    }
-
-    /**
-     * @return raft group id
-     */
-    public RaftGroupId getRaftGroupId() {
-      return raftGroupId;
-    }
-
-    /**
-     * @return raft peer id
-     */
-    public RaftPeerId getPeerId() {
-      return selfPeerId;
-    }
-
-    /**
-     * This will be used when the SCM HA config are not defined.
-     * This will be removed once the HA configs are will defined.
-     */
-    SCMHAGroupBuilder(final SCMHAConfiguration haConf,
-        final ConfigurationSource conf, String clusterId) throws IOException {
-      // fetch port
-      int port = haConf.getRatisBindAddress().getPort();
-
-      // fetch localhost
-      InetAddress localHost = InetAddress.getLocalHost();
-
-      // fetch hosts from ozone.scm.names
-      List<String> hosts = parseHosts(conf);
-
-      final List<RaftPeer> raftPeers = new ArrayList<>();
-      for (int i = 0; i < hosts.size(); ++i) {
-        String nodeId = "scm" + i;
-        RaftPeerId peerId = RaftPeerId.getRaftPeerId(nodeId);
-
-        String host = hosts.get(i);
-        if (InetAddress.getByName(host).equals(localHost)) {
-          selfPeerId = peerId;
-          raftPeers.add(RaftPeer.newBuilder()
-              .setId(peerId)
-              .setAddress(host + ":" + port)
-              .build());
-          break;
-        }
-      }
-
-      if (selfPeerId == null) {
-        String errorMessage = "localhost " +  localHost
-            + " does not exist in ozone.scm.names "
-            + conf.get(ScmConfigKeys.OZONE_SCM_NAMES);
-        throw new IOException(errorMessage);
-      }
-
-      LOG.info("Build a RaftGroup for SCMHA, " +
-              "localHost: {}, OZONE_SCM_NAMES: {}, selfPeerId: {}",
-          localHost, conf.get(ScmConfigKeys.OZONE_SCM_NAMES), selfPeerId);
-
-      String groupId = clusterId == null ? SCM_SERVICE_ID : clusterId;
-      raftGroupId = RaftGroupId.valueOf(
-          UUID.nameUUIDFromBytes(groupId.getBytes(StandardCharsets.UTF_8)));
-
-      raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
-    }
-
-    SCMHAGroupBuilder(final SCMHANodeDetails details, final String clusterId,
-        final String scmId) {
-
-      Preconditions.checkNotNull(clusterId);
-      // RaftGroupId is the clusterId
-      raftGroupId = RaftGroupId.valueOf(buildRaftGroupId(clusterId));
-      Preconditions.checkNotNull(scmId);
-      selfPeerId = RaftPeerId.getRaftPeerId(scmId);
-      SCMNodeDetails localNodeDetails = details.getLocalNodeDetails();
-
-      InetSocketAddress ratisAddr =
-          new InetSocketAddress(localNodeDetails.getInetAddress(),
-              localNodeDetails.getRatisPort());
-
-      RaftPeer localRaftPeer =
-          RaftPeer.newBuilder().setId(selfPeerId).setAddress(ratisAddr).build();
-
-      List<RaftPeer> raftPeers = new ArrayList<>();
-      // Add this Ratis server to the Ratis ring
-      raftPeers.add(localRaftPeer);
-      // always start as a standalone server
-      raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
-    }
-
-    private static UUID buildRaftGroupId(String id) {
-      return UUID.nameUUIDFromBytes(id.getBytes(StandardCharsets.UTF_8));
-    }
-
-    private List<String> parseHosts(final ConfigurationSource conf)
-        throws UnknownHostException {
-      // fetch hosts from ozone.scm.names
-      List<String> hosts =
-          Arrays.stream(conf.getTrimmedStrings(ScmConfigKeys.OZONE_SCM_NAMES))
-              .map(scmName -> HddsUtils.getHostName(scmName).get())
-              .collect(Collectors.toList());
-
-      // if this is not a conf for a multi-server raft cluster,
-      // it means we are in integration test, and need to augment
-      // the conf to help build a single-server raft cluster.
-      if (hosts.size() == 0) {
-        // ozone.scm.names is not set
-        hosts.add(InetAddress.getLocalHost().getHostName());
-      } else if (hosts.size() == 1) {
-        // ozone.scm.names is set, yet the conf may not be usable.
-        hosts.set(0, InetAddress.getLocalHost().getHostName());
-      }
+  private static RaftGroup buildRaftGroup(SCMNodeDetails details,
+      String scmId, String clusterId) {
+    Preconditions.checkNotNull(scmId);
+    final RaftGroupId groupId = buildRaftGroupId(clusterId);
+    RaftPeerId selfPeerId = RaftPeerId.getRaftPeerId(scmId);
+
+    RaftPeer localRaftPeer = RaftPeer.newBuilder().setId(selfPeerId)
+        // TODO : Should we use IP instead of hostname??
+        .setAddress(details.getRatisHostPortStr()).build();
+
+    List<RaftPeer> raftPeers = new ArrayList<>();
+    // Add this Ratis server to the Ratis ring
+    raftPeers.add(localRaftPeer);
+    return RaftGroup.valueOf(groupId, raftPeers);
+  }
 
-      LOG.info("fetch hosts {} from ozone.scm.names {}.",
-          hosts, conf.get(ScmConfigKeys.OZONE_SCM_NAMES));
-      return hosts;
-    }
+  private static RaftGroupId buildRaftGroupId(String clusterId) {
+    Preconditions.checkNotNull(clusterId);
+    return RaftGroupId.valueOf(
+        UUID.fromString(clusterId.replace(OzoneConsts.CLUSTER_ID_PREFIX, "")));
   }
+
 }
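buildRaftGroupId now derives the Ratis group id deterministically from the cluster id, so every SCM in the same cluster computes the same group without exchanging any group metadata. A minimal, Ratis-free sketch of that derivation follows; it assumes OzoneConsts.CLUSTER_ID_PREFIX is the "CID-" prefix carried by SCM-generated cluster ids, which is why the remainder must parse as a UUID.

    import java.util.UUID;

    public final class RaftGroupIdSketch {
      // Assumption: mirrors OzoneConsts.CLUSTER_ID_PREFIX ("CID-").
      private static final String CLUSTER_ID_PREFIX = "CID-";

      // Strip the prefix and parse the remainder as a UUID, exactly like
      // buildRaftGroupId above; RaftGroupId.valueOf(UUID) wraps this value.
      static UUID groupIdFor(String clusterId) {
        return UUID.fromString(clusterId.replace(CLUSTER_ID_PREFIX, ""));
      }

      public static void main(String[] args) {
        String clusterId = CLUSTER_ID_PREFIX + UUID.randomUUID();
        // Two SCMs computing the group id independently from the same cluster
        // id arrive at the same value, so they join the same Ratis ring.
        System.out.println(groupIdFor(clusterId).equals(groupIdFor(clusterId)));
      }
    }

Because the mapping is a pure function of the cluster id, a bootstrapping SCM can locate the existing group knowing only the cluster id it fetched from a running SCM.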
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index d1a7ec5..921a17e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.hdds.scm.ha;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -32,17 +33,19 @@ import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
 import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.util.Time;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +62,9 @@ public class SCMStateMachine extends BaseStateMachine {
   private SCMRatisServer ratisServer;
   private Map<RequestType, Object> handlers;
   private DBTransactionBuffer transactionBuffer;
+  private final SimpleStateMachineStorage storage =
+      new SimpleStateMachineStorage();
+  private final AtomicBoolean isInitialized;
 
   public SCMStateMachine(final StorageContainerManager scm,
       final SCMRatisServer ratisServer, DBTransactionBuffer buffer)
@@ -79,9 +85,11 @@ public class SCMStateMachine extends BaseStateMachine {
             ), SCM_NOT_INITIALIZED);
       }
     }
+    isInitialized = new AtomicBoolean(true);
   }
 
-  public SCMStateMachine() {
+  public SCMStateMachine(boolean init) {
+    isInitialized = new AtomicBoolean(init);
   }
   public void registerHandler(RequestType type, Object handler) {
     handlers.put(type, handler);
@@ -143,6 +151,9 @@ public class SCMStateMachine extends BaseStateMachine {
   public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
     LOG.info("current leader SCM steps down.");
 
+    if (!isInitialized.get()) {
+      return;
+    }
     scm.getScmContext().updateLeaderAndTerm(false, 0);
     scm.getSCMServiceManager().notifyStatusChanged();
   }
@@ -155,6 +166,10 @@ public class SCMStateMachine extends BaseStateMachine {
       return;
     }
 
+    if (!isInitialized.get()) {
+      return;
+    }
+
     long term = scm.getScmHAManager()
         .getRatisServer()
         .getDivision()
@@ -179,25 +194,47 @@ public class SCMStateMachine extends BaseStateMachine {
     long startTime = Time.monotonicNow();
     TermIndex lastTermIndex = getLastAppliedTermIndex();
     long lastAppliedIndex = lastTermIndex.getIndex();
-    TransactionInfo lastAppliedTrxInfo =
-        TransactionInfo.fromTermIndex(lastTermIndex);
-    if (transactionBuffer.getLatestTrxInfo()
-        .compareTo(lastAppliedTrxInfo) < 0) {
+    if (isInitialized.get()) {
+      TransactionInfo lastAppliedTrxInfo =
+          TransactionInfo.fromTermIndex(lastTermIndex);
+      if (transactionBuffer.getLatestTrxInfo().compareTo(lastAppliedTrxInfo)
+          < 0) {
+        transactionBuffer.updateLatestTrxInfo(
+            TransactionInfo.builder().setCurrentTerm(lastTermIndex.getTerm())
+                .setTransactionIndex(lastTermIndex.getIndex()).build());
+        transactionBuffer.setLatestSnapshot(
+            transactionBuffer.getLatestTrxInfo().toSnapshotInfo());
+      } else {
+        lastAppliedIndex =
+            transactionBuffer.getLatestTrxInfo().getTransactionIndex();
+      }
+
+      transactionBuffer.flush();
+      LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
+          lastAppliedIndex, Time.monotonicNow() - startTime);
+    }
+    super.takeSnapshot();
+    return lastAppliedIndex;
+  }
+
+  @Override
+  public void notifyTermIndexUpdated(long term, long index) {
+    if (transactionBuffer != null) {
       transactionBuffer.updateLatestTrxInfo(
-          TransactionInfo.builder()
-              .setCurrentTerm(lastTermIndex.getTerm())
-              .setTransactionIndex(lastTermIndex.getIndex())
-              .build());
-      transactionBuffer.setLatestSnapshot(
-          transactionBuffer.getLatestTrxInfo().toSnapshotInfo());
-    } else {
-      lastAppliedIndex =
-          transactionBuffer.getLatestTrxInfo().getTransactionIndex();
+          TransactionInfo.builder().setCurrentTerm(term)
+              .setTransactionIndex(index).build());
     }
+    // We need to call updateLastAppliedTermIndex here because, in Ratis,
+    // when a node becomes leader it checks stateMachineIndex >=
+    // placeHolderIndex (on becoming leader it writes a conf entry carrying
+    // its peers and termIndex). Calling updateLastAppliedTermIndex keeps
+    // lastAppliedTermIndex in sync with that entry.
+    updateLastAppliedTermIndex(term, index);
+  }
 
-    transactionBuffer.flush();
-    LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
-        lastAppliedIndex, Time.monotonicNow() - startTime);
-    return lastAppliedIndex;
+
+  @Override
+  public void notifyConfigurationChanged(long term, long index,
+      RaftProtos.RaftConfigurationProto newRaftConfiguration) {
   }
 }
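The new SCMStateMachine(boolean init) constructor lets scm init/bootstrap bring up a bare Ratis server whose state machine has no StorageContainerManager wired in; the isInitialized flag then short-circuits the leadership callbacks that would otherwise dereference the missing SCM. A self-contained sketch of that gate pattern, with hypothetical names and the Ratis types omitted:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical stand-in for the SCM-side callback the real state machine
    // drives from notifyLeaderChanged().
    interface LeaderListener {
      void onLeaderChanged(long term);
    }

    class GatedStateMachineSketch {
      private final AtomicBoolean isInitialized;
      private final LeaderListener listener; // null when built for bootstrap

      GatedStateMachineSketch(LeaderListener listener) {
        this.listener = listener;
        this.isInitialized = new AtomicBoolean(listener != null);
      }

      // Mirrors the new guard: ignore notifications until fully wired up.
      void notifyLeaderChanged(long term) {
        if (!isInitialized.get()) {
          return;
        }
        listener.onLeaderChanged(term);
      }

      public static void main(String[] args) {
        // Bootstrap-only instance: the notification is safely dropped.
        new GatedStateMachineSketch(null).notifyLeaderChanged(1);
        // Fully initialized instance: the notification is delivered.
        new GatedStateMachineSketch(
            t -> System.out.println("leader at term " + t))
            .notifyLeaderChanged(2);
      }
    }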
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index caec525..3385774 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMB
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -135,6 +136,10 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
         response.setGetScmInfoResponse(
             getScmInfo(request.getGetScmInfoRequest()));
         break;
+      case AddScm:
+        response.setAddScmResponse(
+            getAddSCMResponse(request.getAddScmRequestProto()));
+        break;
       case SortDatanodes:
         response.setSortDatanodesResponse(sortDatanodes(
             request.getSortDatanodesRequest(), request.getVersion()
@@ -221,6 +226,15 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
         .build();
   }
 
+  public HddsProtos.AddScmResponseProto getAddSCMResponse(
+      HddsProtos.AddScmRequestProto req)
+      throws IOException {
+    boolean status = impl.addSCM(AddSCMRequest.getFromProtobuf(req));
+    return HddsProtos.AddScmResponseProto.newBuilder()
+        .setSuccess(status)
+        .build();
+  }
+
   public SortDatanodesResponseProto sortDatanodes(
       SortDatanodesRequestProto request, int clientVersion)
       throws ServiceException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index d8014f6..3a1181e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
@@ -292,6 +293,33 @@ public class SCMBlockProtocolServer implements
   }
 
   @Override
+  public boolean addSCM(AddSCMRequest request) throws IOException {
+    LOG.debug("Adding SCM {} addr {} cluster id {}",
+        request.getScmId(), request.getRatisAddr(), request.getClusterId());
+
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("scmId", String.valueOf(request.getScmId()));
+    auditMap.put("cluster", String.valueOf(request.getClusterId()));
+    auditMap.put("addr", String.valueOf(request.getRatisAddr()));
+    boolean auditSuccess = true;
+    try {
+      return scm.getScmHAManager().addSCM(request);
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.ADD_SCM, auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.ADD_SCM, auditMap)
+        );
+      }
+    }
+  }
+
+  @Override
   public List<DatanodeDetails> sortDatanodes(List<String> nodes,
       String clientMachine) throws IOException {
     boolean auditSuccess = true;
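addSCM reuses the audit pattern of the other SCMBlockProtocolServer calls: run the operation, log a failure audit if it throws, and log a success audit from the finally block only when nothing escaped. A generic, stand-alone sketch of that pattern (the helper below is illustrative and not part of the patch):

    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.function.BiConsumer;
    import java.util.function.Consumer;

    public final class AuditedCallSketch {
      // Runs the action and emits exactly one audit record: failure if the
      // action throws, success otherwise (logged from the finally block).
      static <T> T audited(Callable<T> action, Map<String, String> auditMap,
          Consumer<Map<String, String>> logSuccess,
          BiConsumer<Map<String, String>, Exception> logFailure)
          throws Exception {
        boolean success = true;
        try {
          return action.call();
        } catch (Exception ex) {
          success = false;
          logFailure.accept(auditMap, ex);
          throw ex;
        } finally {
          if (success) {
            logSuccess.accept(auditMap);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        Map<String, String> audit = Collections.singletonMap("scmId", "scm-1");
        System.out.println(audited(() -> "ok", audit,
            m -> System.out.println("AUDIT success " + m),
            (m, e) -> System.out.println("AUDIT failure " + m + ": " + e)));
      }
    }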
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
index 91b9cd6..013dfe6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMStorageConfig.java
@@ -25,11 +25,10 @@ import org.apache.hadoop.ozone.common.Storage;
 import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
+import java.util.UUID;
 
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.STORAGE_DIR;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_NODES_INFO;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_NODES_SEPARATOR;
 
 
 /**
@@ -59,17 +58,6 @@ public class SCMStorageConfig extends Storage {
     }
   }
 
-  public void setScmNodeInfo(String hostname) throws IOException {
-    if (getState() == StorageState.INITIALIZED) {
-      throw new IOException("SCM is already initialized.");
-    } else {
-      final StringBuilder b =
-          new StringBuilder(hostname).append(SCM_NODES_SEPARATOR)
-              .append(getScmId());
-      getStorageInfo().setProperty(SCM_NODES_INFO, b.toString());
-    }
-  }
-
   /**
    * Retrieves the SCM ID from the version file.
    * @return SCM_ID
@@ -78,31 +66,14 @@ public class SCMStorageConfig extends Storage {
     return getStorageInfo().getProperty(SCM_ID);
   }
 
-  /**
-   * Retrieves the SCM Node info from the version file.
-   * @return SCM_NODES
-   */
-  public String getScmNodeInfo() {
-    return getStorageInfo().getProperty(SCM_NODES_INFO);
-  }
-
   @Override
   protected Properties getNodeProperties() {
     String scmId = getScmId();
     if (scmId == null) {
-      // TODO:
-      //  Please check https://issues.apache.org/jira/browse/HDDS-4538
-      //  hard code clusterID and scmUuid on HDDS-2823,
-      //  so that multi SCMs won't cause chaos in Datanode side.
-      scmId = "3a11fedb-cce5-46ac-bb0d-cfdf17df9a19";
+      scmId = UUID.randomUUID().toString();
     }
     Properties scmProperties = new Properties();
     scmProperties.setProperty(SCM_ID, scmId);
-    String scmNodeInfo = getScmNodeInfo();
-    if (scmNodeInfo != null) {
-      // FOR NON-HA setup, SCM_NODES can be null
-      scmProperties.setProperty(SCM_NODES_INFO, getScmNodeInfo());
-    }
     return scmProperties;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 24bb893..4a34445 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -38,6 +38,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
@@ -131,7 +132,6 @@ import org.apache.hadoop.util.JvmPauseMonitor;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
 import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.server.RaftServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -689,9 +689,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   }
 
   /**
-   * Routine to bootstrap the StorageContainerManager. Thsi will connect to a
+   * Routine to bootstrap the StorageContainerManager. This will connect to a
    * running SCM instance which has valid cluster id and fetch the cluster id
-   * from there. SCM ids will be also be exchanged here.
+   * from there.
    *
   * TODO: once SCM HA security is enabled, CSR certificates will be fetched from
    * running scm leader instance as well.
@@ -708,14 +708,15 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     }
     // The node here will try to fetch the cluster id from any of existing
     // running SCM instances.
-    // TODO: need to avoid failover to local SCM Node here
-    final ScmInfo scmInfo = HAUtils.getScmInfo(conf);
+    SCMHANodeDetails.loadSCMHAConfig(conf);
+    OzoneConfiguration config = SCMHAUtils.removeSelfId(conf);
+    final ScmInfo scmInfo = HAUtils.getScmInfo(config);
     SCMStorageConfig scmStorageConfig = new SCMStorageConfig(conf);
     final String persistedClusterId = scmStorageConfig.getClusterID();
     final String fetchedId = scmInfo.getClusterId();
     Preconditions.checkNotNull(fetchedId);
     StorageState state = scmStorageConfig.getState();
-    if (state != StorageState.INITIALIZED) {
+    if (state == StorageState.INITIALIZED) {
       Preconditions.checkNotNull(scmStorageConfig.getScmId());
       if (!fetchedId.equals(persistedClusterId)) {
         LOG.error(
@@ -757,22 +758,22 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     if (state != StorageState.INITIALIZED) {
       try {
         if (clusterId != null && !clusterId.isEmpty()) {
+          // clusterId must be a UUID
+          Preconditions.checkNotNull(UUID.fromString(clusterId));
           scmStorageConfig.setClusterId(clusterId);
         }
-        // TODO: set the scm node info in the version file during upgrade
-        //  from non-HA SCM to SCM HA set up.
-        if (SCMHAUtils.isSCMHAEnabled(conf)) {
-          RaftServer server = SCMRatisServerImpl
-              .newRaftServer(clusterId, scmStorageConfig.getScmId(), haDetails,
-                  conf).build();
-          // ensure the ratis group exists
-          server.start();
-          server.close();
-          // TODO: Revisit if we need to set the Node info in SCM version file
-          scmStorageConfig
-              .setScmNodeInfo(haDetails.getLocalNodeDetails().getHostName());
-        }
         scmStorageConfig.initialize();
+        // TODO: Removing the HA-enabled check for now, because when the SCM
+        //  starts up it always spins up the Ratis server irrespective of
+        //  the check. If the Ratis server is not initialized here and only
+        //  starts during the regular start, it won't trigger a leader
+        //  election and hence won't work. The check will be re-introduced
+        //  once we have a clear segregation path with a Ratis enable/disable
+        //  switch.
+       // if (SCMHAUtils.isSCMHAEnabled(conf)) {
+        SCMRatisServerImpl.initialize(scmStorageConfig.getClusterID(),
+            scmStorageConfig.getScmId(), haDetails.getLocalNodeDetails(), conf);
+       // }
         LOG.info("SCM initialization succeeded. Current cluster id for sd={}"
                 + "; cid={}; layoutVersion={}; scmId={}",
             scmStorageConfig.getStorageDir(), scmStorageConfig.getClusterID(),
@@ -789,7 +790,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
           clusterId, scmStorageConfig.getLayoutVersion());
       if (SCMHAUtils.isSCMHAEnabled(conf)) {
         SCMRatisServerImpl.validateRatisGroupExists(conf, clusterId);
-        Preconditions.checkNotNull(scmStorageConfig.getScmNodeInfo());
       }
       return true;
     }
@@ -1169,11 +1169,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   }
 
   /**
-   * Check if the current scm is the leader.
-   * @return - if the current scm is the leader.
+   * Check if the current SCM is the leader and ready to accept requests.
+   * @return true if the current SCM is the leader and is ready.
    */
   public boolean checkLeader() {
-    return scmContext.isLeader();
+    return scmContext.isLeader() && getScmHAManager().getRatisServer()
+        .getDivision().getInfo().isLeaderReady();
   }
 
   public void checkAdminAccess(String remoteUser) throws IOException {
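The reshaped scmBootstrap boils down to: load the HA config, drop this node's own id from the failover list, fetch the cluster id from a running SCM, and then either verify it against the persisted id (when local storage is already INITIALIZED) or adopt it before initializing. A distilled, storage-free sketch of that decision, using illustrative names rather than the real SCMStorageConfig API beyond what the hunk shows:

    import java.io.IOException;

    public class ScmBootstrapDecisionSketch {
      enum StorageState { INITIALIZED, NOT_INITIALIZED }

      // Returns the cluster id this SCM should run with, or throws if the
      // persisted id disagrees with what the existing SCMs report.
      static String resolveClusterId(StorageState state,
          String persistedClusterId, String fetchedClusterId)
          throws IOException {
        if (state == StorageState.INITIALIZED) {
          if (!fetchedClusterId.equals(persistedClusterId)) {
            throw new IOException("Cluster id mismatch: persisted "
                + persistedClusterId + " vs fetched " + fetchedClusterId);
          }
          return persistedClusterId;
        }
        // Fresh node: adopt the cluster id fetched from the running SCMs and
        // let local storage initialization persist it.
        return fetchedClusterId;
      }

      public static void main(String[] args) throws IOException {
        System.out.println(resolveClusterId(
            StorageState.NOT_INITIALIZED, null, "CID-1234"));
      }
    }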
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtils.java
index 56d265a..b9efe10 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestHddsServerUtils.java
@@ -150,19 +150,6 @@ public class TestHddsServerUtils {
   }
 
   /**
-   * getScmAddressForDataNodes should fail when OZONE_SCM_NAMES has
-   * multiple addresses.
-   */
-  @Test
-  public void testClientFailsWithMultipleScmNames() {
-    final String scmHost = "host123,host456";
-    final OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set(OZONE_SCM_NAMES, scmHost);
-    thrown.expect(IllegalArgumentException.class);
-    HddsServerUtil.getScmAddressForDataNodes(conf);
-  }
-
-  /**
    * Test {@link ServerUtils#getScmDbDir}.
    */
   @Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
index d82faa2..a7eb637 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.ha;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
 import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.container.ContainerStateManagerV2;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.server.RaftServer;
@@ -73,6 +74,12 @@ public class TestReplicationAnnotation {
       public NotLeaderException triggerNotLeaderException() {
         return null;
       }
+
+      @Override
+      public boolean addSCM(AddSCMRequest request)
+          throws IOException {
+        return false;
+      }
     };
 
     scmhaInvocationHandler = new SCMHAInvocationHandler(
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestHddsClientUtils.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestHddsClientUtils.java
index 6e1e336..fe4df7d 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestHddsClientUtils.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestHddsClientUtils.java
@@ -202,28 +202,6 @@ public class TestHddsClientUtils {
   }
 
   @Test
-  public void testBlockClientFailsWithMultipleScmNames() {
-    // When OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY and OZONE_SCM_CLIENT_ADDRESS_KEY
-    // are undefined, fail if OZONE_SCM_NAMES has multiple SCMs.
-    final String scmHost = "host123,host456";
-    final OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set(OZONE_SCM_NAMES, scmHost);
-    thrown.expect(IllegalArgumentException.class);
-    HddsUtils.getScmAddressForBlockClients(conf);
-  }
-
-  @Test
-  public void testClientFailsWithMultipleScmNames() {
-    // When OZONE_SCM_CLIENT_ADDRESS_KEY is undefined, fail if OZONE_SCM_NAMES
-    // has multiple SCMs.
-    final String scmHost = "host123,host456";
-    final OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set(OZONE_SCM_NAMES, scmHost);
-    thrown.expect(IllegalArgumentException.class);
-    HddsUtils.getScmAddressForClients(conf);
-  }
-
-  @Test
   public void testVerifyResourceName() {
     final String validName = "my-bucket.01";
     HddsClientUtils.verifyResourceName(validName);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/ha/NodeDetails.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/ha/NodeDetails.java
index 76beaa6..c73abb9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/ha/NodeDetails.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/ha/NodeDetails.java
@@ -76,6 +76,15 @@ public class NodeDetails {
     return hostPort.toString();
   }
 
+  public String getRatisAddressPortStr() {
+    StringBuilder hostPort = new StringBuilder();
+    hostPort.append(getInetAddress().getHostAddress())
+        .append(":")
+        .append(ratisPort);
+    return hostPort.toString();
+  }
+
   public int getRatisPort() {
     return ratisPort;
   }
diff --git a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
index 751777c..45680d3 100644
--- a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
+++ b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
 /**
  * This class causes random failures in the chaos cluster.
  */
-public class MiniOzoneChaosCluster extends MiniOzoneOMHAClusterImpl {
+public class MiniOzoneChaosCluster extends MiniOzoneHAClusterImpl {
 
   static final Logger LOG =
       LoggerFactory.getLogger(MiniOzoneChaosCluster.class);
@@ -91,10 +91,10 @@ public class MiniOzoneChaosCluster extends MiniOzoneOMHAClusterImpl {
   }
 
   public MiniOzoneChaosCluster(OzoneConfiguration conf,
-      List<OzoneManager> ozoneManagers, StorageContainerManager scm,
+      List<OzoneManager> ozoneManagers, List<StorageContainerManager> scms,
       List<HddsDatanodeService> hddsDatanodes, String omServiceID,
-      Set<Class<? extends Failures>> clazzes) {
-    super(conf, ozoneManagers, scm, hddsDatanodes, omServiceID);
+      String scmServiceId, Set<Class<? extends Failures>> clazzes) {
+    super(conf, ozoneManagers, scms, hddsDatanodes, omServiceID, scmServiceId);
     this.numDatanodes = getHddsDatanodes().size();
     this.numOzoneManagers = ozoneManagers.size();
 
@@ -267,11 +267,17 @@ public class MiniOzoneChaosCluster extends MiniOzoneOMHAClusterImpl {
         initOMRatisConf();
       }
 
-      StorageContainerManager scm;
       List<OzoneManager> omList;
+      List<StorageContainerManager> scmList;
       try {
-        scm = createSCM();
-        scm.start();
+        if (numOfSCMs > 1) {
+          throw new IOException("no implemented now");
+          // TODO: do later
+        } else {
+          StorageContainerManager scm = createSCM();
+          scm.start();
+          scmList = Arrays.asList(scm);
+        }
         if (numOfOMs > 1) {
           omList = createOMService();
         } else {
@@ -284,11 +290,11 @@ public class MiniOzoneChaosCluster extends MiniOzoneOMHAClusterImpl {
       }
 
       final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(
-          scm, null);
+          scmList, null);
 
       MiniOzoneChaosCluster cluster =
-          new MiniOzoneChaosCluster(conf, omList, scm, hddsDatanodes,
-              omServiceId, clazzes);
+          new MiniOzoneChaosCluster(conf, omList, scmList, hddsDatanodes,
+              omServiceId, scmServiceId, clazzes);
 
       if (startDataNodes) {
         cluster.startHddsDatanodes();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
index ac8b00d..463d4b3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
@@ -32,6 +32,8 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.UUID;
+
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
@@ -44,6 +46,7 @@ public class TestSCMSnapshot {
   @BeforeClass
   public static void setup() throws Exception {
     conf = new OzoneConfiguration();
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
     SCMHAConfiguration scmhaConfiguration = conf.getObject(
         SCMHAConfiguration.class);
     scmhaConfiguration.setRatisSnapshotThreshold(1L);
@@ -51,6 +54,7 @@ public class TestSCMSnapshot {
     cluster = MiniOzoneCluster
         .newBuilder(conf)
         .setNumDatanodes(3)
+        .setScmId(UUID.randomUUID().toString())
         .build();
     cluster.waitForClusterToBeReady();
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index c3d9815..153b312 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -64,6 +64,10 @@ public interface MiniOzoneCluster {
     return new MiniOzoneOMHAClusterImpl.Builder(conf);
   }
 
+  static Builder newHABuilder(OzoneConfiguration conf) {
+    return new MiniOzoneHAClusterImpl.Builder(conf);
+  }
+
   /**
    * Returns the configuration object associated with the MiniOzoneCluster.
    *
@@ -115,7 +119,15 @@ public interface MiniOzoneCluster {
    *
    * @return Service ID String
    */
-  String getServiceId();
+  String getOMServiceId();
+
+
+  /**
+   * Returns StorageContainerManager Service ID.
+   *
+   * @return Service ID String
+   */
+  String getSCMServiceId();
 
   /**
    * Returns {@link StorageContainerManager} associated with this
@@ -274,6 +286,7 @@ public interface MiniOzoneCluster {
     protected static final int DEFAULT_HB_INTERVAL_MS = 1000;
     protected static final int DEFAULT_HB_PROCESSOR_INTERVAL_MS = 100;
     protected static final int ACTIVE_OMS_NOT_SET = -1;
+    protected static final int ACTIVE_SCMS_NOT_SET = -1;
     protected static final int DEFAULT_PIPELIME_LIMIT = 3;
     protected static final int DEFAULT_RATIS_RPC_TIMEOUT_SEC = 1;
 
@@ -285,6 +298,10 @@ public interface MiniOzoneCluster {
     protected int numOfOMs;
     protected int numOfActiveOMs = ACTIVE_OMS_NOT_SET;
 
+    protected String scmServiceId;
+    protected int numOfSCMs;
+    protected int numOfActiveSCMs = ACTIVE_SCMS_NOT_SET;
+
     protected Optional<Boolean> enableTrace = Optional.of(false);
     protected Optional<Integer> hbInterval = Optional.empty();
     protected Optional<Integer> hbProcessorInterval = Optional.empty();
@@ -524,6 +541,21 @@ public interface MiniOzoneCluster {
       return this;
     }
 
+    public Builder setNumOfStorageContainerManagers(int numSCMs) {
+      this.numOfSCMs = numSCMs;
+      return this;
+    }
+
+    public Builder setNumOfActiveSCMs(int numActiveSCMs) {
+      this.numOfActiveSCMs = numActiveSCMs;
+      return this;
+    }
+
+    public Builder setSCMServiceId(String serviceId) {
+      this.scmServiceId = serviceId;
+      return this;
+    }
+
     /**
      * Constructs and returns MiniOzoneCluster.
      *
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 757c6b8..c2ecd34 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.UUID;
+import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -43,6 +44,9 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
@@ -153,26 +157,58 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
     this.reconServer = reconServer;
   }
 
+  /**
+   * Creates a new MiniOzoneCluster without the OzoneManager and
+   * StorageContainerManager. This is used by
+   * {@link MiniOzoneHAClusterImpl} for starting multiple
+   * OzoneManagers and StorageContainerManagers.
+   *
+   * @param conf configuration
+   * @param hddsDatanodes datanodes to run in the cluster
+   * @param reconServer Recon server instance, may be null
+   */
+  MiniOzoneClusterImpl(OzoneConfiguration conf,
+      List<HddsDatanodeService> hddsDatanodes, ReconServer reconServer) {
+    this.conf = conf;
+    this.hddsDatanodes = hddsDatanodes;
+    this.reconServer = reconServer;
+  }
+
   public OzoneConfiguration getConf() {
     return conf;
   }
 
-  public String getServiceId() {
+  public String getOMServiceId() {
+    // Non-HA cluster doesn't have OM Service Id.
+    return null;
+  }
+
+  public String getSCMServiceId() {
     // Non-HA cluster doesn't have SCM Service Id.
     return null;
   }
 
+  public void waitForSCMToBeReady() throws TimeoutException,
+      InterruptedException {
+    // No-op: a non-HA cluster has a single SCM which is ready once started.
+  }
+
+  public StorageContainerManager getActiveSCM() {
+    return scm;
+  }
+
   /**
    * Waits for the Ozone cluster to be ready for processing requests.
    */
   @Override
   public void waitForClusterToBeReady()
       throws TimeoutException, InterruptedException {
+    waitForSCMToBeReady();
     GenericTestUtils.waitFor(() -> {
-      final int healthy = scm.getNodeCount(HEALTHY);
+      StorageContainerManager activeScm = getActiveSCM();
+      final int healthy = activeScm.getNodeCount(HEALTHY);
       final boolean isNodeReady = healthy == hddsDatanodes.size();
-      final boolean exitSafeMode = !scm.isInSafeMode();
-      final boolean checkScmLeader = scm.checkLeader();
+      final boolean exitSafeMode = !activeScm.isInSafeMode();
+      final boolean checkScmLeader = activeScm.checkLeader();
 
       LOG.info("{}. Got {} of {} DN Heartbeats.",
           isNodeReady ? "Nodes are ready" : "Waiting for nodes to be ready",
@@ -383,13 +419,17 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
     shutdownHddsDatanode(getHddsDatanodeIndex(dn));
   }
 
+  public String getClusterId() throws IOException {
+    return scm.getClientProtocolServer().getScmInfo().getClusterId();
+  }
+
   @Override
   public void shutdown() {
     try {
       LOG.info("Shutting down the Mini Ozone Cluster");
       File baseDir = new File(GenericTestUtils.getTempPath(
           MiniOzoneClusterImpl.class.getSimpleName() + "-" +
-              scm.getClientProtocolServer().getScmInfo().getClusterId()));
+              getClusterId()));
       stop();
       FileUtils.deleteDirectory(baseDir);
       ContainerCache.getInstance(conf).shutdownCache();
@@ -538,7 +578,8 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
           reconServer.execute(new String[] {});
         }
 
-        hddsDatanodes = createHddsDatanodes(scm, reconServer);
+        hddsDatanodes = createHddsDatanodes(
+            Collections.singletonList(scm), reconServer);
 
         MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm,
             hddsDatanodes, reconServer);
@@ -578,6 +619,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       Path metaDir = Paths.get(path, "ozone-meta");
       Files.createDirectories(metaDir);
       conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
+      conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
       if (!chunkSize.isPresent()) {
         //set it to 1MB by default in tests
         chunkSize = Optional.of(1);
@@ -649,7 +691,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       return scm;
     }
 
-    private void initializeScmStorage(SCMStorageConfig scmStore)
+    protected void initializeScmStorage(SCMStorageConfig scmStore)
         throws IOException {
       if (scmStore.getState() == StorageState.INITIALIZED) {
         return;
@@ -659,10 +701,11 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
         scmId = Optional.of(UUID.randomUUID().toString());
       }
       scmStore.setScmId(scmId.get());
-      // TODO : Ratis is enabled by default for mini ozone cluster. Set scm node
-      //  to local host in the beginning. Fix it MiniozoneSCM HA cluster
-      scmStore.setScmNodeInfo("localhost");
       scmStore.initialize();
+      if (SCMHAUtils.isSCMHAEnabled(conf)) {
+        SCMRatisServerImpl.initialize(clusterId, scmId.get(),
+            SCMHANodeDetails.loadSCMHAConfig(conf).getLocalNodeDetails(), conf);
+      }
     }
 
     void initializeOmStorage(OMStorage omStorage) throws IOException {
@@ -670,7 +713,9 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
         return;
       }
       omStorage.setClusterId(clusterId);
-      omStorage.setScmId(scmId.get());
+      if (scmId.isPresent()) {
+        omStorage.setScmId(scmId.get());
+      }
       omStorage.setOmId(omId.orElse(UUID.randomUUID().toString()));
       // Initialize ozone certificate client if security is enabled.
       if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
@@ -693,6 +738,21 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       return OzoneManager.createOm(conf);
     }
 
+    protected String getSCMAddresses(List<StorageContainerManager> scms) {
+      StringBuilder stringBuilder = new StringBuilder();
+      Iterator<StorageContainerManager> iter = scms.iterator();
+
+      while (iter.hasNext()) {
+        StorageContainerManager scm = iter.next();
+        stringBuilder.append(scm.getDatanodeRpcAddress().getHostString())
+            .append(":")
+            .append(scm.getDatanodeRpcAddress().getPort());
+        if (iter.hasNext()) {
+          stringBuilder.append(",");
+        }
+      }
+      return stringBuilder.toString();
+    }
     /**
      * Creates HddsDatanodeService(s) instance.
      *
@@ -700,11 +760,10 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
      * @throws IOException
      */
     protected List<HddsDatanodeService> createHddsDatanodes(
-        StorageContainerManager scm, ReconServer reconServer)
+        List<StorageContainerManager> scms, ReconServer reconServer)
         throws IOException {
       configureHddsDatanodes();
-      String scmAddress = scm.getDatanodeRpcAddress().getHostString() +
-          ":" + scm.getDatanodeRpcAddress().getPort();
+      String scmAddress = getSCMAddresses(scms);
       String[] args = new String[] {};
       conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, scmAddress);
       List<HddsDatanodeService> hddsDatanodes = new ArrayList<>();
@@ -747,7 +806,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       return hddsDatanodes;
     }
 
-    private void configureSCM() {
+    protected void configureSCM() {
       conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
       conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
       conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
new file mode 100644
index 0000000..5cf9d9b
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -0,0 +1,718 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.recon.ReconServer;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * MiniOzoneHAClusterImpl creates a complete in-process Ozone cluster
+ * with OM HA and SCM HA suitable for running tests.
+ * The cluster consists of a set of
+ * OzoneManagers, StorageContainerManagers and multiple DataNodes.
+ */
+public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class);
+
+  private final OMHAService omhaService;
+  private final SCMHAService scmhaService;
+
+  private int waitForClusterToBeReadyTimeout = 120000; // 2 min
+
+  private static final Random RANDOM = new Random();
+  private static final int RATIS_RPC_TIMEOUT = 1000; // 1 second
+  public static final int NODE_FAILURE_TIMEOUT = 2000; // 2 seconds
+
+  /**
+   * Creates a new MiniOzoneCluster.
+   *
+   * @throws IOException if there is an I/O error
+   */
+  @SuppressWarnings("checkstyle:ParameterNumber")
+  public MiniOzoneHAClusterImpl(
+      OzoneConfiguration conf,
+      List<OzoneManager> activeOMList,
+      List<OzoneManager> inactiveOMList,
+      List<StorageContainerManager> activeSCMList,
+      List<StorageContainerManager> inactiveSCMList,
+      List<HddsDatanodeService> hddsDatanodes,
+      String omServiceId,
+      String scmServiceId,
+      ReconServer reconServer) {
+    super(conf, hddsDatanodes, reconServer);
+    omhaService =
+        new OMHAService(activeOMList, inactiveOMList, omServiceId);
+    scmhaService =
+        new SCMHAService(activeSCMList, inactiveSCMList, scmServiceId);
+  }
+
+  /**
+   * Creates a new MiniOzoneCluster with all OMs and SCMs active.
+   * This is used by MiniOzoneChaosCluster.
+   */
+  protected MiniOzoneHAClusterImpl(
+      OzoneConfiguration conf,
+      List<OzoneManager> omList,
+      List<StorageContainerManager> scmList,
+      List<HddsDatanodeService> hddsDatanodes,
+      String omServiceId,
+      String scmServiceId) {
+    this(conf, omList, null, scmList, null, hddsDatanodes,
+        omServiceId, scmServiceId, null);
+  }
+
+  @Override
+  public String getOMServiceId() {
+    return omhaService.getServiceId();
+  }
+
+  @Override
+  public String getSCMServiceId() {
+    return scmhaService.getServiceId();
+  }
+
+  /**
+   * Returns the first OzoneManager from the list.
+   * @return the first OzoneManager
+   */
+  @Override
+  public OzoneManager getOzoneManager() {
+    return this.omhaService.getServices().get(0);
+  }
+
+  @Override
+  public OzoneClient getRpcClient() throws IOException {
+    String omServiceId = omhaService.getServiceId();
+    if (omServiceId == null) {
+      // Non-HA cluster.
+      return OzoneClientFactory.getRpcClient(getConf());
+    } else {
+      // HA cluster
+      return OzoneClientFactory.getRpcClient(omServiceId, getConf());
+    }
+  }
+
+  public boolean isOMActive(String omNodeId) {
+    return omhaService.isServiceActive(omNodeId);
+  }
+
+  public OzoneManager getOzoneManager(int index) {
+    return this.omhaService.getServiceByIndex(index);
+  }
+
+  public OzoneManager getOzoneManager(String omNodeId) {
+    return this.omhaService.getServiceById(omNodeId);
+  }
+
+  public List<OzoneManager> getOzoneManagersList() {
+    return omhaService.getServices();
+  }
+
+  /**
+   * Get OzoneManager leader object.
+   * @return the leader OzoneManager, or null if there is none or more than one
+   */
+  public OzoneManager getOMLeader() {
+    OzoneManager res = null;
+    for (OzoneManager ozoneManager : this.omhaService.getServices()) {
+      if (ozoneManager.isLeaderReady()) {
+        if (res != null) {
+          // Found more than one leader
+          // Return null, expect the caller to retry in a while
+          return null;
+        }
+        // Found a leader
+        res = ozoneManager;
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Start a previously inactive OM.
+   */
+  public void startInactiveOM(String omNodeID) throws IOException {
+    omhaService.startInactiveService(omNodeID, OzoneManager::start);
+  }
+
+  @Override
+  public void restartOzoneManager() throws IOException {
+    for (OzoneManager ozoneManager : this.omhaService.getServices()) {
+      ozoneManager.stop();
+      ozoneManager.restart();
+    }
+  }
+
+  public void shutdownOzoneManager(OzoneManager ozoneManager) {
+    LOG.info("Shutting down OzoneManager " + ozoneManager.getOMNodeId());
+
+    ozoneManager.stop();
+  }
+
+  public void restartOzoneManager(OzoneManager ozoneManager, boolean waitForOM)
+      throws IOException, TimeoutException, InterruptedException {
+    LOG.info("Restarting OzoneManager " + ozoneManager.getOMNodeId());
+    ozoneManager.restart();
+
+    if (waitForOM) {
+      GenericTestUtils.waitFor(ozoneManager::isRunning,
+          1000, waitForClusterToBeReadyTimeout);
+    }
+  }
+
+  public String getClusterId() throws IOException {
+    return scmhaService.getServices().get(0)
+        .getClientProtocolServer().getScmInfo().getClusterId();
+  }
+
+  public StorageContainerManager getActiveSCM() {
+    for (StorageContainerManager scm : scmhaService.getServices()) {
+      if (scm.checkLeader()) {
+        return scm;
+      }
+    }
+    return null;
+  }
+
+  public void waitForSCMToBeReady()
+      throws TimeoutException, InterruptedException  {
+    GenericTestUtils.waitFor(() -> {
+      for (StorageContainerManager scm : scmhaService.getServices()) {
+        if (scm.checkLeader()) {
+          return true;
+        }
+      }
+      return false;
+    }, 1000, waitForClusterToBeReadyTimeout);
+  }
+
+  @Override
+  public void stop() {
+    for (OzoneManager ozoneManager : this.omhaService.getServices()) {
+      if (ozoneManager != null) {
+        LOG.info("Stopping the OzoneManager {}", ozoneManager.getOMNodeId());
+        ozoneManager.stop();
+        ozoneManager.join();
+      }
+    }
+
+    for (StorageContainerManager scm : this.scmhaService.getServices()) {
+      if (scm != null) {
+        LOG.info("Stopping the StorageContainerManager {}", scm.getScmId());
+        scm.stop();
+        scm.join();
+      }
+    }
+    super.stop();
+  }
+
+  public void stopOzoneManager(int index) {
+    omhaService.getServices().get(index).stop();
+    omhaService.getServices().get(index).join();
+  }
+
+  public void stopOzoneManager(String omNodeId) {
+    omhaService.getServiceById(omNodeId).stop();
+    omhaService.getServiceById(omNodeId).join();
+  }
+
+  /**
+   * Builder for configuring the MiniOzoneCluster to run.
+   */
+  public static class Builder extends MiniOzoneClusterImpl.Builder {
+
+    private static final String OM_NODE_ID_PREFIX = "omNode-";
+    private List<OzoneManager> activeOMs = new ArrayList<>();
+    private List<OzoneManager> inactiveOMs = new ArrayList<>();
+
+    private static final String SCM_NODE_ID_PREFIX = "scmNode-";
+    private List<StorageContainerManager> activeSCMs = new ArrayList<>();
+    private List<StorageContainerManager> inactiveSCMs = new ArrayList<>();
+
+    /**
+     * Creates a new Builder.
+     *
+     * @param conf configuration
+     */
+    public Builder(OzoneConfiguration conf) {
+      super(conf);
+    }
+
+    public List<OzoneManager> getActiveOMs() {
+      return activeOMs;
+    }
+
+    public List<OzoneManager> getInactiveOMs() {
+      return inactiveOMs;
+    }
+
+    @Override
+    public MiniOzoneCluster build() throws IOException {
+      if (numOfActiveOMs > numOfOMs) {
+        throw new IllegalArgumentException("Number of active OMs cannot be " +
+            "more than the total number of OMs");
+      }
+
+      // If num of ActiveOMs is not set, set it to numOfOMs.
+      if (numOfActiveOMs == ACTIVE_OMS_NOT_SET) {
+        numOfActiveOMs = numOfOMs;
+      }
+
+      // If num of ActiveSCMs is not set, set it to numOfSCMs.
+      if (numOfActiveSCMs == ACTIVE_SCMS_NOT_SET) {
+        numOfActiveSCMs = numOfSCMs;
+      }
+
+      DefaultMetricsSystem.setMiniClusterMode(true);
+      initializeConfiguration();
+      initOMRatisConf();
+      ReconServer reconServer = null;
+      try {
+        createSCMService();
+        createOMService();
+        if (includeRecon) {
+          configureRecon();
+          reconServer = new ReconServer();
+          reconServer.execute(new String[] {});
+        }
+      } catch (AuthenticationException ex) {
+        throw new IOException("Unable to build MiniOzoneCluster. ", ex);
+      }
+
+      final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(
+          activeSCMs, reconServer);
+
+      MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf,
+          activeOMs, inactiveOMs, activeSCMs, inactiveSCMs,
+          hddsDatanodes, omServiceId, scmServiceId, reconServer);
+
+      if (startDataNodes) {
+        cluster.startHddsDatanodes();
+      }
+      return cluster;
+    }
+
+    protected void initOMRatisConf() {
+      conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
+      conf.setInt(OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY, numOfOmHandlers);
+      
+      // If the test changes the following config values, we respect them;
+      // otherwise we set lower timeout values.
+      long defaultDuration = OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT
+              .getDuration();
+      long curRatisRpcTimeout = conf.getTimeDuration(
+          OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
+              defaultDuration, TimeUnit.MILLISECONDS);
+      conf.setTimeDuration(OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
+          defaultDuration == curRatisRpcTimeout ?
+              RATIS_RPC_TIMEOUT : curRatisRpcTimeout, TimeUnit.MILLISECONDS);
+
+      long defaultNodeFailureTimeout =
+          OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT.
+              getDuration();
+      long curNodeFailureTimeout = conf.getTimeDuration(
+          OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
+          defaultNodeFailureTimeout,
+          OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT.
+              getUnit());
+      conf.setTimeDuration(
+          OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
+          curNodeFailureTimeout == defaultNodeFailureTimeout ?
+              NODE_FAILURE_TIMEOUT : curNodeFailureTimeout,
+          TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Start OM service with multiple OMs.
+     */
+    protected List<OzoneManager> createOMService() throws IOException,
+        AuthenticationException {
+
+      List<OzoneManager> omList = Lists.newArrayList();
+
+      int retryCount = 0;
+      int basePort = 10000;
+
+      while (true) {
+        try {
+          basePort = 10000 + RANDOM.nextInt(1000) * 4;
+          initOMHAConfig(basePort);
+
+          for (int i = 1; i<= numOfOMs; i++) {
+            // Set nodeId
+            String nodeId = OM_NODE_ID_PREFIX + i;
+            OzoneConfiguration config = new OzoneConfiguration(conf);
+            config.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
+            // Set the OM http(s) address to null so that the cluster picks
+            // up the address set with service ID and node ID in initHAConfig
+            config.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, "");
+            config.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, "");
+
+            // Set metadata/DB dir base path
+            String metaDirPath = path + "/" + nodeId;
+            config.set(OZONE_METADATA_DIRS, metaDirPath);
+           // OMStorage omStore = new OMStorage(config);
+           // initializeOmStorage(omStore);
+            OzoneManager.omInit(config);
+            OzoneManager om = OzoneManager.createOm(config);
+            if (certClient != null) {
+              om.setCertClient(certClient);
+            }
+            omList.add(om);
+
+            if (i <= numOfActiveOMs) {
+              om.start();
+              activeOMs.add(om);
+              LOG.info("Started OzoneManager RPC server at {}",
+                  om.getOmRpcServerAddr());
+            } else {
+              inactiveOMs.add(om);
+              LOG.info("Initialized OzoneManager at {}. This OM is currently "
+                      + "inactive (not running).", om.getOmRpcServerAddr());
+            }
+          }
+
+          // Set default OM address to point to the first OM. Clients would
+          // try connecting to this address by default
+          conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
+              NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
+
+          break;
+        } catch (BindException e) {
+          for (OzoneManager om : omList) {
+            om.stop();
+            om.join();
+            LOG.info("Stopping OzoneManager server at {}",
+                om.getOmRpcServerAddr());
+          }
+          omList.clear();
+          ++retryCount;
+          LOG.info("MiniOzoneHACluster port conflicts, retried {} times",
+                  retryCount);
+        }
+      }
+      return omList;
+    }
+
+    /**
+     * Start SCM service with multiple SCMs.
+     */
+    protected List<StorageContainerManager> createSCMService()
+        throws IOException, AuthenticationException {
+      List<StorageContainerManager> scmList = Lists.newArrayList();
+
+      int retryCount = 0;
+      int basePort = 12000;
+
+      while (true) {
+        try {
+          basePort = 12000 + RANDOM.nextInt(1000) * 4;
+          initSCMHAConfig(basePort);
+
+          for (int i = 1; i<= numOfSCMs; i++) {
+            // Set nodeId
+            String nodeId = SCM_NODE_ID_PREFIX + i;
+            String metaDirPath = path + "/" + nodeId;
+            OzoneConfiguration scmConfig = new OzoneConfiguration(conf);
+            scmConfig.set(OZONE_METADATA_DIRS, metaDirPath);
+            scmConfig.set(ScmConfigKeys.OZONE_SCM_NODE_ID_KEY, nodeId);
+            scmConfig.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
+
+            configureSCM();
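+            // The first SCM initializes the new cluster (scmInit); the
+            // remaining SCMs are bootstrapped into it (scmBootstrap).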
+            if (i == 1) {
+              StorageContainerManager.scmInit(scmConfig, clusterId);
+            } else {
+              StorageContainerManager.scmBootstrap(scmConfig);
+            }
+            StorageContainerManager scm = TestUtils.getScmSimple(scmConfig);
+            HealthyPipelineSafeModeRule rule =
+                scm.getScmSafeModeManager().getHealthyPipelineSafeModeRule();
+            if (rule != null) {
+              // Set threshold to wait for safe mode exit -
+              // this is needed since a pipeline is marked open only after
+              // leader election.
+              rule.setHealthyPipelineThresholdCount(numOfDatanodes / 3);
+            }
+            scmList.add(scm);
+
+            if (i <= numOfActiveSCMs) {
+              scm.start();
+              activeSCMs.add(scm);
+              LOG.info("Started SCM RPC server at {}",
+                  scm.getClientProtocolServer());
+            } else {
+              inactiveSCMs.add(scm);
+              LOG.info("Initialized SCM at {}. This SCM is currently "
+                  + "inactive (not running).", scm.getClientProtocolServer());
+            }
+          }
+
+
+          break;
+        } catch (BindException e) {
+          for (StorageContainerManager scm : scmList) {
+            scm.stop();
+            scm.join();
+            LOG.info("Stopping StorageContainerManager server at {}",
+                scm.getClientProtocolServer());
+          }
+          scmList.clear();
+          ++retryCount;
+          LOG.info("MiniOzoneHACluster port conflicts, retried {} times",
+              retryCount);
+        }
+      }
+      return scmList;
+    }
+
+    /**
+     * Initialize SCM HA related configurations.
+     */
+    private void initSCMHAConfig(int basePort) throws IOException {
+      // Set the configurations required for starting the SCM HA service,
+      // because this is the service ID passed when starting the Ozone HA
+      // cluster. Setting both the default service ID and
+      // OZONE_SCM_SERVICE_IDS_KEY lets each SCM resolve its own service ID
+      // at startup.
+      conf.set(ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY, scmServiceId);
+      conf.set(ScmConfigKeys.OZONE_SCM_DEFAULT_SERVICE_ID, scmServiceId);
+      String scmNodesKey = ConfUtils.addKeySuffixes(
+          ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId);
+      StringBuilder scmNodesKeyValue = new StringBuilder();
+      StringBuilder scmNames = new StringBuilder();
+
+      int port = basePort;
+
+      for (int i = 1; i <= numOfSCMs; i++, port+=10) {
+        String scmNodeId = SCM_NODE_ID_PREFIX + i;
+        scmNodesKeyValue.append(",").append(scmNodeId);
+        String scmAddrKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_ADDRESS_KEY, scmServiceId, scmNodeId);
+        String scmHttpAddrKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, scmServiceId, scmNodeId);
+        String scmHttpsAddrKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY, scmServiceId, scmNodeId);
+        String scmRatisPortKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY, scmServiceId, scmNodeId);
+        String dnPortKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY,
+            scmServiceId, scmNodeId);
+        String blockClientKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY,
+            scmServiceId, scmNodeId);
+        String ssClientKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY,
+            scmServiceId, scmNodeId);
+        String scmGrpcPortKey = ConfUtils.addKeySuffixes(
+            ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, scmServiceId, scmNodeId);
+
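+        // Per-node port layout, relative to this node's base port:
+        // +2 HTTP, +3 HTTPS, +4 Ratis, +5 datanode RPC, +6 block client,
+        // +7 client, +8 gRPC.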
+        conf.set(scmAddrKey, "127.0.0.1");
+        conf.set(scmHttpAddrKey, "127.0.0.1:" + (port + 2));
+        conf.set(scmHttpsAddrKey, "127.0.0.1:" + (port + 3));
+        conf.setInt(scmRatisPortKey, port + 4);
+        //conf.setInt("ozone.scm.ha.ratis.bind.port", port + 4);
+        conf.set(dnPortKey, "127.0.0.1:" + (port + 5));
+        conf.set(blockClientKey, "127.0.0.1:" + (port + 6));
+        conf.set(ssClientKey, "127.0.0.1:" + (port + 7));
+        conf.setInt(scmGrpcPortKey, port + 8);
+        scmNames.append(",").append("localhost:" + (port + 5));
+        conf.set(ScmConfigKeys.
+            OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:" + (port + 6));
+      }
+
+      conf.set(scmNodesKey, scmNodesKeyValue.substring(1));
+      conf.set(ScmConfigKeys.OZONE_SCM_NAMES, scmNames.substring(1));
+    }
+
+    /**
+     * Initialize OM HA related configurations.
+     */
+    private void initOMHAConfig(int basePort) throws IOException {
+      // Set the configurations required for starting the OM HA service,
+      // because this is the service ID passed when starting the Ozone HA
+      // cluster. Setting both the internal service ID and
+      // OZONE_OM_SERVICE_IDS_KEY lets each OM resolve its own service ID
+      // at startup.
+      conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId);
+      conf.set(OMConfigKeys.OZONE_OM_INTERNAL_SERVICE_ID, omServiceId);
+      String omNodesKey = ConfUtils.addKeySuffixes(
+          OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
+      StringBuilder omNodesKeyValue = new StringBuilder();
+
+      int port = basePort;
+
+      for (int i = 1; i <= numOfOMs; i++, port+=6) {
+        String omNodeId = OM_NODE_ID_PREFIX + i;
+        omNodesKeyValue.append(",").append(omNodeId);
+        String omAddrKey = ConfUtils.addKeySuffixes(
+            OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId);
+        String omHttpAddrKey = ConfUtils.addKeySuffixes(
+            OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, omServiceId, omNodeId);
+        String omHttpsAddrKey = ConfUtils.addKeySuffixes(
+            OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, omServiceId, omNodeId);
+        String omRatisPortKey = ConfUtils.addKeySuffixes(
+            OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNodeId);
+
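+        // Per-node port layout, relative to this node's base port:
+        // +0 OM RPC, +2 HTTP, +3 HTTPS, +4 Ratis.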
+        conf.set(omAddrKey, "127.0.0.1:" + port);
+        conf.set(omHttpAddrKey, "127.0.0.1:" + (port + 2));
+        conf.set(omHttpsAddrKey, "127.0.0.1:" + (port + 3));
+        conf.setInt(omRatisPortKey, port + 4);
+      }
+
+      conf.set(omNodesKey, omNodesKeyValue.substring(1));
+    }
+  }
+
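+  /**
+   * A consumer that may throw an IOException, used for starting an
+   * inactive OM/SCM service.
+   */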
+  @FunctionalInterface
+  public interface CheckedConsumer<T> {
+    void apply(T t) throws IOException;
+  }
+
+  /**
+   * MiniOzoneHAService is a helper class used for both SCM and OM HA.
+   * This class keeps track of active and inactive OM/SCM services.
+   *
+   * @param <Type> service type (OzoneManager or StorageContainerManager)
+   */
+  static class MiniOzoneHAService<Type> {
+    private Map<String, Type> serviceMap;
+    private List<Type> services;
+    private String serviceId;
+    private String serviceName;
+
+    // Active services denote OM/SCM services which are up and running.
+    private List<Type> activeServices;
+    private List<Type> inactiveServices;
+
+    MiniOzoneHAService(String name, List<Type> activeList,
+        List<Type> inactiveList, String serviceId,
+        Function<Type, String> idProvider) {
+      this.serviceName = name;
+      this.serviceMap = Maps.newHashMap();
+      if (activeList != null) {
+        for (Type service : activeList) {
+          this.serviceMap.put(idProvider.apply(service), service);
+        }
+      }
+      if (inactiveList != null) {
+        for (Type service : inactiveList) {
+          this.serviceMap.put(idProvider.apply(service), service);
+        }
+      }
+      this.services = new ArrayList<>(serviceMap.values());
+      this.activeServices = activeList;
+      this.inactiveServices = inactiveList;
+      this.serviceId = serviceId;
+
+      // If the serviceID is null, then this should be a non-HA cluster.
+      if (serviceId == null) {
+        Preconditions.checkArgument(services.size() <= 1);
+      }
+    }
+
+    public String getServiceId() {
+      return serviceId;
+    }
+
+    public List<Type> getServices() {
+      return services;
+    }
+
+    public boolean isServiceActive(String id) {
+      return activeServices.contains(serviceMap.get(id));
+    }
+
+    public Type getServiceByIndex(int index) {
+      return this.services.get(index);
+    }
+
+    public Type getServiceById(String id) {
+      return this.serviceMap.get(id);
+    }
+
+    public void startInactiveService(String id,
+        CheckedConsumer<Type> serviceStarter) throws IOException {
+      Type service = serviceMap.get(id);
+      if (!inactiveServices.contains(service)) {
+        throw new IOException(serviceName + " is already active.");
+      } else {
+        serviceStarter.apply(service);
+        activeServices.add(service);
+        inactiveServices.remove(service);
+      }
+    }
+  }
+
+  static class OMHAService extends MiniOzoneHAService<OzoneManager> {
+    OMHAService(List<OzoneManager> activeList, List<OzoneManager> inactiveList,
+                String serviceId) {
+      super("OM", activeList, inactiveList, serviceId,
+          OzoneManager::getOMNodeId);
+    }
+  }
+
+  static class SCMHAService extends
+      MiniOzoneHAService<StorageContainerManager> {
+    SCMHAService(List<StorageContainerManager> activeList,
+                 List<StorageContainerManager> inactiveList,
+                 String serviceId) {
+      super("SCM", activeList, inactiveList, serviceId,
+          StorageContainerManager::getScmId);
+    }
+  }
+
+  public List<StorageContainerManager> getStorageContainerManagers() {
+    return this.scmhaService.getServices();
+  }
+
+  public StorageContainerManager getStorageContainerManager() {
+    return getStorageContainerManagers().get(0);
+  }
+
+}
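As a usage sketch of the new builder (a fragment meant to sit inside a JUnit
test such as TestStorageContainerManagerHA further below, which already has
the needed imports): leaving one SCM inactive is one way to exercise the
bootstrap path added by this patch. The setNumOfActiveSCMs setter and the
startInactiveSCM helper are assumptions here and may be named differently in
the full class.

    OzoneConfiguration conf = new OzoneConfiguration();
    MiniOzoneHAClusterImpl cluster = (MiniOzoneHAClusterImpl)
        MiniOzoneCluster.newHABuilder(conf)
            .setClusterId(UUID.randomUUID().toString())
            .setScmId(UUID.randomUUID().toString())
            .setSCMServiceId("scm-service-test1")
            .setOMServiceId("om-service-test1")
            .setNumOfStorageContainerManagers(3)
            .setNumOfActiveSCMs(2)            // assumed setter
            .setNumOfOzoneManagers(3)
            .build();
    cluster.waitForClusterToBeReady();
    // Assumed helper, analogous to startInactiveOM on the old OM HA cluster:
    cluster.startInactiveSCM("scmNode-3");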
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java
index aa40b2e..a28c07a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneOMHAClusterImpl.java
@@ -18,63 +18,27 @@
 
 package org.apache.hadoop.ozone;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.ha.ConfUtils;
-import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.recon.ReconServer;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.BindException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import java.util.Collections;
 
 /**
  * MiniOzoneOMHAClusterImpl creates a complete in-process Ozone cluster
  * with OM HA suitable for running tests.  The cluster consists of a set of
  * OzoneManagers, StorageContainerManager and multiple DataNodes.
  */
-public class MiniOzoneOMHAClusterImpl extends MiniOzoneClusterImpl {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(MiniOzoneOMHAClusterImpl.class);
-
-  private Map<String, OzoneManager> ozoneManagerMap;
-  private List<OzoneManager> ozoneManagers;
-  private String omServiceId;
-
-  // Active OMs denote OMs which are up and running
-  private List<OzoneManager> activeOMs;
-  private List<OzoneManager> inactiveOMs;
-
-  private int waitForOMToBeReadyTimeout = 120000; // 2 min
-
-  private static final Random RANDOM = new Random();
-  private static final int RATIS_RPC_TIMEOUT = 1000; // 1 second
+public final class MiniOzoneOMHAClusterImpl extends MiniOzoneHAClusterImpl {
   public static final int NODE_FAILURE_TIMEOUT = 2000; // 2 seconds
 
   /**
-   * Creates a new MiniOzoneCluster.
+   * Creates a new MiniOzoneOMHACluster.
    *
    * @throws IOException if there is an I/O error
    */
@@ -87,173 +51,14 @@ public class MiniOzoneOMHAClusterImpl extends MiniOzoneClusterImpl {
       List<HddsDatanodeService> hddsDatanodes,
       String omServiceId,
       ReconServer reconServer) {
-    super(conf, scm, hddsDatanodes, reconServer);
-
-    this.ozoneManagerMap = Maps.newHashMap();
-    if (activeOMList != null) {
-      for (OzoneManager om : activeOMList) {
-        this.ozoneManagerMap.put(om.getOMNodeId(), om);
-      }
-    }
-    if (inactiveOMList != null) {
-      for (OzoneManager om : inactiveOMList) {
-        this.ozoneManagerMap.put(om.getOMNodeId(), om);
-      }
-    }
-    this.ozoneManagers = new ArrayList<>(ozoneManagerMap.values());
-    this.activeOMs = activeOMList;
-    this.inactiveOMs = inactiveOMList;
-    this.omServiceId = omServiceId;
-
-    // If the serviceID is null, then this should be a non-HA cluster.
-    if (omServiceId == null) {
-      Preconditions.checkArgument(ozoneManagers.size() <= 1);
-    }
-  }
-
-  /**
-   * Creates a new MiniOzoneCluster with all OMs active.
-   * This is used by MiniOzoneChaosCluster.
-   */
-  protected MiniOzoneOMHAClusterImpl(
-      OzoneConfiguration conf,
-      List<OzoneManager> omList,
-      StorageContainerManager scm,
-      List<HddsDatanodeService> hddsDatanodes,
-      String omServiceId) {
-    this(conf, omList, null, scm, hddsDatanodes, omServiceId, null);
-  }
-
-  @Override
-  public String getServiceId() {
-    return omServiceId;
-  }
-
-  /**
-   * Returns the first OzoneManager from the list.
-   * @return
-   */
-  @Override
-  public OzoneManager getOzoneManager() {
-    return this.ozoneManagers.get(0);
-  }
-
-  @Override
-  public OzoneClient getRpcClient() throws IOException {
-    if (omServiceId == null) {
-      // Non-HA cluster.
-      return OzoneClientFactory.getRpcClient(getConf());
-    } else {
-      // HA cluster
-      return OzoneClientFactory.getRpcClient(omServiceId, getConf());
-    }
-  }
-
-  public boolean isOMActive(String omNodeId) {
-    return activeOMs.contains(ozoneManagerMap.get(omNodeId));
-  }
-
-  public OzoneManager getOzoneManager(int index) {
-    return this.ozoneManagers.get(index);
-  }
-
-  public OzoneManager getOzoneManager(String omNodeId) {
-    return this.ozoneManagerMap.get(omNodeId);
-  }
-
-  public List<OzoneManager> getOzoneManagersList() {
-    return ozoneManagers;
-  }
-
-  /**
-   * Get OzoneManager leader object.
-   * @return OzoneManager object, null if there isn't one or more than one
-   */
-  public OzoneManager getOMLeader() {
-    OzoneManager res = null;
-    for (OzoneManager ozoneManager : this.ozoneManagers) {
-      if (ozoneManager.isLeaderReady()) {
-        if (res != null) {
-          // Found more than one leader
-          // Return null, expect the caller to retry in a while
-          return null;
-        }
-        // Found a leader
-        res = ozoneManager;
-      }
-    }
-    return res;
-  }
-
-  /**
-   * Start a previously inactive OM.
-   */
-  public void startInactiveOM(String omNodeID) throws IOException {
-    OzoneManager ozoneManager = ozoneManagerMap.get(omNodeID);
-    if (!inactiveOMs.contains(ozoneManager)) {
-      throw new IOException("OM is already active.");
-    } else {
-      ozoneManager.start();
-      activeOMs.add(ozoneManager);
-      inactiveOMs.remove(ozoneManager);
-    }
-  }
-
-  @Override
-  public void restartOzoneManager() throws IOException {
-    for (OzoneManager ozoneManager : ozoneManagers) {
-      ozoneManager.stop();
-      ozoneManager.restart();
-    }
-  }
-
-  public void shutdownOzoneManager(OzoneManager ozoneManager) {
-    LOG.info("Shutting down OzoneManager " + ozoneManager.getOMNodeId());
-
-    ozoneManager.stop();
-  }
-
-  public void restartOzoneManager(OzoneManager ozoneManager, boolean waitForOM)
-      throws IOException, TimeoutException, InterruptedException {
-    LOG.info("Restarting OzoneManager " + ozoneManager.getOMNodeId());
-    ozoneManager.restart();
-
-    if (waitForOM) {
-      GenericTestUtils.waitFor(ozoneManager::isRunning,
-          1000, waitForOMToBeReadyTimeout);
-    }
-  }
-
-  @Override
-  public void stop() {
-    for (OzoneManager ozoneManager : ozoneManagers) {
-      if (ozoneManager != null) {
-        LOG.info("Stopping the OzoneManager {}", ozoneManager.getOMNodeId());
-        ozoneManager.stop();
-        ozoneManager.join();
-      }
-    }
-    super.stop();
-  }
-
-  public void stopOzoneManager(int index) {
-    ozoneManagers.get(index).stop();
-    ozoneManagers.get(index).join();
-  }
-
-  public void stopOzoneManager(String omNodeId) {
-    ozoneManagerMap.get(omNodeId).stop();
-    ozoneManagerMap.get(omNodeId).join();
+    super(conf, activeOMList, inactiveOMList, Collections.singletonList(scm),
+        null, hddsDatanodes, omServiceId, null, reconServer);
   }
 
   /**
    * Builder for configuring the MiniOzoneCluster to run.
    */
-  public static class Builder extends MiniOzoneClusterImpl.Builder {
-
-    private static final String NODE_ID_PREFIX = "omNode-";
-    private List<OzoneManager> activeOMs = new ArrayList<>();
-    private List<OzoneManager> inactiveOMs = new ArrayList<>();
+  public static class Builder extends MiniOzoneHAClusterImpl.Builder {
 
     /**
      * Creates a new Builder.
@@ -295,154 +100,16 @@ public class MiniOzoneOMHAClusterImpl extends MiniOzoneClusterImpl {
       }
 
       final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(
-          scm, reconServer);
+          Collections.singletonList(scm), reconServer);
 
-      MiniOzoneOMHAClusterImpl cluster = new MiniOzoneOMHAClusterImpl(conf,
-          activeOMs, inactiveOMs, scm, hddsDatanodes, omServiceId, reconServer);
+      MiniOzoneClusterImpl cluster = new MiniOzoneOMHAClusterImpl(conf,
+          getActiveOMs(), getInactiveOMs(), scm, hddsDatanodes,
+          omServiceId, reconServer);
 
       if (startDataNodes) {
         cluster.startHddsDatanodes();
       }
       return cluster;
     }
-
-    protected void initOMRatisConf() {
-      conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
-      conf.setInt(OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY, numOfOmHandlers);
-      
-      // If test change the following config values we will respect,
-      // otherwise we will set lower timeout values.
-      long defaultDuration = OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT
-              .getDuration();
-      long curRatisRpcTimeout = conf.getTimeDuration(
-          OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
-              defaultDuration, TimeUnit.MILLISECONDS);
-      conf.setTimeDuration(OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY,
-          defaultDuration == curRatisRpcTimeout ?
-              RATIS_RPC_TIMEOUT : curRatisRpcTimeout, TimeUnit.MILLISECONDS);
-
-      long defaultNodeFailureTimeout =
-          OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT.
-              getDuration();
-      long curNodeFailureTimeout = conf.getTimeDuration(
-          OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
-          defaultNodeFailureTimeout,
-          OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT.
-              getUnit());
-      conf.setTimeDuration(
-          OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
-          curNodeFailureTimeout == defaultNodeFailureTimeout ?
-              NODE_FAILURE_TIMEOUT : curNodeFailureTimeout,
-          TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Start OM service with multiple OMs.
-     */
-    protected List<OzoneManager> createOMService() throws IOException,
-        AuthenticationException {
-
-      List<OzoneManager> omList = Lists.newArrayList();
-
-      int retryCount = 0;
-      int basePort = 10000;
-
-      while (true) {
-        try {
-          basePort = 10000 + RANDOM.nextInt(1000) * 4;
-          initHAConfig(basePort);
-
-          for (int i = 1; i<= numOfOMs; i++) {
-            // Set nodeId
-            String nodeId = NODE_ID_PREFIX + i;
-            OzoneConfiguration config = new OzoneConfiguration(conf);
-            config.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
-            // Set the OM http(s) address to null so that the cluster picks
-            // up the address set with service ID and node ID in initHAConfig
-            config.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, "");
-            config.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, "");
-
-            // Set metadata/DB dir base path
-            String metaDirPath = path + "/" + nodeId;
-            config.set(OZONE_METADATA_DIRS, metaDirPath);
-            OMStorage omStore = new OMStorage(config);
-            initializeOmStorage(omStore);
-
-            OzoneManager om = OzoneManager.createOm(config);
-            if (certClient != null) {
-              om.setCertClient(certClient);
-            }
-            omList.add(om);
-
-            if (i <= numOfActiveOMs) {
-              om.start();
-              activeOMs.add(om);
-              LOG.info("Started OzoneManager RPC server at {}",
-                  om.getOmRpcServerAddr());
-            } else {
-              inactiveOMs.add(om);
-              LOG.info("Intialized OzoneManager at {}. This OM is currently "
-                      + "inactive (not running).", om.getOmRpcServerAddr());
-            }
-          }
-
-          // Set default OM address to point to the first OM. Clients would
-          // try connecting to this address by default
-          conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
-              NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
-
-          break;
-        } catch (BindException e) {
-          for (OzoneManager om : omList) {
-            om.stop();
-            om.join();
-            LOG.info("Stopping OzoneManager server at {}",
-                om.getOmRpcServerAddr());
-          }
-          omList.clear();
-          ++retryCount;
-          LOG.info("MiniOzoneOMHACluster port conflicts, retried {} times",
-                  retryCount);
-        }
-      }
-      return omList;
-    }
-
-    /**
-     * Initialize HA related configurations.
-     */
-    private void initHAConfig(int basePort) throws IOException {
-      // Set configurations required for starting OM HA service, because that
-      // is the serviceID being passed to start Ozone HA cluster.
-      // Here setting internal service and OZONE_OM_SERVICE_IDS_KEY, in this
-      // way in OM start it uses internal service id to find it's service id.
-      conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId);
-      conf.set(OMConfigKeys.OZONE_OM_INTERNAL_SERVICE_ID, omServiceId);
-      String omNodesKey = ConfUtils.addKeySuffixes(
-          OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
-      StringBuilder omNodesKeyValue = new StringBuilder();
-
-      int port = basePort;
-
-      for (int i = 1; i <= numOfOMs; i++, port+=6) {
-        String omNodeId = NODE_ID_PREFIX + i;
-        omNodesKeyValue.append(",").append(omNodeId);
-        String omAddrKey = ConfUtils.addKeySuffixes(
-            OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId);
-        String omHttpAddrKey = ConfUtils.addKeySuffixes(
-            OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, omServiceId, omNodeId);
-        String omHttpsAddrKey = ConfUtils.addKeySuffixes(
-            OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, omServiceId, omNodeId);
-        String omRatisPortKey = ConfUtils.addKeySuffixes(
-            OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNodeId);
-
-        conf.set(omAddrKey, "127.0.0.1:" + port);
-        conf.set(omHttpAddrKey, "127.0.0.1:" + (port + 2));
-        conf.set(omHttpsAddrKey, "127.0.0.1:" + (port + 3));
-        conf.setInt(omRatisPortKey, port + 4);
-      }
-
-      conf.set(omNodesKey, omNodesKeyValue.substring(1));
-    }
   }
 }
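For tests that only need OM HA, a minimal sketch of the slimmed-down builder
(again a fragment for use inside a JUnit test; the inherited setters are
assumed to keep the names used elsewhere in this patch):

    OzoneConfiguration conf = new OzoneConfiguration();
    MiniOzoneCluster cluster = new MiniOzoneOMHAClusterImpl.Builder(conf)
        .setClusterId(UUID.randomUUID().toString())
        .setScmId(UUID.randomUUID().toString())
        .setOMServiceId("om-service-test1")
        .setNumOfOzoneManagers(3)
        .build();
    cluster.waitForClusterToBeReady();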
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index 9a6db3a..76a4f82 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -39,6 +39,9 @@ import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
 import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -326,6 +329,10 @@ public final class TestSecureOzoneCluster {
     scmStore.setScmId(scmId);
     // writes the version file properties
     scmStore.initialize();
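+    // When SCM HA is not enabled, explicitly create the single-node Ratis
+    // group for this SCM as part of test initialization.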
+    if (!SCMHAUtils.isSCMHAEnabled(conf)) {
+      SCMRatisServerImpl.initialize(clusterId, scmId,
+          SCMHANodeDetails.loadSCMHAConfig(conf).getLocalNodeDetails(), conf);
+    }
   }
 
   @Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index b6d46fd..fce4e57 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -463,15 +463,17 @@ public class TestStorageContainerManager {
     Path scmPath = Paths.get(path, "scm-meta");
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
 
+    UUID clusterId = UUID.randomUUID();
+    String testClusterId = clusterId.toString();
     // This will initialize SCM
-    StorageContainerManager.scmInit(conf, "testClusterId");
+    StorageContainerManager.scmInit(conf, testClusterId);
 
     SCMStorageConfig scmStore = new SCMStorageConfig(conf);
     Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
-    Assert.assertEquals("testClusterId", scmStore.getClusterID());
-    StorageContainerManager.scmInit(conf, "testClusterIdNew");
+    Assert.assertEquals(testClusterId, scmStore.getClusterID());
+    StorageContainerManager.scmInit(conf, testClusterId);
     Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
-    Assert.assertEquals("testClusterId", scmStore.getClusterID());
+    Assert.assertEquals(testClusterId, scmStore.getClusterID());
   }
 
   @Test
@@ -483,9 +485,10 @@ public class TestStorageContainerManager {
     Path scmPath = Paths.get(path, "scm-meta");
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
 
+    final UUID clusterId = UUID.randomUUID();
     // This will initialize SCM
-    StorageContainerManager.scmInit(conf, "testClusterId");
-    SCMRatisServerImpl.validateRatisGroupExists(conf, "testClusterId");
+    StorageContainerManager.scmInit(conf, clusterId.toString());
+    SCMRatisServerImpl.validateRatisGroupExists(conf, clusterId.toString());
   }
 
   @Test
@@ -500,11 +503,11 @@ public class TestStorageContainerManager {
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
     cluster.waitForClusterToBeReady();
     try {
+      final UUID clusterId = UUID.randomUUID();
       // This will initialize SCM
-      StorageContainerManager.scmInit(conf, "testClusterId");
+      StorageContainerManager.scmInit(conf, clusterId.toString());
       SCMStorageConfig scmStore = new SCMStorageConfig(conf);
-      Assert.assertEquals(NodeType.SCM, scmStore.getNodeType());
-      Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
+      Assert.assertNotEquals(clusterId.toString(), scmStore.getClusterID());
     } finally {
       cluster.shutdown();
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 9057b0c..15f4581 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -114,6 +114,7 @@ public class TestContainerStateMachine {
             .setHbInterval(200)
             .setCertificateClient(new CertificateClientTestImpl(conf))
             .build();
+    cluster.setWaitForClusterToBeReadyTimeout(300000);
     cluster.waitForClusterToBeReady();
     cluster.getOzoneManager().startSecretManager();
     //the easiest way to create an open container is creating a key
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
new file mode 100644
index 0000000..abf1e3d
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.Rule;
+
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
+
+/**
+ * Tests for Storage Container Manager HA.
+ */
+public class TestStorageContainerManagerHA {
+
+  private MiniOzoneHAClusterImpl cluster = null;
+  private OzoneConfiguration conf;
+  private String clusterId;
+  private String scmId;
+  private String omServiceId;
+  private static int numOfOMs = 3;
+  private String scmServiceId;
+  private static int numOfSCMs = 3;
+
+
+  @Rule
+  public Timeout timeout = new Timeout(300_000);
+
+  /**
+   * Create a MiniOzoneHACluster with multiple SCMs and OMs for testing.
+   *
+   * @throws Exception if cluster setup fails
+   */
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+    omServiceId = "om-service-test1";
+    scmServiceId = "scm-service-test1";
+    cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+        .setClusterId(clusterId)
+        .setScmId(scmId)
+        .setOMServiceId(omServiceId)
+        .setSCMServiceId(scmServiceId)
+        .setNumOfStorageContainerManagers(numOfSCMs)
+        .setNumOfOzoneManagers(numOfOMs)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  /**
+   * Shutdown the MiniOzoneHACluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAllSCMAreRunning() throws Exception {
+    int count = 0;
+    List<StorageContainerManager> scms = cluster.getStorageContainerManagers();
+    Assert.assertEquals(numOfSCMs, scms.size());
+    int peerSize = cluster.getStorageContainerManager().getScmHAManager()
+        .getRatisServer().getDivision().getGroup().getPeers().size();
+    Assert.assertEquals(numOfSCMs, peerSize);
+    for (StorageContainerManager scm : scms) {
+      if (scm.checkLeader()) {
+        count++;
+      }
+    }
+    Assert.assertEquals(1, count);
+    count = 0;
+    List<OzoneManager> oms = cluster.getOzoneManagersList();
+    Assert.assertEquals(numOfOMs, oms.size());
+    for (OzoneManager om : oms) {
+      if (om.isLeaderReady()) {
+        count++;
+      }
+    }
+    Assert.assertEquals(1, count);
+    testPutKey();
+  }
+
+  public void testPutKey() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    Instant testStartTime = Instant.now();
+    ObjectStore store =
+        OzoneClientFactory.getRpcClient(cluster.getConf()).getObjectStore();
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+
+    OzoneOutputStream out = bucket
+        .createKey(keyName, value.getBytes(UTF_8).length, STAND_ALONE, ONE,
+            new HashMap<>());
+    out.write(value.getBytes(UTF_8));
+    out.close();
+    OzoneKey key = bucket.getKey(keyName);
+    Assert.assertEquals(keyName, key.getName());
+    OzoneInputStream is = bucket.readKey(keyName);
+    byte[] fileContent = new byte[value.getBytes(UTF_8).length];
+    is.read(fileContent);
+    Assert.assertEquals(value, new String(fileContent, UTF_8));
+    Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
+    Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
+    is.close();
+    final OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .setRefreshPipeline(true).build();
+    final OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    final List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    long index = -1;
+    for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
+      if (scm.checkLeader()) {
+        index = getLastAppliedIndex(scm);
+      }
+    }
+    Assert.assertFalse(index == -1);
+    long finalIndex = index;
+    // Ensure all follower scms have caught up with the leader
+    GenericTestUtils.waitFor(() -> areAllScmInSync(finalIndex), 100, 10000);
+    final long containerID = keyLocationInfos.get(0).getContainerID();
+    for (int k = 0; k < numOfSCMs; k++) {
+      StorageContainerManager scm =
+          cluster.getStorageContainerManagers().get(k);
+      // flush to DB on each SCM
+      ((SCMRatisServerImpl) scm.getScmHAManager().getRatisServer())
+          .getStateMachine().takeSnapshot();
+      Assert.assertTrue(scm.getContainerManager()
+          .containerExist(ContainerID.valueOf(containerID)));
+      Assert.assertNotNull(scm.getScmMetadataStore().getContainerTable()
+          .get(ContainerID.valueOf(containerID)));
+    }
+  }
+
+  private long getLastAppliedIndex(StorageContainerManager scm) {
+    return scm.getScmHAManager().getRatisServer().getDivision().getInfo()
+        .getLastAppliedIndex();
+  }
+
+  private boolean areAllScmInSync(long leaderIndex) {
+    List<StorageContainerManager> scms = cluster.getStorageContainerManagers();
+    for (StorageContainerManager scm : scms) {
+      if (getLastAppliedIndex(scm) != leaderIndex) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
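A possible follow-up for this test class, sketched only (not part of this
patch), that reuses checkLeader and GenericTestUtils.waitFor to exercise SCM
leader failover; whether stopping the leader mid-test is safe here is an
assumption.

  private void waitForNewScmLeader(StorageContainerManager oldLeader)
      throws Exception {
    // Stop the current leader, then wait until another SCM claims leadership.
    oldLeader.stop();
    GenericTestUtils.waitFor(
        () -> cluster.getStorageContainerManagers().stream()
            .filter(scm -> scm != oldLeader)
            .anyMatch(StorageContainerManager::checkLeader),
        100, 10000);
  }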
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
index 49d9f47..32b2ac9 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestingClient.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
@@ -183,6 +184,11 @@ public class ScmBlockLocationTestingClient implements ScmBlockLocationProtocol {
   }
 
   @Override
+  public boolean addSCM(AddSCMRequest request) throws IOException {
+    return false;
+  }
+
+  @Override
   public List<DatanodeDetails> sortDatanodes(List<String> nodes,
       String clientMachine) throws IOException {
     return null;

