You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by li...@apache.org on 2020/06/02 03:11:10 UTC

[hadoop-ozone] 01/07: HDDS-3185 Construct a standalone ratis server for SCM. (#720)

This is an automated email from the ASF dual-hosted git repository.

licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 8c25beb6a03be702f29e576e428a93d67ae0aafe
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Fri Mar 27 17:22:40 2020 +0800

    HDDS-3185 Construct a standalone ratis server for SCM. (#720)
    
    Contributed-by: Li Cheng <ti...@tencent.com>
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  87 ++++
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   5 +-
 .../common/src/main/resources/ozone-default.xml    | 180 ++++++++
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java  |  37 ++
 .../apache/hadoop/hdds/scm/ha/SCMNodeDetails.java  | 169 ++++++++
 .../apache/hadoop/hdds/scm/ha/package-info.java    |  22 +
 .../hadoop/hdds/scm/ratis/SCMRatisServer.java      | 461 +++++++++++++++++++++
 .../hadoop/hdds/scm/ratis/SCMStateMachine.java     |  35 ++
 .../hdds/scm/server/StorageContainerManager.java   |  29 +-
 .../hadoop/hdds/scm/ratis/TestSCMRatisServer.java  | 158 +++++++
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   3 +
 11 files changed, 1183 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index c6b1100..37a1833 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -274,6 +274,16 @@ public final class ScmConfigKeys {
   // able to send back a new list to the datanodes.
   public static final String OZONE_SCM_NAMES = "ozone.scm.names";
 
+  public static final String OZONE_SCM_INTERNAL_SERVICE_ID =
+      "ozone.scm.internal.service.id";
+
+  public static final String OZONE_SCM_SERVICE_IDS_KEY =
+      "ozone.scm.service.ids";
+  public static final String OZONE_SCM_NODES_KEY =
+      "ozone.scm.nodes";
+  public static final String OZONE_SCM_NODE_ID_KEY =
+      "ozone.scm.node.id";
+
   public static final int OZONE_SCM_DEFAULT_PORT =
       OZONE_SCM_DATANODE_PORT_DEFAULT;
   // The path where datanode ID is to be written to.
@@ -357,6 +367,83 @@ public final class ScmConfigKeys {
   public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled";
   public static final boolean HDDS_TRACING_ENABLED_DEFAULT = false;
 
+  // SCM Ratis related
+  public static final String OZONE_SCM_HA_ENABLE_KEY
+      = "ozone.scm.ratis.enable";
+  public static final boolean OZONE_SCM_HA_ENABLE_DEFAULT
+      = false;
+  public static final String OZONE_SCM_RATIS_PORT_KEY
+      = "ozone.scm.ratis.port";
+  public static final int OZONE_SCM_RATIS_PORT_DEFAULT
+      = 9864;
+  public static final String OZONE_SCM_RATIS_RPC_TYPE_KEY
+      = "ozone.scm.ratis.rpc.type";
+  public static final String OZONE_SCM_RATIS_RPC_TYPE_DEFAULT
+      = "GRPC";
+
+  // SCM Ratis Log configurations
+  public static final String OZONE_SCM_RATIS_STORAGE_DIR
+      = "ozone.scm.ratis.storage.dir";
+  public static final String OZONE_SCM_RATIS_SEGMENT_SIZE_KEY
+      = "ozone.scm.ratis.segment.size";
+  public static final String OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT
+      = "16KB";
+  public static final String OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY
+      = "ozone.scm.ratis.segment.preallocated.size";
+  public static final String OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT
+      = "16KB";
+
+  // SCM Ratis Log Appender configurations
+  public static final String
+      OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS =
+      "ozone.scm.ratis.log.appender.queue.num-elements";
+  public static final int
+      OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT = 1024;
+  public static final String OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT =
+      "ozone.scm.ratis.log.appender.queue.byte-limit";
+  public static final String
+      OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB";
+  public static final String OZONE_SCM_RATIS_LOG_PURGE_GAP =
+      "ozone.scm.ratis.log.purge.gap";
+  public static final int OZONE_SCM_RATIS_LOG_PURGE_GAP_DEFAULT = 1000000;
+
+  // SCM Ratis server configurations
+  public static final String OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_KEY
+      = "ozone.scm.ratis.server.request.timeout";
+  public static final TimeDuration
+      OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT
+      = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
+  public static final String
+      OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_KEY
+      = "ozone.scm.ratis.server.retry.cache.timeout";
+  public static final TimeDuration
+      OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT
+      = TimeDuration.valueOf(600000, TimeUnit.MILLISECONDS);
+  public static final String OZONE_SCM_RATIS_MINIMUM_TIMEOUT_KEY
+      = "ozone.scm.ratis.minimum.timeout";
+  public static final TimeDuration OZONE_SCM_RATIS_MINIMUM_TIMEOUT_DEFAULT
+      = TimeDuration.valueOf(1, TimeUnit.SECONDS);
+
+  // SCM Ratis Leader Election configurations
+  public static final String
+      OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY =
+      "ozone.scm.leader.election.minimum.timeout.duration";
+  public static final TimeDuration
+      OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
+      TimeDuration.valueOf(1, TimeUnit.SECONDS);
+  public static final String OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY
+      = "ozone.scm.ratis.server.failure.timeout.duration";
+  public static final TimeDuration
+      OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+      = TimeDuration.valueOf(120, TimeUnit.SECONDS);
+
+  // SCM Leader server role check interval
+  public static final String OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY
+      = "ozone.scm.ratis.server.role.check.interval";
+  public static final TimeDuration
+      OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
+      = TimeDuration.valueOf(15, TimeUnit.SECONDS);
+
   /**
    * Never constructed.
    */
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index b31f80d..bb8e145 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -345,10 +345,11 @@ public final class OzoneConsts {
   public static final String GDPR_LENGTH = "length";
   public static final String GDPR_SECRET = "secret";
   public static final String GDPR_ALGORITHM = "algorithm";
-
-
+  
   // Transaction Info
   public static final String TRANSACTION_INFO_KEY = "#TRANSACTIONINFO";
   public static final String TRANSACTION_INFO_SPLIT_KEY = "#";
 
+  // SCM HA
+  public static final String SCM_SERVICE_ID_DEFAULT = "scmServiceIdDefault";
 }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 26bf2ba..8657939 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1906,6 +1906,186 @@
     <tag>OZONE, HDDS, SECURITY</tag>
     <description>SCM security server port.</description>
   </property>
+  <property>
+    <name>ozone.scm.service.ids</name>
+    <value></value>
+    <tag>OZONE, SCM, HA</tag>
+    <description>
+      Comma-separated list of SCM service Ids. This property allows the client
+      to figure out quorum of OzoneManager address.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.internal.service.id</name>
+    <value></value>
+    <tag>OZONE, SCM, HA</tag>
+    <description>
+      Service ID of the SCM. If this is not set fall back to
+      ozone.scm.service.ids to find the service ID it belongs to.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.nodes.EXAMPLESCMSERVICEID</name>
+    <value></value>
+    <tag>OZONE, SCM, HA</tag>
+    <description>
+      Comma-separated list of SCM node Ids for a given SCM service ID (eg.
+      EXAMPLESCMSERVICEID). The SCM service ID should be the value (one of the
+      values if there are multiple) set for the parameter ozone.scm.service.ids.
+
+      Unique identifiers for each SCM Node, delimited by commas. This will be
+      used by SCMs in HA setup to determine all the SCMs
+      belonging to the same SCM in the cluster. For example, if you
+      used “scmService1” as the SCM service ID previously, and you wanted to
+      use “scm1”, “scm2” and "scm3" as the individual IDs of the SCMs,
+      you would configure a property ozone.scm.nodes.scmService1, and its value
+      "scm1,scm2,scm3".
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.node.id</name>
+    <value></value>
+    <tag>OZONE, SCM, HA</tag>
+    <description>
+      The ID of this SCM node. If the SCM node ID is not configured it
+      is determined automatically by matching the local node's address
+      with the configured address.
+
+      If node ID is not deterministic from the configuration, then it is set
+      to the scmId from the SCM version file.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.ratis.enable</name>
+    <value>false</value>
+    <tag>OZONE, SCM, HA, RATIS</tag>
+    <description>Property to enable or disable Ratis server on SCM.
+      Please note - this is a temporary property to disable SCM Ratis server.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.scm.ratis.port</name>
+    <value>9872</value>
+    <tag>OZONE, SCM, HA, RATIS</tag>
+    <description>
+      The port number of the SCM's Ratis server.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.scm.ratis.rpc.type</name>
+    <value>GRPC</value>
+    <tag>OZONE, SCM, HA, RATIS</tag>
+    <description>Ratis supports different kinds of transports like netty, GRPC,
+      Hadoop RPC etc. This picks one of those for this cluster.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.scm.ratis.storage.dir</name>
+    <value/>
+    <tag>OZONE, SCM, HA, RATIS, STORAGE</tag>
+    <description>This directory is used for storing SCM's Ratis metadata like
+      logs. If this is not set then default metadata dirs is used. A warning
+      will be logged if this not set. Ideally, this should be mapped to a
+      fast disk like an SSD.
+      If undefined, SCM ratis storage dir will fallback to ozone.metadata.dirs.
+      This fallback approach is not recommended for production environments.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.scm.ratis.segment.size</name>
+    <value>16KB</value>
+    <tag>OZONE, SCM, HA, RATIS, PERFORMANCE</tag>
+    <description>The size of the raft segment used by Apache Ratis on SCM.
+      (16 KB by default)
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.scm.ratis.segment.preallocated.size</name>
+    <value>16KB</value>
+    <tag>OZONE, SCM, HA, RATIS, PERFORMANCE</tag>
+    <description>The size of the buffer which is preallocated for raft segment
+      used by Apache Ratis on SCM.(16 KB by default)
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.scm.ratis.log.appender.queue.num-elements</name>
+    <value>1024</value>
+    <tag>OZONE, DEBUG, SCM, HA, RATIS</tag>
+    <description>Number of operation pending with Raft's Log Worker.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.ratis.log.appender.queue.byte-limit</name>
+    <value>32MB</value>
+    <tag>OZONE, DEBUG, SCM, HA, RATIS</tag>
+    <description>Byte limit for Raft's Log Worker queue.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.ratis.log.purge.gap</name>
+    <value>1000000</value>
+    <tag>OZONE, SCM, HA, RATIS</tag>
+    <description>The minimum gap between log indices for Raft server to purge
+      its log segments after taking snapshot.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.ratis.server.request.timeout</name>
+    <value>3s</value>
+    <tag>OZONE, SCM, HA, RATIS</tag>
+    <description>The timeout duration for SCM's ratis server request .</description>
+  </property>
+
+  <property>
+    <name>ozone.scm.ratis.server.retry.cache.timeout</name>
+    <value>600000ms</value>
+    <tag>OZONE, SCM, HA, RATIS</tag>
+    <description>Retry Cache entry timeout for SCM's ratis server.</description>
+  </property>
+
+  <property>
+    <name>ozone.scm.ratis.minimum.timeout</name>
+    <value>1s</value>
+    <tag>OZONE, SCM, HA, RATIS</tag>
+    <description>The minimum timeout duration for SCM's Ratis server rpc.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.scm.leader.election.minimum.timeout.duration</name>
+    <value>1s</value>
+    <tag>OZONE, SCM, HA, RATIS</tag>
+    <description>The minimum timeout duration for SCM ratis leader election.
+      Default is 1s.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.scm.ratis.server.failure.timeout.duration</name>
+    <value>120s</value>
+    <tag>OZONE, SCM, HA, RATIS</tag>
+    <description>The timeout duration for ratis server failure detection,
+      once the threshold has reached, the ratis state machine will be informed
+      about the failure in the ratis ring.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.scm.ratis.server.role.check.interval</name>
+    <value>15s</value>
+    <tag>OZONE, SCM, HA, RATIS</tag>
+    <description>The interval between SCM leader performing a role
+      check on its ratis server. Ratis server informs SCM if it
+      loses the leader role. The scheduled check is an secondary
+      check to ensure that the leader role is updated periodically
+      .</description>
+  </property>
 
   <property>
     <name>hdds.metadata.dir</name>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
new file mode 100644
index 0000000..c0364ad
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+
+/**
+ * Utility class used by SCM HA.
+ */
+public final class SCMHAUtils {
+  private SCMHAUtils() {
+    // not used
+  }
+
+  // Check if SCM HA is enabled.
+  public static boolean isSCMHAEnabled(OzoneConfiguration conf) {
+    return conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY,
+        ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
new file mode 100644
index 0000000..8d66187
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID;
+
+/**
+ * Construct SCM node details.
+ */
+public final class SCMNodeDetails {
+  private String scmServiceId;
+  private String scmNodeId;
+  private InetSocketAddress rpcAddress;
+  private int rpcPort;
+  private int ratisPort;
+  private String httpAddress;
+  private String httpsAddress;
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMNodeDetails.class);
+
+  /**
+   * Constructs SCMNodeDetails object.
+   */
+  private SCMNodeDetails(String serviceId, String nodeId,
+                        InetSocketAddress rpcAddr, int rpcPort, int ratisPort,
+                        String httpAddress, String httpsAddress) {
+    this.scmServiceId = serviceId;
+    this.scmNodeId = nodeId;
+    this.rpcAddress = rpcAddr;
+    this.rpcPort = rpcPort;
+    this.ratisPort = ratisPort;
+    this.httpAddress = httpAddress;
+    this.httpsAddress = httpsAddress;
+  }
+
+  @Override
+  public String toString() {
+    return "SCMNodeDetails["
+        + "scmServiceId=" + scmServiceId +
+        ", scmNodeId=" + scmNodeId +
+        ", rpcAddress=" + rpcAddress +
+        ", rpcPort=" + rpcPort +
+        ", ratisPort=" + ratisPort +
+        ", httpAddress=" + httpAddress +
+        ", httpsAddress=" + httpsAddress +
+        "]";
+  }
+
+  /**
+   * Builder class for SCMNodeDetails.
+   */
+  public static class Builder {
+    private String scmServiceId;
+    private String scmNodeId;
+    private InetSocketAddress rpcAddress;
+    private int rpcPort;
+    private int ratisPort;
+    private String httpAddr;
+    private String httpsAddr;
+
+    public Builder setRpcAddress(InetSocketAddress rpcAddr) {
+      this.rpcAddress = rpcAddr;
+      this.rpcPort = rpcAddress.getPort();
+      return this;
+    }
+
+    public Builder setRatisPort(int port) {
+      this.ratisPort = port;
+      return this;
+    }
+
+    public Builder setSCMServiceId(String serviceId) {
+      this.scmServiceId = serviceId;
+      return this;
+    }
+
+    public Builder setSCMNodeId(String nodeId) {
+      this.scmNodeId = nodeId;
+      return this;
+    }
+
+    public Builder setHttpAddress(String httpAddress) {
+      this.httpAddr = httpAddress;
+      return this;
+    }
+
+    public Builder setHttpsAddress(String httpsAddress) {
+      this.httpsAddr = httpsAddress;
+      return this;
+    }
+
+    public SCMNodeDetails build() {
+      return new SCMNodeDetails(scmServiceId, scmNodeId, rpcAddress, rpcPort,
+          ratisPort, httpAddr, httpsAddr);
+    }
+  }
+
+  public String getSCMServiceId() {
+    return scmServiceId;
+  }
+
+  public String getSCMNodeId() {
+    return scmNodeId;
+  }
+
+  public InetSocketAddress getRpcAddress() {
+    return rpcAddress;
+  }
+
+  public InetAddress getAddress() {
+    return rpcAddress.getAddress();
+  }
+
+  public int getRatisPort() {
+    return ratisPort;
+  }
+
+  public int getRpcPort() {
+    return rpcPort;
+  }
+
+  public String getRpcAddressString() {
+    return NetUtils.getHostPortString(rpcAddress);
+  }
+
+  public static SCMNodeDetails initStandAlone(
+      OzoneConfiguration conf) throws IOException {
+    String localSCMServiceId = conf.getTrimmed(OZONE_SCM_INTERNAL_SERVICE_ID);
+    int ratisPort = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
+    InetSocketAddress rpcAddress = new InetSocketAddress(
+        InetAddress.getLocalHost(), 0);
+    SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder()
+        .setRatisPort(ratisPort)
+        .setRpcAddress(rpcAddress)
+        .setSCMNodeId(localSCMServiceId)
+        .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT)
+        .build();
+    return scmNodeDetails;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java
new file mode 100644
index 0000000..06fe168
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+/**
+ * This package contains classes related to SCM HA.
+ */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java
new file mode 100644
index 0000000..af1e5c2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ratis;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Class for SCM Ratis Server.
+ */
+public final class SCMRatisServer {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(SCMRatisServer.class);
+
+  private final StorageContainerManager scm;
+  private final SCMStateMachine scmStateMachine;
+
+  private final int port;
+  private final InetSocketAddress scmRatisAddress;
+  private final RaftServer server;
+  private final RaftGroupId raftGroupId;
+  private final RaftGroup raftGroup;
+  private final RaftPeerId raftPeerId;
+
+  private final ClientId clientId = ClientId.randomId();
+  private final ScheduledExecutorService scheduledRoleChecker;
+  private long roleCheckInitialDelayMs = 1000; // 1 second default
+  private long roleCheckIntervalMs;
+  private ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
+  private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
+  private Optional<RaftPeerId> cachedLeaderPeerId = Optional.empty();
+
+  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+  private static long nextCallId() {
+    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
+  }
+
+  private SCMRatisServer(Configuration conf,
+                         StorageContainerManager scm,
+                         String raftGroupIdStr, RaftPeerId localRaftPeerId,
+                         InetSocketAddress addr, List<RaftPeer> raftPeers)
+      throws IOException {
+    this.scm = scm;
+    this.scmRatisAddress = addr;
+    this.port = addr.getPort();
+    RaftProperties serverProperties = newRaftProperties(conf);
+
+    this.raftPeerId = localRaftPeerId;
+    this.raftGroupId = RaftGroupId.valueOf(
+        getRaftGroupIdFromOmServiceId(raftGroupIdStr));
+    this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
+
+    StringBuilder raftPeersStr = new StringBuilder();
+    for (RaftPeer peer : raftPeers) {
+      raftPeersStr.append(", ").append(peer.getAddress());
+    }
+    LOG.info("Instantiating SCM Ratis server with GroupID: {} and " +
+        "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2));
+    this.scmStateMachine = getStateMachine();
+
+    this.server = RaftServer.newBuilder()
+        .setServerId(this.raftPeerId)
+        .setGroup(this.raftGroup)
+        .setProperties(serverProperties)
+        .setStateMachine(scmStateMachine)
+        .build();
+
+    // Run a scheduler to check and update the server role on the leader
+    // periodically
+    this.scheduledRoleChecker = Executors.newSingleThreadScheduledExecutor();
+    this.scheduledRoleChecker.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        // Run this check only on the leader OM
+        if (cachedPeerRole.isPresent() &&
+            cachedPeerRole.get() == RaftProtos.RaftPeerRole.LEADER) {
+          updateServerRole();
+        }
+      }
+    }, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  public static SCMRatisServer newSCMRatisServer(
+      Configuration conf, StorageContainerManager scm,
+      SCMNodeDetails scmNodeDetails, List<SCMNodeDetails> peers)
+      throws IOException {
+    String scmServiceId = scmNodeDetails.getSCMServiceId();
+
+    String scmNodeId = scmNodeDetails.getSCMNodeId();
+    RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(scmNodeId);
+    InetSocketAddress ratisAddr = new InetSocketAddress(
+        scmNodeDetails.getAddress(), scmNodeDetails.getRatisPort());
+
+    RaftPeer localRaftPeer = new RaftPeer(localRaftPeerId, ratisAddr);
+
+    List<RaftPeer> raftPeers = new ArrayList<>();
+    raftPeers.add(localRaftPeer);
+
+    for (SCMNodeDetails peer : peers) {
+      String peerNodeId = peer.getSCMNodeId();
+      InetSocketAddress peerRatisAddr = new InetSocketAddress(
+          peer.getAddress(), peer.getRatisPort());
+      RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId);
+      RaftPeer raftPeer = new RaftPeer(raftPeerId, peerRatisAddr);
+      // Add other SCMs in Ratis ring
+      raftPeers.add(raftPeer);
+    }
+
+    return new SCMRatisServer(conf, scm, scmServiceId, localRaftPeerId,
+        ratisAddr, raftPeers);
+  }
+
+  private UUID getRaftGroupIdFromOmServiceId(String scmServiceId) {
+    return UUID.nameUUIDFromBytes(scmServiceId.getBytes(
+        StandardCharsets.UTF_8));
+  }
+
+  private SCMStateMachine getStateMachine() {
+    return new SCMStateMachine(this);
+  }
+
+  private RaftProperties newRaftProperties(Configuration conf){
+    final RaftProperties properties = new RaftProperties();
+    // Set RPC type
+    final String rpcType = conf.get(
+        ScmConfigKeys.OZONE_SCM_RATIS_RPC_TYPE_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_RPC_TYPE_DEFAULT);
+    final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
+    RaftConfigKeys.Rpc.setType(properties, rpc);
+    // Set the ratis port number
+    if (rpc == SupportedRpcType.GRPC) {
+      GrpcConfigKeys.Server.setPort(properties, port);
+    } else if (rpc == SupportedRpcType.NETTY) {
+      NettyConfigKeys.Server.setPort(properties, port);
+    }
+    // Set Ratis storage directory
+    String storageDir = SCMRatisServer.getSCMRatisDirectory(conf);
+    RaftServerConfigKeys.setStorageDirs(properties,
+        Collections.singletonList(new File(storageDir)));
+    // Set RAFT segment size
+    final int raftSegmentSize = (int) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT,
+        StorageUnit.BYTES);
+    RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
+        SizeInBytes.valueOf(raftSegmentSize));
+    // Set RAFT segment pre-allocated size
+    final int raftSegmentPreallocatedSize = (int) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
+        StorageUnit.BYTES);
+    int logAppenderQueueNumElements = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS,
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT);
+    final int logAppenderQueueByteLimit = (int) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
+        StorageUnit.BYTES);
+    RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties,
+        logAppenderQueueNumElements);
+    RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
+        SizeInBytes.valueOf(logAppenderQueueByteLimit));
+    RaftServerConfigKeys.Log.setPreallocatedSize(properties,
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+    RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
+        false);
+    final int logPurgeGap = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_PURGE_GAP,
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_PURGE_GAP_DEFAULT);
+    RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);
+    // For grpc set the maximum message size
+    // TODO: calculate the optimal max message size
+    GrpcConfigKeys.setMessageSizeMax(properties,
+        SizeInBytes.valueOf(logAppenderQueueByteLimit));
+
+    // Set the server request timeout
+    TimeUnit serverRequestTimeoutUnit =
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT.getUnit();
+    long serverRequestTimeoutDuration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT
+            .getDuration(), serverRequestTimeoutUnit);
+    final TimeDuration serverRequestTimeout = TimeDuration.valueOf(
+        serverRequestTimeoutDuration, serverRequestTimeoutUnit);
+    RaftServerConfigKeys.Rpc.setRequestTimeout(properties,
+        serverRequestTimeout);
+    // Set timeout for server retry cache entry
+    TimeUnit retryCacheTimeoutUnit = ScmConfigKeys
+        .OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT.getUnit();
+    long retryCacheTimeoutDuration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT
+            .getDuration(), retryCacheTimeoutUnit);
+    final TimeDuration retryCacheTimeout = TimeDuration.valueOf(
+        retryCacheTimeoutDuration, retryCacheTimeoutUnit);
+    RaftServerConfigKeys.RetryCache.setExpiryTime(properties,
+        retryCacheTimeout);
+    // Set the server min and max timeout
+    TimeUnit serverMinTimeoutUnit =
+        ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_DEFAULT.getUnit();
+    long serverMinTimeoutDuration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_DEFAULT
+            .getDuration(), serverMinTimeoutUnit);
+    final TimeDuration serverMinTimeout = TimeDuration.valueOf(
+        serverMinTimeoutDuration, serverMinTimeoutUnit);
+    long serverMaxTimeoutDuration =
+        serverMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200;
+    final TimeDuration serverMaxTimeout = TimeDuration.valueOf(
+        serverMaxTimeoutDuration, serverMinTimeoutUnit);
+    RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+        serverMinTimeout);
+    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+        serverMaxTimeout);
+    // Set the number of maximum cached segments
+    RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
+    // TODO: set max write buffer size
+    // Set the ratis leader election timeout
+    TimeUnit leaderElectionMinTimeoutUnit =
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
+            .getUnit();
+    long leaderElectionMinTimeoutduration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
+            .getDuration(), leaderElectionMinTimeoutUnit);
+    final TimeDuration leaderElectionMinTimeout = TimeDuration.valueOf(
+        leaderElectionMinTimeoutduration, leaderElectionMinTimeoutUnit);
+    RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+        leaderElectionMinTimeout);
+    long leaderElectionMaxTimeout = leaderElectionMinTimeout.toLong(
+        TimeUnit.MILLISECONDS) + 200;
+    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+        TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
+    TimeUnit nodeFailureTimeoutUnit =
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+            .getUnit();
+    long nodeFailureTimeoutDuration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+            .getDuration(), nodeFailureTimeoutUnit);
+    final TimeDuration nodeFailureTimeout = TimeDuration.valueOf(
+        nodeFailureTimeoutDuration, nodeFailureTimeoutUnit);
+    RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties,
+        nodeFailureTimeout);
+    RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
+        nodeFailureTimeout);
+
+    // Ratis leader role check
+    TimeUnit roleCheckIntervalUnit =
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
+            .getUnit();
+    long roleCheckIntervalDuration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
+            .getDuration(), nodeFailureTimeoutUnit);
+    this.roleCheckIntervalMs = TimeDuration.valueOf(
+        roleCheckIntervalDuration, roleCheckIntervalUnit)
+        .toLong(TimeUnit.MILLISECONDS);
+    this.roleCheckInitialDelayMs = leaderElectionMinTimeout
+        .toLong(TimeUnit.MILLISECONDS);
+
+    return properties;
+  }
+
+  /**
+   * Start the Ratis server.
+   * @throws IOException
+   */
+  public void start() throws IOException {
+    LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
+        server.getId(), port);
+    server.start();
+  }
+
+  /**
+   * Stop the Ratis server.
+   */
+  public void stop() {
+    try {
+      server.close();
+      scmStateMachine.stop();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private boolean checkCachedPeerRoleIsLeader() {
+    this.roleCheckLock.readLock().lock();
+    try {
+      if (cachedPeerRole.isPresent() &&
+          cachedPeerRole.get() == RaftProtos.RaftPeerRole.LEADER) {
+        return true;
+      }
+      return false;
+    } finally {
+      this.roleCheckLock.readLock().unlock();
+    }
+  }
+
+  public boolean isLeader() {
+    if (checkCachedPeerRoleIsLeader()) {
+      return true;
+    }
+
+    // Get the server role from ratis server and update the cached values.
+    updateServerRole();
+
+    // After updating the server role, check and return if leader or not.
+    return checkCachedPeerRoleIsLeader();
+  }
+
+  @VisibleForTesting
+  public LifeCycle.State getServerState() {
+    return server.getLifeCycleState();
+  }
+
+  @VisibleForTesting
+  public RaftPeerId getRaftPeerId() {
+    return this.raftPeerId;
+  }
+
+  public RaftGroup getRaftGroup() {
+    return this.raftGroup;
+  }
+
+  /**
+   * Get the local directory where ratis logs will be stored.
+   */
+  public static String getSCMRatisDirectory(Configuration conf) {
+    String storageDir = conf.get(ScmConfigKeys.OZONE_SCM_RATIS_STORAGE_DIR);
+
+    if (Strings.isNullOrEmpty(storageDir)) {
+      storageDir = ServerUtils.getDefaultRatisDirectory(conf);
+    }
+    return storageDir;
+  }
+
+  public Optional<RaftPeerId> getCachedLeaderPeerId() {
+    this.roleCheckLock.readLock().lock();
+    try {
+      return cachedLeaderPeerId;
+    } finally {
+      this.roleCheckLock.readLock().unlock();
+    }
+  }
+
+  public int getServerPort() {
+    return port;
+  }
+
+  public void updateServerRole() {
+    try {
+      GroupInfoReply groupInfo = getGroupInfo();
+      RaftProtos.RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto();
+      RaftProtos.RaftPeerRole thisNodeRole = roleInfoProto.getRole();
+
+      if (thisNodeRole.equals(RaftProtos.RaftPeerRole.LEADER)) {
+        setServerRole(thisNodeRole, raftPeerId);
+
+      } else if (thisNodeRole.equals(RaftProtos.RaftPeerRole.FOLLOWER)) {
+        ByteString leaderNodeId = roleInfoProto.getFollowerInfo()
+            .getLeaderInfo().getId().getId();
+        // There may be a chance, here we get leaderNodeId as null. For
+        // example, in 3 node OM Ratis, if 2 SCM nodes are down, there will
+        // be no leader.
+        RaftPeerId leaderPeerId = null;
+        if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
+          leaderPeerId = RaftPeerId.valueOf(leaderNodeId);
+        }
+
+        setServerRole(thisNodeRole, leaderPeerId);
+
+      } else {
+        setServerRole(thisNodeRole, null);
+
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to retrieve RaftPeerRole. Setting cached role to " +
+          "{} and resetting leader info.",
+          RaftProtos.RaftPeerRole.UNRECOGNIZED, e);
+      setServerRole(null, null);
+    }
+  }
+
+  private GroupInfoReply getGroupInfo() throws IOException {
+    GroupInfoRequest groupInfoRequest = new GroupInfoRequest(clientId,
+        raftPeerId, raftGroupId, nextCallId());
+    GroupInfoReply groupInfo = server.getGroupInfo(groupInfoRequest);
+    return groupInfo;
+  }
+
+  private void setServerRole(RaftProtos.RaftPeerRole currentRole,
+                             RaftPeerId leaderPeerId) {
+    this.roleCheckLock.writeLock().lock();
+    try {
+      this.cachedPeerRole = Optional.ofNullable(currentRole);
+      this.cachedLeaderPeerId = Optional.ofNullable(leaderPeerId);
+    } finally {
+      this.roleCheckLock.writeLock().unlock();
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java
new file mode 100644
index 0000000..502260a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ratis;
+
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+
+/**
+ * Class for SCM StateMachine.
+ */
+public class SCMStateMachine extends BaseStateMachine {
+  //TODO to be implemented
+  public SCMStateMachine(SCMRatisServer ratisServer) {
+
+  }
+
+  public void stop() {
+    return;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index a6cd5d4..6f978bd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -30,6 +30,7 @@ import java.security.cert.X509Certificate;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +43,10 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.ratis.SCMRatisServer;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
@@ -90,7 +95,6 @@ import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.HddsVersionInfo;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -189,6 +193,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   private CertificateServer certificateServer;
   private GrpcTlsConfig grpcTlsConfig;
 
+  // SCM HA related
+  private SCMRatisServer scmRatisServer;
+
   private JvmPauseMonitor jvmPauseMonitor;
   private final OzoneConfiguration configuration;
   private SCMContainerMetrics scmContainerMetrics;
@@ -256,6 +263,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       loginAsSCMUser(conf);
     }
 
+    if (SCMHAUtils.isSCMHAEnabled(conf)) {
+      initializeRatisServer();
+    } else {
+      scmRatisServer = null;
+    }
+
     // Creates the SCM DBs or opens them if it exists.
     // A valid pointer to the store is required by all the other services below.
     initalizeMetadataStore(conf, configurator);
@@ -1110,4 +1123,18 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   public NetworkTopology getClusterMap() {
     return this.clusterMap;
   }
+
+  private void initializeRatisServer() throws IOException {
+    if (scmRatisServer == null) {
+      SCMNodeDetails scmNodeDetails = SCMNodeDetails
+          .initStandAlone(configuration);
+      //TODO enable Ratis ring
+      scmRatisServer = SCMRatisServer.newSCMRatisServer(configuration, this,
+          scmNodeDetails, Collections.EMPTY_LIST);
+      if (scmRatisServer != null) {
+        LOG.info("SCM Ratis server initialized at port {}",
+            scmRatisServer.getServerPort());
+      }
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java
new file mode 100644
index 0000000..f29fb5f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ratis;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.util.LifeCycle;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * Test class for SCM Ratis Server.
+ */
+public class TestSCMRatisServer {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OzoneConfiguration conf;
+  private SCMRatisServer scmRatisServer;
+  private StorageContainerManager scm;
+  private String scmId;
+  private SCMNodeDetails scmNodeDetails;
+  private static final long LEADER_ELECTION_TIMEOUT = 500L;
+
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    scmId = UUID.randomUUID().toString();
+    conf.setTimeDuration(
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int ratisPort = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
+    InetSocketAddress rpcAddress = new InetSocketAddress(
+        InetAddress.getLocalHost(), 0);
+    scmNodeDetails = new SCMNodeDetails.Builder()
+        .setRatisPort(ratisPort)
+        .setRpcAddress(rpcAddress)
+        .setSCMNodeId(scmId)
+        .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT)
+        .build();
+
+    // Standalone SCM Ratis server
+    initSCM();
+    scm = HddsTestUtils.getScm(conf);
+    scm.start();
+    scmRatisServer = SCMRatisServer.newSCMRatisServer(
+        conf, scm, scmNodeDetails, Collections.EMPTY_LIST);
+    scmRatisServer.start();
+  }
+
+  @After
+  public void shutdown() {
+    if (scmRatisServer != null) {
+      scmRatisServer.stop();
+    }
+    if (scm != null) {
+      scm.stop();
+    }
+  }
+
+  @Test
+  public void testStartSCMRatisServer() throws Exception {
+    Assert.assertEquals("Ratis Server should be in running state",
+        LifeCycle.State.RUNNING, scmRatisServer.getServerState());
+  }
+
+  @Test
+  public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws
+      Exception {
+    String customScmServiceId = "scmIdCustom123";
+    OzoneConfiguration newConf = new OzoneConfiguration();
+    String newOmId = UUID.randomUUID().toString();
+    String path = GenericTestUtils.getTempPath(newOmId);
+    Path metaDirPath = Paths.get(path, "scm-meta");
+    newConf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString());
+    newConf.setTimeDuration(
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int ratisPort = 9873;
+    InetSocketAddress rpcAddress = new InetSocketAddress(
+        InetAddress.getLocalHost(), 0);
+    SCMNodeDetails nodeDetails = new SCMNodeDetails.Builder()
+        .setRpcAddress(rpcAddress)
+        .setRatisPort(ratisPort)
+        .setSCMNodeId(newOmId)
+        .setSCMServiceId(customScmServiceId)
+        .build();
+    // Starts a single node Ratis server
+    scmRatisServer.stop();
+    SCMRatisServer newScmRatisServer = SCMRatisServer
+        .newSCMRatisServer(newConf, scm, nodeDetails,
+            Collections.emptyList());
+    newScmRatisServer.start();
+
+    UUID uuid = UUID.nameUUIDFromBytes(customScmServiceId.getBytes());
+    RaftGroupId raftGroupId = newScmRatisServer.getRaftGroup().getGroupId();
+    Assert.assertEquals(uuid, raftGroupId.getUuid());
+    Assert.assertEquals(raftGroupId.toByteString().size(), 16);
+    newScmRatisServer.stop();
+  }
+
+  private void initSCM() throws IOException {
+    String clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+
+    final String path = folder.newFolder().toString();
+    Path scmPath = Paths.get(path, "scm-meta");
+    Files.createDirectories(scmPath);
+    conf.set(OZONE_METADATA_DIRS, scmPath.toString());
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
+    scmStore.setClusterId(clusterId);
+    scmStore.setScmId(scmId);
+    // writes the version file properties
+    scmStore.initialize();
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index b9ae815..41f68b6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -54,6 +54,8 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
     errorIfMissingXmlProps = true;
     xmlPropsToSkipCompare.add("hadoop.tags.custom");
     xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID");
+    xmlPropsToSkipCompare.add("ozone.scm.nodes.EXAMPLESCMSERVICEID");
+    xmlPrefixToSkipCompare.add("ipc.client.rpc-timeout.ms");
     addPropertiesNotInXml();
   }
 
@@ -65,6 +67,7 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
         HddsConfigKeys.HDDS_SECURITY_PROVIDER,
         HddsConfigKeys.HDDS_X509_CRL_NAME, // HDDS-2873
         OMConfigKeys.OZONE_OM_NODES_KEY,
+        ScmConfigKeys.OZONE_SCM_NODES_KEY,
         OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE,
         OzoneConfigKeys.OZONE_S3_AUTHINFO_MAX_LIFETIME_KEY,
         ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org