Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2021/02/26 07:58:01 UTC

[GitHub] [ozone] bharatviswa504 commented on a change in pull request #1953: HDDS-4718. Bootstrap new SCM node

bharatviswa504 commented on a change in pull request #1953:
URL: https://github.com/apache/ozone/pull/1953#discussion_r582742292



##########
File path: hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
##########
@@ -379,12 +379,18 @@
   // SCM Ratis related
   public static final String OZONE_SCM_HA_ENABLE_KEY
       = "ozone.scm.ratis.enable";
+  // setting this value to true, as right now, this switch is not accounted
+  // for spinning up SCM instances with/without SCM HA.

Review comment:
       The comment says it is set to true, but the code still has false.
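   For reference, the mismatch being flagged would look roughly like this (the name of the default constant is assumed here; only the key and the new comment appear in the hunk):
   ```
   public static final String OZONE_SCM_HA_ENABLE_KEY
       = "ozone.scm.ratis.enable";
   // setting this value to true, as right now, this switch is not accounted
   // for spinning up SCM instances with/without SCM HA.
   public static final boolean OZONE_SCM_HA_ENABLE_DEFAULT = false;
   // ^ the comment above says "true", but the default is still false
   ```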

##########
File path: hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
##########
@@ -131,4 +134,24 @@ public static String getScmServiceId(ConfigurationSource conf) {
     }
     return localScmServiceId;
   }
+
+  public static OzoneConfiguration removeSelfId(

Review comment:
       Nit: JavaDoc missing
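   A possible JavaDoc sketch (the wording is inferred from how removeSelfId is used in HAUtils.addSCM elsewhere in this PR, so treat it as an assumption):
   ```
   /**
    * Returns a copy of the given configuration with the local SCM node id
    * removed from the SCM HA node list, so that clients built from it do not
    * fail over to the local, not-yet-bootstrapped SCM instance.
    *
    * @param conf source configuration
    * @return a new OzoneConfiguration without the local SCM node id
    */
   public static OzoneConfiguration removeSelfId(
   ```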

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
##########
@@ -69,6 +74,24 @@ public SCMHAManagerImpl(final ConfigurationSource conf,
   @Override
   public void start() throws IOException {
     ratisServer.start();
+    if (SCMHAUtils.isSCMHAEnabled(conf) && ratisServer.getDivision().getGroup()

Review comment:
       During init this check was removed, but it is added here, and the default is still false.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
##########
@@ -63,69 +61,120 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMRatisServerImpl.class);
 
-  private final RaftServer.Division division;
+  private final RaftServer server;
+  private final SCMStateMachine stateMachine;
   private final StorageContainerManager scm;
-  private final InetSocketAddress address;
   private final ClientId clientId = ClientId.randomId();
   private final AtomicLong callId = new AtomicLong();
 
   // TODO: Refactor and remove ConfigurationSource and use only
   //  SCMHAConfiguration.
-  SCMRatisServerImpl(final SCMHAConfiguration haConf,
-      final ConfigurationSource conf, final StorageContainerManager scm,
-      final DBTransactionBuffer buffer) throws IOException {
+  SCMRatisServerImpl(final ConfigurationSource conf,
+      final StorageContainerManager scm, final DBTransactionBuffer buffer)
+      throws IOException {
     this.scm = scm;
-    this.address = haConf.getRatisBindAddress();
-    RaftServer server = newRaftServer(scm.getClusterId(), scm.getScmId(),
-        scm.getSCMHANodeDetails(), conf)
-        .setStateMachine(new SCMStateMachine(scm, this, buffer)).build();
+    this.stateMachine = new SCMStateMachine(scm, this, buffer);
+    final RaftGroupId groupId = buildRaftGroupId(scm.getClusterId());
+    LOG.info("starting Raft server for scm:{}", scm.getScmId());
+    // During SCM startup, the bootstrapped node will be started just with
+    // groupId information, so that it won't trigger any leader election
+    // as it doesn't have any peer info.
+
+    // The primary SCM node which is initialized using scm --init command,
+    // will initialize the raft server with the peer info and it will be
+    // persisted in the raft log post leader election. Now, when the primary
+    // scm boots up, it has peer info embedded in the raft log and will
+    // trigger leader election.
+    this.server =
+        newRaftServer(scm.getScmId(), conf).setStateMachine(stateMachine)
+            .setGroup(RaftGroup.valueOf(groupId)).build();
+  }
+
+  public static void initialize(String clusterId, String scmId,
+      SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
+    final RaftGroup group = buildRaftGroup(details, scmId, clusterId);
+    RaftServer server = newRaftServer(scmId, conf)
+        .setGroup(group).build();
+    server.start();
+    waitforLeaderToBeReady(server, 60000, group);
+    server.close();
+  }
+
+  private static void waitforLeaderToBeReady(RaftServer server, long timeout,
+      RaftGroup group)
+      throws IOException {
+    boolean ready;
+    long st = Time.monotonicNow();
+    do {
+      ready = server.getDivision(group.getGroupId()).getInfo().isLeaderReady();
+      if (!ready) {
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    } while (!ready && Time.monotonicNow() - st < timeout);
 
-    this.division =
-        server.getDivision(server.getGroups().iterator().next().getGroupId());
+    if (!ready) {
+      throw new IOException(String
+          .format("Ratis group %s is not ready in %d ms", group.getGroupId(),
+              timeout));
+    }
   }
 
-  public static RaftServer.Builder newRaftServer(final String clusterId,
-      final String scmId, final SCMHANodeDetails haDetails,
-      final ConfigurationSource conf) throws IOException {
-    final String scmNodeId = haDetails.getLocalNodeDetails().getNodeId();
+  private static RaftServer.Builder newRaftServer(final String scmId,
+      final ConfigurationSource conf) {
     final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
-    SCMHAGroupBuilder haGrpBuilder = scmNodeId != null ?
-        new SCMHAGroupBuilder(haDetails, clusterId, scmId) :
-        new SCMHAGroupBuilder(haConf, conf, clusterId);
     final RaftProperties serverProperties =
         RatisUtil.newRaftProperties(haConf, conf);
-    return RaftServer.newBuilder().setServerId(haGrpBuilder.getPeerId())
-        .setGroup(haGrpBuilder.getRaftGroup()).setProperties(serverProperties)
-        .setStateMachine(new SCMStateMachine());
+    return RaftServer.newBuilder().setServerId(RaftPeerId.getRaftPeerId(scmId))
+        .setProperties(serverProperties)
+        .setStateMachine(new SCMStateMachine(false));
   }
 
   @Override
   public void start() throws IOException {
-    division.getRaftServer().start();
+    server.start();
+  }
+
+  public RaftServer.Division getDivision() {
+    try {
+      return server

Review comment:
       When it fails, throw an exception instead of returning null?
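   A minimal sketch of that alternative (the exception type and message are placeholders):
   ```
   public RaftServer.Division getDivision() throws IOException {
     try {
       return server
           .getDivision(server.getGroups().iterator().next().getGroupId());
     } catch (IOException e) {
       throw e;
     } catch (Exception e) {
       throw new IOException("Failed to get RaftServer division", e);
     }
   }
   ```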

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
##########
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.Rule;
+
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
+
+/**
+ * Base class for Ozone Manager HA tests.
+ */
+public class TestStorageContainerManagerHA {
+
+  private MiniOzoneHAClusterImpl cluster = null;
+  private OzoneConfiguration conf;
+  private String clusterId;
+  private String scmId;
+  private String omServiceId;
+  private static int numOfOMs = 3;
+  private String scmServiceId;
+  // TODO: 3 servers will not work until SCM HA configs are considered in
+  // SCM HA Ratis server init
+  private static int numOfSCMs = 3;
+
+
+  @Rule
+  public Timeout timeout = new Timeout(300_000);
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   * @throws IOException
+   */
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+    omServiceId = "om-service-test1";
+    scmServiceId = "scm-service-test1";
+    cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+        .setClusterId(clusterId)
+        .setScmId(scmId)
+        .setOMServiceId(omServiceId)
+        .setSCMServiceId(scmServiceId)
+        .setNumOfStorageContainerManagers(numOfSCMs)
+        .setNumOfOzoneManagers(numOfOMs)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAllSCMAreRunning() throws Exception {
+    int count = 0;
+    List<StorageContainerManager> scms = cluster.getStorageContainerManagers();
+    Assert.assertEquals(numOfSCMs, scms.size());
+    int peerSize = cluster.getStorageContainerManager().getScmHAManager()
+        .getRatisServer().getDivision().getGroup().getPeers().size();
+    for (StorageContainerManager scm : scms) {
+      if (scm.checkLeader()) {
+        count++;
+      }
+      Assert.assertTrue(peerSize == numOfSCMs);
+    }
+    Assert.assertEquals(1, count);
+    count = 0;
+    List<OzoneManager> oms = cluster.getOzoneManagersList();
+    Assert.assertEquals(numOfOMs, oms.size());
+    for (OzoneManager om : oms) {
+      if (om.isLeaderReady()) {
+        count++;
+      }
+    }
+    Assert.assertEquals(1, count);
+    testPutKey();
+  }
+
+  public void testPutKey() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    Instant testStartTime = Instant.now();
+    ObjectStore store =
+        OzoneClientFactory.getRpcClient(cluster.getConf()).getObjectStore();
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+
+    OzoneOutputStream out = bucket
+        .createKey(keyName, value.getBytes(UTF_8).length, STAND_ALONE, ONE,
+            new HashMap<>());
+    out.write(value.getBytes(UTF_8));
+    out.close();
+    OzoneKey key = bucket.getKey(keyName);
+    Assert.assertEquals(keyName, key.getName());
+    OzoneInputStream is = bucket.readKey(keyName);
+    byte[] fileContent = new byte[value.getBytes(UTF_8).length];
+    is.read(fileContent);
+    Assert.assertEquals(value, new String(fileContent, UTF_8));
+    Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
+    Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
+    is.close();
+    final OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .setRefreshPipeline(true).build();
+    final OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    final List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    final long containerID = keyLocationInfos.get(0).getContainerID();
+    for (int k = 0; k < numOfSCMs; k++) {
+      StorageContainerManager scm =
+          cluster.getStorageContainerManagers().get(k);
+      // flush to DB on each SCM

Review comment:
       Can we check whether the container is also available on all SCMs without the flush?
   
   Also, for this test, do you think we first need to make sure that each SCM has caught up with the transaction before doing the checks? (When a request is processed, only the leader applies it to the StateMachine.)
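   A rough sketch of such a catch-up wait, inside a test method declared throws Exception (it assumes getLastAppliedTermIndex() inherited from BaseStateMachine is accessible on SCMStateMachine, and casts the Ratis server to SCMRatisServerImpl to reach getStateMachine()):
   ```
   // find the leader's last applied index
   long leaderIndex = 0;
   for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
     if (scm.checkLeader()) {
       leaderIndex = ((SCMRatisServerImpl) scm.getScmHAManager()
           .getRatisServer()).getStateMachine()
           .getLastAppliedTermIndex().getIndex();
     }
   }
   // wait for every SCM to apply at least up to that index
   for (StorageContainerManager scm : cluster.getStorageContainerManagers()) {
     SCMStateMachine sm = ((SCMRatisServerImpl) scm.getScmHAManager()
         .getRatisServer()).getStateMachine();
     long deadline = System.currentTimeMillis() + 30_000;
     while (sm.getLastAppliedTermIndex().getIndex() < leaderIndex
         && System.currentTimeMillis() < deadline) {
       Thread.sleep(100);
     }
     Assert.assertTrue(sm.getLastAppliedTermIndex().getIndex() >= leaderIndex);
   }
   ```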

##########
File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
##########
@@ -53,6 +55,23 @@ public static ScmInfo getScmInfo(OzoneConfiguration conf)
     }
   }
 
+  public static boolean addSCM(OzoneConfiguration conf, AddSCMRequest request)
+      throws IOException {
+    OzoneConfiguration config = SCMHAUtils.removeSelfId(conf);
+    try {
+      RetryPolicy retryPolicy = retryUpToMaximumCountWithFixedSleep(
+          10, 5, TimeUnit.SECONDS);
+      RetriableTask<Boolean> retriable = new RetriableTask<>(
+          retryPolicy, "addSCM",
+          () -> getScmBlockClient(config).addSCM(request));
+      return retriable.call();
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Failed to get SCM info", e);

Review comment:
       Minor: the message should be "Failed to add SCM".

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
##########
@@ -63,69 +61,120 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMRatisServerImpl.class);
 
-  private final RaftServer.Division division;
+  private final RaftServer server;
+  private final SCMStateMachine stateMachine;
   private final StorageContainerManager scm;
-  private final InetSocketAddress address;
   private final ClientId clientId = ClientId.randomId();
   private final AtomicLong callId = new AtomicLong();
 
   // TODO: Refactor and remove ConfigurationSource and use only
   //  SCMHAConfiguration.
-  SCMRatisServerImpl(final SCMHAConfiguration haConf,
-      final ConfigurationSource conf, final StorageContainerManager scm,
-      final DBTransactionBuffer buffer) throws IOException {
+  SCMRatisServerImpl(final ConfigurationSource conf,
+      final StorageContainerManager scm, final DBTransactionBuffer buffer)
+      throws IOException {
     this.scm = scm;
-    this.address = haConf.getRatisBindAddress();
-    RaftServer server = newRaftServer(scm.getClusterId(), scm.getScmId(),
-        scm.getSCMHANodeDetails(), conf)
-        .setStateMachine(new SCMStateMachine(scm, this, buffer)).build();
+    this.stateMachine = new SCMStateMachine(scm, this, buffer);
+    final RaftGroupId groupId = buildRaftGroupId(scm.getClusterId());
+    LOG.info("starting Raft server for scm:{}", scm.getScmId());
+    // During SCM startup, the bootstrapped node will be started just with
+    // groupId information, so that it won't trigger any leader election
+    // as it doesn't have any peer info.
+
+    // The primary SCM node which is initialized using scm --init command,
+    // will initialize the raft server with the peer info and it will be
+    // persisted in the raft log post leader election. Now, when the primary
+    // scm boots up, it has peer info embedded in the raft log and will
+    // trigger leader election.
+    this.server =
+        newRaftServer(scm.getScmId(), conf).setStateMachine(stateMachine)
+            .setGroup(RaftGroup.valueOf(groupId)).build();
+  }
+
+  public static void initialize(String clusterId, String scmId,
+      SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
+    final RaftGroup group = buildRaftGroup(details, scmId, clusterId);
+    RaftServer server = newRaftServer(scmId, conf)
+        .setGroup(group).build();
+    server.start();
+    waitforLeaderToBeReady(server, 60000, group);

Review comment:
       The 60000 ms timeout is hardcoded; can we make it configurable?
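   A sketch of that (the key and default constant names below are made up for illustration):
   ```
   // in ScmConfigKeys (hypothetical names)
   public static final String OZONE_SCM_HA_RATIS_LEADER_READY_WAIT_TIMEOUT =
       "ozone.scm.ha.ratis.leader.ready.wait.timeout";
   public static final long
       OZONE_SCM_HA_RATIS_LEADER_READY_WAIT_TIMEOUT_DEFAULT = 60_000L;

   // in SCMRatisServerImpl.initialize()
   long leaderReadyTimeout = conf.getTimeDuration(
       OZONE_SCM_HA_RATIS_LEADER_READY_WAIT_TIMEOUT,
       OZONE_SCM_HA_RATIS_LEADER_READY_WAIT_TIMEOUT_DEFAULT,
       TimeUnit.MILLISECONDS);
   waitforLeaderToBeReady(server, leaderReadyTimeout, group);
   ```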

##########
File path: hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
##########
@@ -22,14 +22,17 @@
 import com.google.common.base.Strings;
 import org.apache.hadoop.hdds.conf.ConfigurationException;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.ozone.ha.ConfUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.nio.file.Paths;
-import java.util.Collection;
+import java.util.*;

Review comment:
       Minor: Expand * import.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
##########
@@ -291,6 +292,35 @@ public ScmInfo getScmInfo() throws IOException {
     }
   }
 
+  @Override
+  public boolean addSCM(AddSCMRequest request) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding SCM {} addr {} with cluster id {}, with {}",

Review comment:
       Only 3 parameters are passed, but there are 4 {} placeholders.
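   For example, either drop the extra placeholder or log the cluster id the message already mentions:
   ```
   LOG.debug("Adding SCM {} addr {} with cluster id {}",
       request.getScmId(), request.getRatisAddr(), request.getClusterId());
   ```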

##########
File path: hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
##########
@@ -765,12 +765,18 @@ public void addGroup(HddsProtos.PipelineID pipelineId,
         clientId, server.getId(), nextCallId(), group);
 
     RaftClientReply reply;
+    if (LOG.isDebugEnabled()) {

Review comment:
       We do not require isDebugEnabled():
   http://www.slf4j.org/faq.html#logging_performance
   
   
   ```
   Better yet, use parameterized messages
   
   There exists a very convenient alternative based on message formats. Assuming entry is an object, you can write:
   
   Object entry = new SomeObject();
   logger.debug("The entry is {}.", entry);
   After evaluating whether to log or not, and only if the decision is affirmative, will the logger implementation format the message and replace the '{}' pair with the string value of entry. In other words, this form does not incur the cost of parameter construction in case the log statement is disabled.
   
   ```
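   Applied here, the guard can simply be dropped; the exact message is not shown in this hunk, so the line below is only illustrative:
   ```
   LOG.debug("Adding group {} to Ratis server {}", group, server.getId());
   ```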

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
##########
@@ -291,6 +292,35 @@ public ScmInfo getScmInfo() throws IOException {
     }
   }
 
+  @Override
+  public boolean addSCM(AddSCMRequest request) throws IOException {
+    if (LOG.isDebugEnabled()) {

Review comment:
       Minor: We don't need isDebugEnabled()

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
##########
@@ -1169,11 +1168,12 @@ public ReplicationManager getReplicationManager() {
   }
 
   /**
-   * Check if the current scm is the leader.
-   * @return - if the current scm is the leader.
+   * Check if the current scm is the leader and ready for accepting requests.
+   * @return - if the current scm is the leader and is ready.
    */
   public boolean checkLeader() {
-    return scmContext.isLeader();
+    return scmContext.isLeader() && getScmHAManager().getRatisServer()

Review comment:
       Question: Do we need this && check here? Ratis is the source of truth, so getScmHAManager().getRatisServer().getDivision().getInfo().isLeaderReady() should suffice here.
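   i.e. something like:
   ```
   public boolean checkLeader() {
     // Ratis is the source of truth for leadership and readiness
     return getScmHAManager().getRatisServer()
         .getDivision().getInfo().isLeaderReady();
   }
   ```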

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
##########
@@ -156,16 +200,52 @@ public void stop() throws IOException {
   @Override
   public NotLeaderException triggerNotLeaderException() {
     return new NotLeaderException(
-        division.getMemberId(), null, division.getGroup().getPeers());
+        getDivision().getMemberId(), null, getDivision().getGroup().getPeers());
+  }
+
+  @Override
+  public boolean addSCM(AddSCMRequest request) throws IOException {
+    List<RaftPeer> newRaftPeerList =
+        new ArrayList<>(getDivision().getGroup().getPeers());
+    // add the SCM node to be added to the raft peer list
+
+    RaftPeer raftPeer = RaftPeer.newBuilder().setId(request.getScmId())
+        .setAddress(request.getRatisAddr()).build();
+    newRaftPeerList.add(raftPeer);
+
+    LOG.debug("{}: Submitting SetConfiguration request to Ratis server with" +

Review comment:
       Can we make this an info log?

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
##########
@@ -708,14 +708,14 @@ public static boolean scmBootstrap(OzoneConfiguration conf)
     }
     // The node here will try to fetch the cluster id from any of existing
     // running SCM instances.
-    // TODO: need to avoid failover to local SCM Node here
-    final ScmInfo scmInfo = HAUtils.getScmInfo(conf);

Review comment:
       Here we need to call SCMHANodeDetails.loadSCMHAConfig(conf), which will set the node-specific config keys, and then call getLocalNodeDetails to find the localNodeId.
   Then this localNodeId can be removed.
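   Roughly (the exact wiring is a sketch based on the method names above):
   ```
   SCMHANodeDetails haDetails = SCMHANodeDetails.loadSCMHAConfig(conf);
   String localNodeId = haDetails.getLocalNodeDetails().getNodeId();
   // with the local node id known, it can be stripped from the config
   // (e.g. via SCMHAUtils.removeSelfId) before fetching the cluster id, so
   // the call never fails over to this not-yet-bootstrapped local SCM
   final ScmInfo scmInfo = HAUtils.getScmInfo(SCMHAUtils.removeSelfId(conf));
   ```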
   
   

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
##########
@@ -689,9 +689,9 @@ public static String buildRpcServerStartMessage(String description,
   }
 
   /**
-   * Routine to bootstrap the StorageContainerManager. Thsi will connect to a
+   * Routine to bootstrap the StorageContainerManager. Thiw will connect to a

Review comment:
       Minor: Thiw -> This

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
##########
@@ -291,6 +292,35 @@ public ScmInfo getScmInfo() throws IOException {
     }
   }
 
+  @Override
+  public boolean addSCM(AddSCMRequest request) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding SCM {} addr {} with cluster id {}, with {}",
+          request.getScmId(), request.getRatisAddr(), request.getRatisAddr());
+    }
+
+    Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("scmId", String.valueOf(request.getScmId()));
+    auditMap.put("cluster", String.valueOf(request.getClusterId()));
+    auditMap.put("addr", String.valueOf(request.getRatisAddr()));
+    boolean auditSuccess = true;
+    try{
+      return scm.getScmHAManager().addSCM(request);
+    } catch (Exception ex) {
+      auditSuccess = false;
+      AUDIT.logReadFailure(
+          buildAuditMessageForFailure(SCMAction.ADD_SCM, auditMap, ex)
+      );
+      throw ex;
+    } finally {
+      if(auditSuccess) {
+        AUDIT.logReadSuccess(
+            buildAuditMessageForSuccess(SCMAction.ADD_SCM, null)

Review comment:
       null -> auditMap
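   i.e.:
   ```
   AUDIT.logReadSuccess(
       buildAuditMessageForSuccess(SCMAction.ADD_SCM, auditMap)
   );
   ```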

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
##########
@@ -63,69 +61,120 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMRatisServerImpl.class);
 
-  private final RaftServer.Division division;
+  private final RaftServer server;
+  private final SCMStateMachine stateMachine;
   private final StorageContainerManager scm;
-  private final InetSocketAddress address;
   private final ClientId clientId = ClientId.randomId();
   private final AtomicLong callId = new AtomicLong();
 
   // TODO: Refactor and remove ConfigurationSource and use only
   //  SCMHAConfiguration.
-  SCMRatisServerImpl(final SCMHAConfiguration haConf,
-      final ConfigurationSource conf, final StorageContainerManager scm,
-      final DBTransactionBuffer buffer) throws IOException {
+  SCMRatisServerImpl(final ConfigurationSource conf,
+      final StorageContainerManager scm, final DBTransactionBuffer buffer)
+      throws IOException {
     this.scm = scm;
-    this.address = haConf.getRatisBindAddress();
-    RaftServer server = newRaftServer(scm.getClusterId(), scm.getScmId(),
-        scm.getSCMHANodeDetails(), conf)
-        .setStateMachine(new SCMStateMachine(scm, this, buffer)).build();
+    this.stateMachine = new SCMStateMachine(scm, this, buffer);
+    final RaftGroupId groupId = buildRaftGroupId(scm.getClusterId());
+    LOG.info("starting Raft server for scm:{}", scm.getScmId());
+    // During SCM startup, the bootstrapped node will be started just with
+    // groupId information, so that it won't trigger any leader election
+    // as it doesn't have any peer info.
+
+    // The primary SCM node which is initialized using scm --init command,
+    // will initialize the raft server with the peer info and it will be
+    // persisted in the raft log post leader election. Now, when the primary
+    // scm boots up, it has peer info embedded in the raft log and will
+    // trigger leader election.
+    this.server =
+        newRaftServer(scm.getScmId(), conf).setStateMachine(stateMachine)
+            .setGroup(RaftGroup.valueOf(groupId)).build();
+  }
+
+  public static void initialize(String clusterId, String scmId,
+      SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
+    final RaftGroup group = buildRaftGroup(details, scmId, clusterId);
+    RaftServer server = newRaftServer(scmId, conf)
+        .setGroup(group).build();
+    server.start();
+    waitforLeaderToBeReady(server, 60000, group);
+    server.close();
+  }
+
+  private static void waitforLeaderToBeReady(RaftServer server, long timeout,
+      RaftGroup group)
+      throws IOException {
+    boolean ready;
+    long st = Time.monotonicNow();
+    do {
+      ready = server.getDivision(group.getGroupId()).getInfo().isLeaderReady();
+      if (!ready) {
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    } while (!ready && Time.monotonicNow() - st < timeout);
 
-    this.division =
-        server.getDivision(server.getGroups().iterator().next().getGroupId());
+    if (!ready) {
+      throw new IOException(String
+          .format("Ratis group %s is not ready in %d ms", group.getGroupId(),
+              timeout));
+    }
   }
 
-  public static RaftServer.Builder newRaftServer(final String clusterId,
-      final String scmId, final SCMHANodeDetails haDetails,
-      final ConfigurationSource conf) throws IOException {
-    final String scmNodeId = haDetails.getLocalNodeDetails().getNodeId();
+  private static RaftServer.Builder newRaftServer(final String scmId,
+      final ConfigurationSource conf) {
     final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
-    SCMHAGroupBuilder haGrpBuilder = scmNodeId != null ?
-        new SCMHAGroupBuilder(haDetails, clusterId, scmId) :
-        new SCMHAGroupBuilder(haConf, conf, clusterId);
     final RaftProperties serverProperties =
         RatisUtil.newRaftProperties(haConf, conf);
-    return RaftServer.newBuilder().setServerId(haGrpBuilder.getPeerId())
-        .setGroup(haGrpBuilder.getRaftGroup()).setProperties(serverProperties)
-        .setStateMachine(new SCMStateMachine());
+    return RaftServer.newBuilder().setServerId(RaftPeerId.getRaftPeerId(scmId))
+        .setProperties(serverProperties)
+        .setStateMachine(new SCMStateMachine(false));
   }
 
   @Override
   public void start() throws IOException {
-    division.getRaftServer().start();
+    server.start();
+  }
+
+  public RaftServer.Division getDivision() {
+    try {
+      return server
+          .getDivision(server.getGroups().iterator().next().getGroupId());
+    } catch (Exception e) {
+      LOG.warn("Failed to get RaftServerDivision", e);
+      return null;
+    }
+  }
+
+  @VisibleForTesting
+  public SCMStateMachine getStateMachine() {
+    return stateMachine;
   }
 
   @Override
   public void registerStateMachineHandler(final RequestType handlerType,
                                           final Object handler) {
-    ((SCMStateMachine) division.getStateMachine())
-        .registerHandler(handlerType, handler);
+    stateMachine.registerHandler(handlerType, handler);
   }
 
   @Override
   public SCMRatisResponse submitRequest(SCMRatisRequest request)
       throws IOException, ExecutionException, InterruptedException {
     final RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
         .setClientId(clientId)

Review comment:
       Not related to this change, but the ClientId should be set with Server.getClientID() so that when requests are retried, we know it is a retry request. With the current approach, after an SCM restart, all old requests will come with a new ClientId.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
##########
@@ -63,69 +61,120 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMRatisServerImpl.class);
 
-  private final RaftServer.Division division;
+  private final RaftServer server;
+  private final SCMStateMachine stateMachine;
   private final StorageContainerManager scm;
-  private final InetSocketAddress address;
   private final ClientId clientId = ClientId.randomId();
   private final AtomicLong callId = new AtomicLong();
 
   // TODO: Refactor and remove ConfigurationSource and use only
   //  SCMHAConfiguration.
-  SCMRatisServerImpl(final SCMHAConfiguration haConf,
-      final ConfigurationSource conf, final StorageContainerManager scm,
-      final DBTransactionBuffer buffer) throws IOException {
+  SCMRatisServerImpl(final ConfigurationSource conf,
+      final StorageContainerManager scm, final DBTransactionBuffer buffer)
+      throws IOException {
     this.scm = scm;
-    this.address = haConf.getRatisBindAddress();
-    RaftServer server = newRaftServer(scm.getClusterId(), scm.getScmId(),
-        scm.getSCMHANodeDetails(), conf)
-        .setStateMachine(new SCMStateMachine(scm, this, buffer)).build();
+    this.stateMachine = new SCMStateMachine(scm, this, buffer);
+    final RaftGroupId groupId = buildRaftGroupId(scm.getClusterId());
+    LOG.info("starting Raft server for scm:{}", scm.getScmId());
+    // During SCM startup, the bootstrapped node will be started just with
+    // groupId information, so that it won't trigger any leader election
+    // as it doesn't have any peer info.
+
+    // The primary SCM node which is initialized using scm --init command,
+    // will initialize the raft server with the peer info and it will be
+    // persisted in the raft log post leader election. Now, when the primary
+    // scm boots up, it has peer info embedded in the raft log and will
+    // trigger leader election.
+    this.server =
+        newRaftServer(scm.getScmId(), conf).setStateMachine(stateMachine)
+            .setGroup(RaftGroup.valueOf(groupId)).build();
+  }
+
+  public static void initialize(String clusterId, String scmId,
+      SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
+    final RaftGroup group = buildRaftGroup(details, scmId, clusterId);
+    RaftServer server = newRaftServer(scmId, conf)
+        .setGroup(group).build();
+    server.start();
+    waitforLeaderToBeReady(server, 60000, group);
+    server.close();
+  }
+
+  private static void waitforLeaderToBeReady(RaftServer server, long timeout,
+      RaftGroup group)
+      throws IOException {
+    boolean ready;
+    long st = Time.monotonicNow();
+    do {
+      ready = server.getDivision(group.getGroupId()).getInfo().isLeaderReady();
+      if (!ready) {
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    } while (!ready && Time.monotonicNow() - st < timeout);
 
-    this.division =
-        server.getDivision(server.getGroups().iterator().next().getGroupId());
+    if (!ready) {
+      throw new IOException(String
+          .format("Ratis group %s is not ready in %d ms", group.getGroupId(),
+              timeout));
+    }
   }
 
-  public static RaftServer.Builder newRaftServer(final String clusterId,
-      final String scmId, final SCMHANodeDetails haDetails,
-      final ConfigurationSource conf) throws IOException {
-    final String scmNodeId = haDetails.getLocalNodeDetails().getNodeId();
+  private static RaftServer.Builder newRaftServer(final String scmId,
+      final ConfigurationSource conf) {
     final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
-    SCMHAGroupBuilder haGrpBuilder = scmNodeId != null ?
-        new SCMHAGroupBuilder(haDetails, clusterId, scmId) :
-        new SCMHAGroupBuilder(haConf, conf, clusterId);
     final RaftProperties serverProperties =
         RatisUtil.newRaftProperties(haConf, conf);
-    return RaftServer.newBuilder().setServerId(haGrpBuilder.getPeerId())
-        .setGroup(haGrpBuilder.getRaftGroup()).setProperties(serverProperties)
-        .setStateMachine(new SCMStateMachine());
+    return RaftServer.newBuilder().setServerId(RaftPeerId.getRaftPeerId(scmId))
+        .setProperties(serverProperties)
+        .setStateMachine(new SCMStateMachine(false));
   }
 
   @Override
   public void start() throws IOException {
-    division.getRaftServer().start();
+    server.start();
+  }
+
+  public RaftServer.Division getDivision() {
+    try {
+      return server
+          .getDivision(server.getGroups().iterator().next().getGroupId());
+    } catch (Exception e) {
+      LOG.warn("Failed to get RaftServerDivision", e);
+      return null;
+    }
+  }
+
+  @VisibleForTesting
+  public SCMStateMachine getStateMachine() {
+    return stateMachine;
   }
 
   @Override
   public void registerStateMachineHandler(final RequestType handlerType,
                                           final Object handler) {
-    ((SCMStateMachine) division.getStateMachine())
-        .registerHandler(handlerType, handler);
+    stateMachine.registerHandler(handlerType, handler);
   }
 
   @Override
   public SCMRatisResponse submitRequest(SCMRatisRequest request)
       throws IOException, ExecutionException, InterruptedException {
     final RaftClientRequest raftClientRequest = RaftClientRequest.newBuilder()
         .setClientId(clientId)
-        .setServerId(division.getId())
-        .setGroupId(division.getGroup().getGroupId())
+        .setServerId(getDivision().getId())

Review comment:
       Can we store this locally instead of fetching it from the Ratis APIs every time?
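   For example (a sketch; the cached field would be populated once the group is known, after start()):
   ```
   private RaftServer.Division division;

   @Override
   public void start() throws IOException {
     server.start();
     // resolve and cache the division once instead of on every request
     division = server
         .getDivision(server.getGroups().iterator().next().getGroupId());
   }
   ```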

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
##########
@@ -179,25 +193,79 @@ public long takeSnapshot() throws IOException {
     long startTime = Time.monotonicNow();
     TermIndex lastTermIndex = getLastAppliedTermIndex();
     long lastAppliedIndex = lastTermIndex.getIndex();
-    TransactionInfo lastAppliedTrxInfo =
-        TransactionInfo.fromTermIndex(lastTermIndex);
-    if (transactionBuffer.getLatestTrxInfo()
-        .compareTo(lastAppliedTrxInfo) < 0) {
+    if (isInitialized.get()) {
+      TransactionInfo lastAppliedTrxInfo =
+          TransactionInfo.fromTermIndex(lastTermIndex);
+      if (transactionBuffer.getLatestTrxInfo().compareTo(lastAppliedTrxInfo)
+          < 0) {
+        transactionBuffer.updateLatestTrxInfo(
+            TransactionInfo.builder().setCurrentTerm(lastTermIndex.getTerm())
+                .setTransactionIndex(lastTermIndex.getIndex()).build());
+        transactionBuffer.setLatestSnapshot(
+            transactionBuffer.getLatestTrxInfo().toSnapshotInfo());
+      } else {
+        lastAppliedIndex =
+            transactionBuffer.getLatestTrxInfo().getTransactionIndex();
+      }
+
+      transactionBuffer.flush();
+      LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
+          lastAppliedIndex, Time.monotonicNow() - startTime);
+    }
+    super.takeSnapshot();

Review comment:
       Do we need this call?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
##########
@@ -326,6 +329,10 @@ private void initSCM() throws IOException {
     scmStore.setScmId(scmId);
     // writes the version file properties
     scmStore.initialize();
+    if (!SCMHAUtils.isSCMHAEnabled(conf)) {

Review comment:
       Question: Why do we need this step here?

##########
File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java
##########
@@ -77,6 +78,11 @@
    */
   ScmInfo getScmInfo() throws IOException;
 
+  /**
+   * Request to add SCM instance to HA group.
+   */
+  boolean addSCM(AddSCMRequest request) throws IOException;

Review comment:
       I also believe this is not the correct place for addSCM; StorageContainerLocationProtocol is used for all admin ops, so that might be a better place. We need to conclude on moving this to StorageContainerLocationProtocol before merge.
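   i.e. the declaration would move to the admin protocol, roughly:
   ```
   // in StorageContainerLocationProtocol (admin operations)
   /**
    * Request to add an SCM instance to the HA group.
    */
   boolean addSCM(AddSCMRequest request) throws IOException;
   ```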

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
##########
@@ -179,25 +193,79 @@ public long takeSnapshot() throws IOException {
     long startTime = Time.monotonicNow();
     TermIndex lastTermIndex = getLastAppliedTermIndex();
     long lastAppliedIndex = lastTermIndex.getIndex();
-    TransactionInfo lastAppliedTrxInfo =
-        TransactionInfo.fromTermIndex(lastTermIndex);
-    if (transactionBuffer.getLatestTrxInfo()
-        .compareTo(lastAppliedTrxInfo) < 0) {
+    if (isInitialized.get()) {
+      TransactionInfo lastAppliedTrxInfo =
+          TransactionInfo.fromTermIndex(lastTermIndex);
+      if (transactionBuffer.getLatestTrxInfo().compareTo(lastAppliedTrxInfo)
+          < 0) {
+        transactionBuffer.updateLatestTrxInfo(
+            TransactionInfo.builder().setCurrentTerm(lastTermIndex.getTerm())
+                .setTransactionIndex(lastTermIndex.getIndex()).build());
+        transactionBuffer.setLatestSnapshot(
+            transactionBuffer.getLatestTrxInfo().toSnapshotInfo());
+      } else {
+        lastAppliedIndex =
+            transactionBuffer.getLatestTrxInfo().getTransactionIndex();
+      }
+
+      transactionBuffer.flush();
+      LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
+          lastAppliedIndex, Time.monotonicNow() - startTime);
+    }
+    super.takeSnapshot();
+    return lastAppliedIndex;
+  }
+
+  @Override
+  public void initialize(
+      RaftServer server, RaftGroupId id, RaftStorage raftStorage)
+      throws IOException {
+    super.initialize(server, id, raftStorage);
+    storage.init(raftStorage);
+    loadSnapshot(storage.getLatestSnapshot());
+  }
+
+  private long loadSnapshot(SingleFileSnapshotInfo snapshot)
+      throws IOException {
+    if (snapshot == null) {

Review comment:
       Should the snapshot be loaded from the TransactionInfo table here?

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
##########
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.Rule;
+
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.client.ReplicationType.STAND_ALONE;
+
+/**
+ * Base class for Ozone Manager HA tests.
+ */
+public class TestStorageContainerManagerHA {
+
+  private MiniOzoneHAClusterImpl cluster = null;
+  private OzoneConfiguration conf;
+  private String clusterId;
+  private String scmId;
+  private String omServiceId;
+  private static int numOfOMs = 3;
+  private String scmServiceId;
+  // TODO: 3 servers will not work until SCM HA configs are considered in
+  // SCM HA Ratis server init
+  private static int numOfSCMs = 3;
+
+
+  @Rule
+  public Timeout timeout = new Timeout(300_000);
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true
+   *
+   * @throws IOException
+   */
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+    omServiceId = "om-service-test1";
+    scmServiceId = "scm-service-test1";
+    cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+        .setClusterId(clusterId)
+        .setScmId(scmId)
+        .setOMServiceId(omServiceId)
+        .setSCMServiceId(scmServiceId)
+        .setNumOfStorageContainerManagers(numOfSCMs)
+        .setNumOfOzoneManagers(numOfOMs)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAllSCMAreRunning() throws Exception {
+    int count = 0;
+    List<StorageContainerManager> scms = cluster.getStorageContainerManagers();
+    Assert.assertEquals(numOfSCMs, scms.size());
+    int peerSize = cluster.getStorageContainerManager().getScmHAManager()
+        .getRatisServer().getDivision().getGroup().getPeers().size();
+    for (StorageContainerManager scm : scms) {
+      if (scm.checkLeader()) {
+        count++;
+      }
+      Assert.assertTrue(peerSize == numOfSCMs);
+    }
+    Assert.assertEquals(1, count);
+    count = 0;
+    List<OzoneManager> oms = cluster.getOzoneManagersList();
+    Assert.assertEquals(numOfOMs, oms.size());
+    for (OzoneManager om : oms) {
+      if (om.isLeaderReady()) {
+        count++;
+      }
+    }
+    Assert.assertEquals(1, count);
+    testPutKey();
+  }
+
+  public void testPutKey() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    Instant testStartTime = Instant.now();
+    ObjectStore store =
+        OzoneClientFactory.getRpcClient(cluster.getConf()).getObjectStore();
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+
+    OzoneOutputStream out = bucket
+        .createKey(keyName, value.getBytes(UTF_8).length, STAND_ALONE, ONE,
+            new HashMap<>());
+    out.write(value.getBytes(UTF_8));
+    out.close();
+    OzoneKey key = bucket.getKey(keyName);
+    Assert.assertEquals(keyName, key.getName());
+    OzoneInputStream is = bucket.readKey(keyName);
+    byte[] fileContent = new byte[value.getBytes(UTF_8).length];
+    is.read(fileContent);
+    Assert.assertEquals(value, new String(fileContent, UTF_8));
+    Assert.assertFalse(key.getCreationTime().isBefore(testStartTime));
+    Assert.assertFalse(key.getModificationTime().isBefore(testStartTime));
+    is.close();
+    final OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .setRefreshPipeline(true).build();
+    final OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    final List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    final long containerID = keyLocationInfos.get(0).getContainerID();
+    for (int k = 0; k < numOfSCMs; k++) {
+      StorageContainerManager scm =
+          cluster.getStorageContainerManagers().get(k);
+      // flush to DB on each SCM

Review comment:
       Question: Even without the flush, the container should be there, right?

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
##########
@@ -179,25 +193,79 @@ public long takeSnapshot() throws IOException {
     long startTime = Time.monotonicNow();
     TermIndex lastTermIndex = getLastAppliedTermIndex();
     long lastAppliedIndex = lastTermIndex.getIndex();
-    TransactionInfo lastAppliedTrxInfo =
-        TransactionInfo.fromTermIndex(lastTermIndex);
-    if (transactionBuffer.getLatestTrxInfo()
-        .compareTo(lastAppliedTrxInfo) < 0) {
+    if (isInitialized.get()) {

Review comment:
       Here there is a chance of missing a DB update.
   
   Say transactions 1, 2, and 4 are metadata/conf updates and 3 is an applyTransaction (still not added to the TransactionDBBuffer).
   
   Now, when **takeSnapshot** runs, the lastAppliedTermIndex will be 4 (as notifyConfigurationChanged will have been called) and we flush. After an SCM restart, we think 4 has been applied, but it has not.

##########
File path: hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
##########
@@ -63,69 +61,120 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMRatisServerImpl.class);
 
-  private final RaftServer.Division division;
+  private final RaftServer server;
+  private final SCMStateMachine stateMachine;
   private final StorageContainerManager scm;
-  private final InetSocketAddress address;
   private final ClientId clientId = ClientId.randomId();
   private final AtomicLong callId = new AtomicLong();
 
   // TODO: Refactor and remove ConfigurationSource and use only
   //  SCMHAConfiguration.
-  SCMRatisServerImpl(final SCMHAConfiguration haConf,
-      final ConfigurationSource conf, final StorageContainerManager scm,
-      final DBTransactionBuffer buffer) throws IOException {
+  SCMRatisServerImpl(final ConfigurationSource conf,
+      final StorageContainerManager scm, final DBTransactionBuffer buffer)
+      throws IOException {
     this.scm = scm;
-    this.address = haConf.getRatisBindAddress();
-    RaftServer server = newRaftServer(scm.getClusterId(), scm.getScmId(),
-        scm.getSCMHANodeDetails(), conf)
-        .setStateMachine(new SCMStateMachine(scm, this, buffer)).build();
+    this.stateMachine = new SCMStateMachine(scm, this, buffer);
+    final RaftGroupId groupId = buildRaftGroupId(scm.getClusterId());
+    LOG.info("starting Raft server for scm:{}", scm.getScmId());
+    // During SCM startup, the bootstrapped node will be started just with
+    // groupId information, so that it won't trigger any leader election
+    // as it doesn't have any peer info.
+
+    // The primary SCM node which is initialized using scm --init command,
+    // will initialize the raft server with the peer info and it will be
+    // persisted in the raft log post leader election. Now, when the primary
+    // scm boots up, it has peer info embedded in the raft log and will
+    // trigger leader election.
+    this.server =
+        newRaftServer(scm.getScmId(), conf).setStateMachine(stateMachine)
+            .setGroup(RaftGroup.valueOf(groupId)).build();
+  }
+
+  public static void initialize(String clusterId, String scmId,
+      SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
+    final RaftGroup group = buildRaftGroup(details, scmId, clusterId);
+    RaftServer server = newRaftServer(scmId, conf)
+        .setGroup(group).build();
+    server.start();
+    waitforLeaderToBeReady(server, 60000, group);

Review comment:
       Are we waiting for the leader to be ready because the RaftGroup on disk will only be created after leader election?

##########
File path: hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
##########
@@ -53,6 +55,23 @@ public static ScmInfo getScmInfo(OzoneConfiguration conf)
     }
   }
 
+  public static boolean addSCM(OzoneConfiguration conf, AddSCMRequest request)
+      throws IOException {
+    OzoneConfiguration config = SCMHAUtils.removeSelfId(conf);
+    try {
+      RetryPolicy retryPolicy = retryUpToMaximumCountWithFixedSleep(
+          10, 5, TimeUnit.SECONDS);
+      RetriableTask<Boolean> retriable = new RetriableTask<>(
+          retryPolicy, "addSCM",
+          () -> getScmBlockClient(config).addSCM(request));
+      return retriable.call();
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Failed to get SCM info", e);

Review comment:
       Question: Do we need this retry policy here?
   The SCMFailOverProxy provider will already have its own retry count and retry interval:
   
   ```
       this.maxRetryCount = config.getRetryCount();
       this.retryInterval = config.getRetryInterval();
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org