You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ha...@apache.org on 2021/11/18 17:46:38 UTC

[ozone] branch master updated: HDDS-5534. Verify config is updated on all OMs before proceeding with Bootstrap (#2491)

This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 07b54e2  HDDS-5534. Verify config is updated on all OMs before proceeding with Bootstrap (#2491)
07b54e2 is described below

commit 07b54e2c56719ec2d00a812fc10c2c2319302cca
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Thu Nov 18 09:46:15 2021 -0800

    HDDS-5534. Verify config is updated on all OMs before proceeding with Bootstrap (#2491)
---
 .../common/src/main/resources/ozone-default.xml    |  18 ++
 .../java/org/apache/hadoop/hdds/ExitManager.java   |  12 +-
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |  82 ++++++++
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  11 ++
 .../hadoop/ozone/om/helpers/OMNodeDetails.java     |  50 ++++-
 .../hadoop/ozone/om/protocol/OMAdminProtocol.java  |  23 ++-
 .../hadoop/ozone/om/protocol/OMConfiguration.java  |  92 +++++++++
 .../protocolPB/OMAdminProtocolClientSideImpl.java  | 136 ++++++++++++++
 .../ozone/om/protocolPB/OMAdminProtocolPB.java     |  37 ++++
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       | 150 +++++++++++----
 .../hadoop/ozone/om/TestOzoneManagerBootstrap.java | 184 ++++++++++++++----
 .../src/main/proto/OMAdminProtocol.proto           |  65 +++++++
 .../apache/hadoop/ozone/om/OMPolicyProvider.java   |   5 +-
 .../apache/hadoop/ozone/om/OMStarterInterface.java |   2 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 208 ++++++++++++++++++---
 .../hadoop/ozone/om/OzoneManagerStarter.java       |  45 +++--
 .../apache/hadoop/ozone/om/ha/OMHANodeDetails.java |   6 +-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  24 +--
 .../protocolPB/OMAdminProtocolServerSideImpl.java  |  66 +++++++
 .../hadoop/ozone/om/TestOzoneManagerStarter.java   |   4 +-
 20 files changed, 1070 insertions(+), 150 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 4cff33d..6a97c85 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2324,6 +2324,24 @@
     </description>
   </property>
   <property>
+    <name>ozone.om.admin.protocol.max.retries</name>
+    <value>20</value>
+    <tag>OM, MANAGEMENT</tag>
+    <description>
+      Expert only. The maximum number of retries for Ozone Manager Admin
+      protocol.
+    </description>
+  </property>
+  <property>
+    <name>ozone.om.admin.protocol.wait.between.retries</name>
+    <value>1000</value>
+    <tag>OM, MANAGEMENT</tag>
+    <description>
+      Expert only. The time to wait, in milliseconds, between retry attempts
+      for Ozone Manager Admin protocol.
+    </description>
+  </property>
+  <property>
     <name>ozone.recon.http.enabled</name>
     <value>true</value>
     <tag>RECON, MANAGEMENT</tag>
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/ExitManager.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/ExitManager.java
index e48c149..425a1f8 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/ExitManager.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/ExitManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds;
 
+import java.io.IOException;
 import org.apache.ratis.util.ExitUtils;
 import org.slf4j.Logger;
 
@@ -27,7 +28,16 @@ import org.slf4j.Logger;
 public class ExitManager {
 
   public void exitSystem(int status, String message, Throwable throwable,
-      Logger log) {
+      Logger log) throws IOException {
     ExitUtils.terminate(status, message, throwable, log);
   }
+
+  public void exitSystem(int status, String message, Logger log)
+      throws IOException {
+    ExitUtils.terminate(status, message, log);
+  }
+
+  public void forceExit(int status, Exception ex, Logger log) {
+    ExitUtils.terminate(status, ex.getLocalizedMessage(), ex, log);
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 4892f1b..433fc11 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -39,6 +39,7 @@ import java.util.Set;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -46,6 +47,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.conf.OMClientConfig;
 import org.apache.hadoop.ozone.ha.ConfUtils;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -67,6 +69,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_PORT_DE
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_INTERNAL_SERVICE_ID;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_PORT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
 
 import org.slf4j.Logger;
@@ -672,4 +676,82 @@ public final class OmUtils {
     }
     return omHosts;
   }
+
+  /**
+   * Get a list of all OM details (address and ports) from the specified config.
+   */
+  public static List<OMNodeDetails> getAllOMAddresses(OzoneConfiguration conf,
+      String omServiceId, String currentOMNodeId) {
+
+    List<OMNodeDetails> omNodesList = new ArrayList<>();
+    Collection<String> omNodeIds = OmUtils.getOMNodeIds(conf, omServiceId);
+
+    String rpcAddrStr, hostAddr, httpAddr, httpsAddr;
+    int rpcPort, ratisPort;
+    if (omNodeIds.size() == 0) {
+      //Check if it is non-HA cluster
+      rpcAddrStr = OmUtils.getOmRpcAddress(conf, OZONE_OM_ADDRESS_KEY);
+      if (rpcAddrStr == null || rpcAddrStr.isEmpty()) {
+        return omNodesList;
+      }
+      hostAddr = HddsUtils.getHostName(rpcAddrStr).orElse(null);
+      rpcPort = HddsUtils.getHostPort(rpcAddrStr).orElse(0);
+      ratisPort = conf.getInt(OZONE_OM_RATIS_PORT_KEY,
+          OZONE_OM_RATIS_PORT_DEFAULT);
+      httpAddr = OmUtils.getHttpAddressForOMPeerNode(conf,
+          null, null, hostAddr);
+      httpsAddr = OmUtils.getHttpsAddressForOMPeerNode(conf,
+          null, null, hostAddr);
+
+      omNodesList.add(new OMNodeDetails.Builder()
+          .setOMNodeId(currentOMNodeId)
+          .setHostAddress(hostAddr)
+          .setRpcPort(rpcPort)
+          .setRatisPort(ratisPort)
+          .setHttpAddress(httpAddr)
+          .setHttpsAddress(httpsAddr)
+          .build());
+      return omNodesList;
+    }
+
+    for (String nodeId : omNodeIds) {
+      try {
+        OMNodeDetails omNodeDetails = OMNodeDetails.getOMNodeDetailsFromConf(
+            conf, omServiceId, nodeId);
+        omNodesList.add(omNodeDetails);
+      } catch (IOException e) {
+        String omRpcAddressStr = OMNodeDetails.getOMNodeAddressFromConf(conf,
+            omServiceId, nodeId);
+        LOG.error("OM {} is present in config file but it's address {} could " +
+            "not be resolved. Hence, OM {} is not added to list of peer nodes.",
+            nodeId, omRpcAddressStr, nodeId);
+      }
+    }
+
+    return omNodesList;
+  }
+
+  /**
+   * Return a comma separated list of OM node details
+   * (NodeID[HostAddress:RpcPort]).
+   */
+  public static String getOMAddressListPrintString(List<OMNodeDetails> omList) {
+    if (omList.size() == 0) {
+      return null;
+    }
+    StringBuilder printString = new StringBuilder();
+    printString.append("OM");
+    if (omList.size() == 1) {
+      printString.append(" [");
+    } else {
+      printString.append("(s) [");
+    }
+    printString.append(omList.get(0).getOMPrintInfo());
+    for (int i = 1; i < omList.size(); i++) {
+      printString.append(",")
+          .append(omList.get(i).getOMPrintInfo());
+    }
+    printString.append("]");
+    return printString.toString();
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 4502db4..c42d640 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -290,4 +290,15 @@ public final class OMConfigKeys {
       "ozone.path.deleting.limit.per.task";
   public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 10000;
 
+  /**
+   * Configuration properties for OMAdminProtocol service.
+   */
+  public static final String OZONE_OM_ADMIN_PROTOCOL_MAX_RETRIES_KEY =
+      "ozone.om.admin.protocol.max.retries";
+  public static final int OZONE_OM_ADMIN_PROTOCOL_MAX_RETRIES_DEFAULT = 20;
+  public static final String OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_KEY =
+      "ozone.om.admin.protocol.wait.between.retries";
+  public static final long OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_DEFAULT
+      = 1000;
+
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
index d908b57..71c51d8 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMNodeDetails.java
@@ -23,8 +23,10 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OMNodeInfo;
 import org.apache.hadoop.hdds.NodeDetails;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
@@ -164,39 +166,67 @@ public final class OMNodeDetails extends NodeDetails {
     return null;
   }
 
-  public static OMNodeDetails getOMNodeDetailsFromConf(OzoneConfiguration conf,
+  public String getOMPrintInfo() {
+    return getNodeId() + "[" + getHostAddress() + ":" + getRpcPort() + "]";
+  }
+
+  public static String getOMNodeAddressFromConf(OzoneConfiguration conf,
       String omServiceId, String omNodeId) {
     String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
         omServiceId, omNodeId);
-    String rpcAddrStr = OmUtils.getOmRpcAddress(conf, rpcAddrKey);
+    return OmUtils.getOmRpcAddress(conf, rpcAddrKey);
+  }
+
+  public static OMNodeDetails getOMNodeDetailsFromConf(OzoneConfiguration conf,
+      String omServiceId, String omNodeId) throws IOException {
+
+    String rpcAddrStr = getOMNodeAddressFromConf(conf, omServiceId, omNodeId);
     if (rpcAddrStr == null || rpcAddrStr.isEmpty()) {
       return null;
     }
 
-    String ratisPortKey = ConfUtils.addKeySuffixes(OZONE_OM_RATIS_PORT_KEY,
-        omServiceId, omNodeId);
-    int ratisPort = conf.getInt(ratisPortKey, OZONE_OM_RATIS_PORT_DEFAULT);
-
-    InetSocketAddress omRpcAddress = null;
+    InetSocketAddress omRpcAddress;
     try {
       omRpcAddress = NetUtils.createSocketAddr(rpcAddrStr);
     } catch (Exception e) {
-      throw new IllegalArgumentException("Couldn't create socket address" +
+      throw new IOException("Couldn't create socket address" +
           " for OM " + omNodeId + " at " + rpcAddrStr, e);
     }
 
+    String ratisPortKey = ConfUtils.addKeySuffixes(OZONE_OM_RATIS_PORT_KEY,
+        omServiceId, omNodeId);
+    int ratisPort = conf.getInt(ratisPortKey, OZONE_OM_RATIS_PORT_DEFAULT);
+
     String httpAddr = OmUtils.getHttpAddressForOMPeerNode(conf,
         omServiceId, omNodeId, omRpcAddress.getHostName());
     String httpsAddr = OmUtils.getHttpsAddressForOMPeerNode(conf,
         omServiceId, omNodeId, omRpcAddress.getHostName());
 
-    return new OMNodeDetails.Builder()
+    return new Builder()
         .setOMNodeId(omNodeId)
-        .setRpcAddress(omRpcAddress)
         .setRatisPort(ratisPort)
         .setHttpAddress(httpAddr)
         .setHttpsAddress(httpsAddr)
         .setOMServiceId(omServiceId)
+        .setRpcAddress(omRpcAddress)
+        .build();
+  }
+
+  public OMNodeInfo getProtobuf() {
+    return OMNodeInfo.newBuilder()
+        .setNodeID(getNodeId())
+        .setHostAddress(getHostAddress())
+        .setRpcPort(getRpcPort())
+        .setRatisPort(getRatisPort())
+        .build();
+  }
+
+  public static OMNodeDetails getFromProtobuf(OMNodeInfo omNodeInfo) {
+    return new Builder()
+        .setOMNodeId(omNodeInfo.getNodeID())
+        .setHostAddress(omNodeInfo.getHostAddress())
+        .setRpcPort(omNodeInfo.getRpcPort())
+        .setRatisPort(omNodeInfo.getRatisPort())
         .build();
   }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/ExitManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OMAdminProtocol.java
similarity index 61%
copy from hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/ExitManager.java
copy to hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OMAdminProtocol.java
index e48c149..1a90ee7 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/ExitManager.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OMAdminProtocol.java
@@ -15,19 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdds;
+package org.apache.hadoop.ozone.om.protocol;
 
-import org.apache.ratis.util.ExitUtils;
-import org.slf4j.Logger;
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.security.KerberosInfo;
 
 /**
- * An Exit Manager used to shutdown service in case of unrecoverable error.
- * This class will be helpful to test exit functionality.
+ * Protocol for performing admin operations such as getting OM metadata.
  */
-public class ExitManager {
+@KerberosInfo(
+    serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
+public interface OMAdminProtocol extends Closeable {
 
-  public void exitSystem(int status, String message, Throwable throwable,
-      Logger log) {
-    ExitUtils.terminate(status, message, throwable, log);
-  }
+  /**
+   * Get the OM configuration.
+   */
+  OMConfiguration getOMConfiguration() throws IOException;
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OMConfiguration.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OMConfiguration.java
new file mode 100644
index 0000000..8bae3ea
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OMConfiguration.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.protocol;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.NodeDetails;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
+
+/**
+ * Class storing the OM configuration information such as the node details in
+ * memory and node details when config is reloaded from disk.
+ * Note that this class is used as a structure to transfer the OM configuration
+ * information through the {@link OMAdminProtocol} and not for storing the
+ * configuration information in OzoneManager itself.
+ */
+public final class OMConfiguration {
+
+  // OM nodes present in OM's memory
+  private List<OMNodeDetails> omNodesInMemory = new ArrayList<>();
+  // OM nodes reloaded from new config on disk
+  private List<OMNodeDetails> omNodesInNewConf = new ArrayList<>();
+
+  private OMConfiguration(List<OMNodeDetails> inMemoryNodeList,
+      List<OMNodeDetails> onDiskNodeList) {
+    this.omNodesInMemory.addAll(inMemoryNodeList);
+    this.omNodesInNewConf.addAll(onDiskNodeList);
+  }
+
+  /**
+   * OMConfiguration Builder class.
+   */
+  public static class Builder {
+    private List<OMNodeDetails> omNodesInMemory;
+    private List<OMNodeDetails> omNodesInNewConf;
+
+    public Builder() {
+      this.omNodesInMemory = new ArrayList<>();
+      this.omNodesInNewConf = new ArrayList<>();
+    }
+
+    public Builder addToNodesInMemory(OMNodeDetails nodeDetails) {
+      this.omNodesInMemory.add(nodeDetails);
+      return this;
+    }
+
+    public Builder addToNodesInNewConf(OMNodeDetails nodeDetails) {
+      this.omNodesInNewConf.add(nodeDetails);
+      return this;
+    }
+
+    public OMConfiguration build() {
+      return new OMConfiguration(omNodesInMemory, omNodesInNewConf);
+    }
+  }
+
+  /**
+   * Return the list of nodeIds of all current OM peers (does not reload
+   * configuration from disk to find newly configured OMs).
+   */
+  public List<String> getCurrentPeerList() {
+    return omNodesInMemory.stream().map(NodeDetails::getNodeId)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Return a nodeId-keyed map of the OM nodes present in the new conf
+   * (reloaded from disk) under current serviceId; nothing is reloaded here.
+   */
+  public Map<String, OMNodeDetails> getOmNodesInNewConf() {
+    return omNodesInNewConf.stream().collect(Collectors.toMap(
+        NodeDetails::getNodeId,
+        omNodeDetails -> omNodeDetails,
+        (nodeId, omNodeDetails) -> omNodeDetails));
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
new file mode 100644
index 0000000..35ba130
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
+import org.apache.hadoop.ozone.om.protocol.OMConfiguration;
+import org.apache.hadoop.ozone.om.protocol.OMAdminProtocol;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OMConfigurationRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OMConfigurationResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OMNodeInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Protocol implementation for OM admin operations.
+ */
+public class OMAdminProtocolClientSideImpl implements OMAdminProtocol {
+
+  /**
+   * RpcController is not used and hence is set to null.
+   */
+  private static final RpcController NULL_RPC_CONTROLLER = null;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMAdminProtocolClientSideImpl.class);
+
+  private final OMNodeDetails remoteOmNodeDetails;
+  private final OMAdminProtocolPB rpcProxy;
+
+  public OMAdminProtocolClientSideImpl(ConfigurationSource conf,
+      UserGroupInformation ugi, OMNodeDetails omNodeDetails)
+      throws IOException {
+
+    RPC.setProtocolEngine(OzoneConfiguration.of(conf),
+        OMAdminProtocolPB.class, ProtobufRpcEngine.class);
+
+    this.remoteOmNodeDetails = omNodeDetails;
+
+    int maxRetries = conf.getInt(
+        OMConfigKeys.OZONE_OM_ADMIN_PROTOCOL_MAX_RETRIES_KEY,
+        OMConfigKeys.OZONE_OM_ADMIN_PROTOCOL_MAX_RETRIES_DEFAULT);
+    long waitBetweenRetries = conf.getLong(
+        OMConfigKeys.OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_KEY,
+        OMConfigKeys.OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_DEFAULT);
+
+    // OM metadata is requested from a specific OM and hence there is no need
+    // of any failover provider.
+    RetryPolicy connectionRetryPolicy = RetryPolicies
+        .retryUpToMaximumCountWithFixedSleep(maxRetries, waitBetweenRetries,
+            TimeUnit.MILLISECONDS);
+    Configuration hadoopConf = LegacyHadoopConfigurationSource
+        .asHadoopConfiguration(conf);
+
+    OMAdminProtocolPB proxy = RPC.getProtocolProxy(
+        OMAdminProtocolPB.class,
+        RPC.getProtocolVersion(OMAdminProtocolPB.class),
+        remoteOmNodeDetails.getRpcAddress(), ugi, hadoopConf,
+        NetUtils.getDefaultSocketFactory(hadoopConf),
+        (int) OmUtils.getOMClientRpcTimeOut(conf), connectionRetryPolicy)
+        .getProxy();
+
+    RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+        10, 1000, TimeUnit.MILLISECONDS);
+
+    this.rpcProxy = (OMAdminProtocolPB) RetryProxy.create(
+        OMAdminProtocolPB.class, proxy, retryPolicy);
+  }
+
+  @Override
+  public OMConfiguration getOMConfiguration() throws IOException {
+    try {
+      OMConfigurationResponse getConfigResponse = rpcProxy.getOMConfiguration(
+          NULL_RPC_CONTROLLER, OMConfigurationRequest.newBuilder().build());
+
+      OMConfiguration.Builder omMedatataBuilder = new OMConfiguration.Builder();
+      if (getConfigResponse.getSuccess()) {
+        if (getConfigResponse.getNodesInMemoryCount() > 0) {
+          for (OMNodeInfo omNodeInfo :
+              getConfigResponse.getNodesInMemoryList()) {
+            omMedatataBuilder.addToNodesInMemory(
+                OMNodeDetails.getFromProtobuf(omNodeInfo));
+          }
+        }
+        if (getConfigResponse.getNodesInNewConfCount() > 0) {
+          for (OMNodeInfo omNodeInfo :
+              getConfigResponse.getNodesInNewConfList()) {
+            omMedatataBuilder.addToNodesInNewConf(
+                OMNodeDetails.getFromProtobuf(omNodeInfo));
+          }
+        }
+      }
+      return omMedatataBuilder.build();
+    } catch (ServiceException e) {
+      LOG.error("Failed to retrieve configuration of OM {}",
+          remoteOmNodeDetails.getOMPrintInfo(), e);
+    }
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(rpcProxy);
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolPB.java
new file mode 100644
index 0000000..ff12da1
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolPB.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.protocolPB;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OzoneManagerAdminService;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used for communication between OMs.
+ */
+@ProtocolInfo(protocolName =
+    "org.apache.hadoop.ozone.om.protocol.OzoneManagerMetadataProtocol",
+    protocolVersion = 1)
+@KerberosInfo(
+    serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
+@InterfaceAudience.Private
+public interface OMAdminProtocolPB
+    extends OzoneManagerAdminService.BlockingInterface {
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index f114578..9bfae2a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.ozone;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.hadoop.hdds.ExitManager;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
@@ -28,7 +30,6 @@ import org.apache.hadoop.hdds.scm.ha.CheckedConsumer;
 import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.ha.ConfUtils;
@@ -64,7 +65,7 @@ import static org.apache.hadoop.ozone.om.OmUpgradeConfig.ConfigStrings.OZONE_OM_
  */
 public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class);
 
   private final OMHAService omhaService;
@@ -182,6 +183,23 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
     return this.scmhaService.getServiceByIndex(index);
   }
 
+  private OzoneManager getOMLeader(boolean waitForLeaderElection)
+      throws TimeoutException, InterruptedException {
+    if (waitForLeaderElection) {
+      final OzoneManager[] om = new OzoneManager[1];
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          om[0] = getOMLeader();
+          return om[0] != null;
+        }
+      }, 200, waitForClusterToBeReadyTimeout);
+      return om[0];
+    } else {
+      return getOMLeader();
+    }
+  }
+
   /**
    * Get OzoneManager leader object.
    * @return OzoneManager object, null if there isn't one or more than one
@@ -491,11 +509,6 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
             }
           }
 
-          // Set default OM address to point to the first OM. Clients would
-          // try connecting to this address by default
-          conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
-              NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
-
           break;
         } catch (BindException e) {
           for (OzoneManager om : omList) {
@@ -686,17 +699,33 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
   }
 
   /**
-   * Bootstrap new OM and add to existing OM HA service ring.
-   * @return new OM nodeId
+   * Bootstrap new OM by updating existing OM configs.
    */
   public void bootstrapOzoneManager(String omNodeId) throws Exception {
+    bootstrapOzoneManager(omNodeId, true, false);
+  }
 
-    int basePort;
-    int retryCount = 0;
+  /**
+   * Bootstrap new OM and add to existing OM HA service ring.
+   * @param omNodeId nodeId of new OM
+   * @param updateConfigs if true, update the old OM configs with new node
+   *                      information
+   * @param force if true, start new OM with FORCE_BOOTSTRAP option.
+   *              Otherwise, start new OM with BOOTSTRAP option.
+   */
+  public void bootstrapOzoneManager(String omNodeId,
+      boolean updateConfigs, boolean force) throws Exception {
 
+    // Set testReloadConfigFlag to true so that
+    // OzoneManager#reloadConfiguration does not reload config as it will
+    // return the default configurations.
+    OzoneManager.setTestReloadConfigFlag(true);
+
+    int retryCount = 0;
     OzoneManager om = null;
 
-    long leaderSnapshotIndex = getOMLeader().getRatisSnapshotIndex();
+    OzoneManager omLeader = getOMLeader(true);
+    long leaderSnapshotIndex = omLeader.getRatisSnapshotIndex();
 
     while (true) {
       try {
@@ -704,7 +733,11 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
         OzoneConfiguration newConf = addNewOMToConfig(getOMServiceId(),
             omNodeId, portSet);
 
-        om = bootstrapNewOM(omNodeId);
+        if (updateConfigs) {
+          updateOMConfigs(newConf);
+        }
+
+        om = bootstrapNewOM(omNodeId, newConf, force);
 
         LOG.info("Bootstrapped OzoneManager {} RPC server at {}", omNodeId,
             om.getOmRpcServerAddr());
@@ -712,7 +745,6 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
         // Add new OMs to cluster's in memory map and update existing OMs conf.
         setConf(newConf);
 
-        omhaService.addInstance(om, true);
         break;
       } catch (IOException e) {
         // Existing OM config could have been updated with new conf. Reset it.
@@ -731,7 +763,9 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
     }
 
     waitForBootstrappedNodeToBeReady(om, leaderSnapshotIndex);
-    waitForConfigUpdateOnAllOMs(omNodeId);
+    if (updateConfigs) {
+      waitForConfigUpdateOnActiveOMs(omNodeId);
+    }
   }
 
   /**
@@ -739,7 +773,8 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
    */
   private OzoneConfiguration addNewOMToConfig(String omServiceId,
       String omNodeId, List<Integer> portList) {
-    OzoneConfiguration newConf = getConf();
+
+    OzoneConfiguration newConf = new OzoneConfiguration(getConf());
     String omNodesKey = ConfUtils.addKeySuffixes(
         OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
     StringBuilder omNodesKeyValue = new StringBuilder();
@@ -766,34 +801,48 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
   }
 
   /**
-   * Start a new OM in Bootstrap mode. Configs for the new OM must already be
-   * set.
+   * Update the configurations of all the active OMs in the cluster.
+   */
+  public void updateOMConfigs(OzoneConfiguration newConf) {
+    for (OzoneManager om : omhaService.getActiveServices()) {
+      om.setConfiguration(newConf);
+    }
+  }
+
+  /**
+   * Start a new OM in Bootstrap mode. Configs (address and ports) for the new
+   * OM must already be set in the newConf.
    */
-  private OzoneManager bootstrapNewOM(String nodeId)
-      throws IOException, AuthenticationException {
-    OzoneConfiguration config = new OzoneConfiguration(getConf());
+  private OzoneManager bootstrapNewOM(String nodeId, OzoneConfiguration newConf,
+      boolean force) throws IOException, AuthenticationException {
+
+    OzoneConfiguration config = new OzoneConfiguration(newConf);
+
+    // For bootstrapping node, set the nodeId config also.
     config.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
-    // Set the OM rpc and http(s) address to null so that the cluster picks
-    // up the address set with service ID and node ID
-    config.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY, "");
-    config.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, "");
-    config.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, "");
 
     // Set metadata/DB dir base path
     String metaDirPath = clusterMetaPath + "/" + nodeId;
     config.set(OZONE_METADATA_DIRS, metaDirPath);
 
-    // Update existing OMs config
-    for (OzoneManager existingOM : omhaService.getServices()) {
-      existingOM.setConfiguration(config);
+    OzoneManager.omInit(config);
+    OzoneManager om;
+
+    if (force) {
+      om = OzoneManager.createOm(config,
+          OzoneManager.StartupOption.FORCE_BOOTSTRAP);
+    } else {
+      om = OzoneManager.createOm(config, OzoneManager.StartupOption.BOOTSTRAP);
     }
 
-    OzoneManager.omInit(config);
-    OzoneManager om = OzoneManager.createOm(config,
-        OzoneManager.StartupOption.BOOTSTRAP);
+    ExitManagerForOM exitManager = new ExitManagerForOM(this, nodeId);
+    om.setExitManagerForTesting(exitManager);
+    omhaService.addInstance(om, false);
+
     om.start();
-    return om;
+    omhaService.activate(om);
 
+    return om;
   }
 
   /**
@@ -814,7 +863,7 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
     }, 1000, waitForClusterToBeReadyTimeout);
   }
 
-  private void waitForConfigUpdateOnAllOMs(String newOMNodeId)
+  private void waitForConfigUpdateOnActiveOMs(String newOMNodeId)
       throws Exception {
     OzoneManager newOMNode = omhaService.getServiceById(newOMNodeId);
     OzoneManagerRatisServer newOMRatisServer = newOMNode.getOmRatisServer();
@@ -837,6 +886,12 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
     }, 1000, waitForClusterToBeReadyTimeout);
   }
 
+  public void setupExitManagerForTesting() {
+    for (OzoneManager om : omhaService.getServices()) {
+      om.setExitManagerForTesting(new ExitManagerForOM(this, om.getOMNodeId()));
+    }
+  }
+
   /**
    * MiniOzoneHAService is a helper class used for both SCM and OM HA.
    * This class keeps track of active and inactive OM/SCM services
@@ -973,4 +1028,31 @@ public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
         .map(inetSocketAddress -> inetSocketAddress.getPort())
         .collect(Collectors.toList());
   }
+
+  private static final class ExitManagerForOM extends ExitManager {
+
+    private MiniOzoneHAClusterImpl cluster;
+    private String omNodeId;
+
+    private ExitManagerForOM(MiniOzoneHAClusterImpl cluster, String nodeId) {
+      this.cluster = cluster;
+      this.omNodeId = nodeId;
+    }
+
+    @Override
+    public void exitSystem(int status, String message, Throwable throwable,
+        Logger log) throws IOException {
+      LOG.error(omNodeId + " - System Exit: " + message, throwable);
+      cluster.stopOzoneManager(omNodeId);
+      throw new IOException(throwable);
+    }
+
+    @Override
+    public void exitSystem(int status, String message, Logger log)
+        throws IOException {
+      LOG.error(omNodeId + " - System Exit: " + message);
+      cluster.stopOzoneManager(omNodeId);
+      throw new IOException(message);
+    }
+  }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerBootstrap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerBootstrap.java
index a71b2ea..115c9b3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerBootstrap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerBootstrap.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.ozone.om;
 
+import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.FileFilter;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -29,18 +31,23 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.grpc.server.GrpcLogAppender;
+import org.apache.ratis.server.leader.FollowerInfo;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.Timeout;
+import org.slf4j.event.Level;
 
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_DUMMY_SERVICE_ID;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT;
@@ -63,8 +70,6 @@ public class TestOzoneManagerBootstrap {
   private final String clusterId = UUID.randomUUID().toString();
   private final String scmId = UUID.randomUUID().toString();
 
-  private static final int NUM_INITIAL_OMS = 3;
-
   private static final String OM_SERVICE_ID = "om-bootstrap";
   private static final String VOLUME_NAME;
   private static final String BUCKET_NAME;
@@ -76,12 +81,9 @@ public class TestOzoneManagerBootstrap {
     BUCKET_NAME = "bucket" + RandomStringUtils.randomNumeric(5);
   }
 
-  private void setupCluster() throws Exception {
-    setupCluster(NUM_INITIAL_OMS);
-  }
-
   private void setupCluster(int numInitialOMs) throws Exception {
     conf = new OzoneConfiguration();
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 5);
     cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
         .setClusterId(clusterId)
         .setScmId(scmId)
@@ -161,36 +163,20 @@ public class TestOzoneManagerBootstrap {
   }
 
   /**
-   * Add 1 new OM to cluster.
-   * @throws Exception
-   */
-  @Test
-  public void testBootstrapOneNewOM() throws Exception {
-    setupCluster();
-    testBootstrapOMs(1);
-  }
-
-  /**
-   * Add 2 new OMs to cluster.
-   * @throws Exception
-   */
-  @Test
-  public void testBootstrapTwoNewOMs() throws Exception {
-    setupCluster();
-    testBootstrapOMs(2);
-  }
-
-  /**
-   * Add 2 new OMs to a 1 node OM cluster. Verify that one of the new OMs
-   * must becomes the leader by stopping the old OM.
+   * 1. Add 2 new OMs to an existing 1 node OM cluster.
+   * 2. Verify that one of the new OMs becomes the leader by stopping
+   * the old OM.
    */
   @Test
-  public void testLeaderChangeToNewOM() throws Exception {
+  public void testBootstrap() throws Exception {
     setupCluster(1);
     OzoneManager oldOM = cluster.getOzoneManager();
+
+    // 1. Add 2 new OMs to an existing 1 node OM cluster.
     List<String> newOMNodeIds = testBootstrapOMs(2);
 
-    // Stop old OM
+    // 2. Verify that one of the new OMs becomes the leader by stopping the
+    // old OM.
     cluster.stopOzoneManager(oldOM.getOMNodeId());
 
     // Wait for Leader Election timeout
@@ -205,12 +191,146 @@ public class TestOzoneManagerBootstrap {
         "other OMs are down", newOMNodeIds.contains(omLeader.getOMNodeId()));
 
     // Perform some read and write operations with new OM leader
-    objectStore = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, conf)
-        .getObjectStore();
+    objectStore = OzoneClientFactory.getRpcClient(OM_SERVICE_ID,
+        cluster.getConf()).getObjectStore();
+
     OzoneVolume volume = objectStore.getVolume(VOLUME_NAME);
     OzoneBucket bucket = volume.getBucket(BUCKET_NAME);
     String key = createKey(bucket);
 
     Assert.assertNotNull(bucket.getKey(key));
   }
+
+  /**
+   * Tests the following scenarios:
+   * 1. Bootstrap without updating config on any existing OM -> fail
+   * 2. Force bootstrap without updating config on any OM -> fail
+   */
+  @Test
+  public void testBootstrapWithoutConfigUpdate() throws Exception {
+    // Setup 1 node cluster
+    setupCluster(1);
+    cluster.setupExitManagerForTesting();
+    OzoneManager existingOM = cluster.getOzoneManager(0);
+    String existingOMNodeId = existingOM.getOMNodeId();
+
+    GenericTestUtils.LogCapturer omLog =
+        GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
+    GenericTestUtils.LogCapturer miniOzoneClusterLog =
+        GenericTestUtils.LogCapturer.captureLogs(MiniOzoneHAClusterImpl.LOG);
+
+    /***************************************************************************
+     * 1. Bootstrap without updating config on any existing OM -> fail
+     **************************************************************************/
+
+    // Bootstrap a new node without updating the configs on existing OMs.
+    // This should result in the bootstrap failing.
+    String newNodeId = "omNode-bootstrap-1";
+    try {
+      cluster.bootstrapOzoneManager(newNodeId, false, false);
+      Assert.fail("Bootstrap should have failed as configs are not updated on" +
+          " all OMs.");
+    } catch (Exception e) {
+      Assert.assertEquals(OmUtils.getOMAddressListPrintString(
+          Lists.newArrayList(existingOM.getNodeDetails())) + " do not have or" +
+          " have incorrect information of the bootstrapping OM. Update their " +
+          "ozone-site.xml before proceeding.", e.getMessage());
+      Assert.assertTrue(omLog.getOutput().contains("Remote OM config check " +
+          "failed on OM " + existingOMNodeId));
+      Assert.assertTrue(miniOzoneClusterLog.getOutput().contains(newNodeId +
+          " - System Exit"));
+    }
+
+    /***************************************************************************
+     * 2. Force bootstrap without updating config on any OM -> fail
+     **************************************************************************/
+
+    // Force Bootstrap a new node without updating the configs on existing OMs.
+    // This should avoid the bootstrap check but the bootstrap should fail
+    // eventually as the SetConfiguration request cannot succeed.
+
+    miniOzoneClusterLog.clearOutput();
+    omLog.clearOutput();
+
+    newNodeId = "omNode-bootstrap-2";
+    try {
+      cluster.bootstrapOzoneManager(newNodeId, false, true);
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertTrue(omLog.getOutput().contains("Couldn't add OM " +
+          newNodeId + " to peer list."));
+      Assert.assertTrue(miniOzoneClusterLog.getOutput().contains(
+          existingOMNodeId + " - System Exit: There is no OM configuration " +
+              "for node ID " + newNodeId + " in ozone-site.xml."));
+
+      // Verify that the existing OM has stopped.
+      Assert.assertFalse(cluster.getOzoneManager(existingOMNodeId).isRunning());
+    }
+  }
+
+  /**
+   * Tests the following scenarios:
+   * 1. Stop 1 OM and update configs on rest, bootstrap new node -> fail
+   * 2. Force bootstrap (with 1 node down and updated configs on rest) -> pass
+   */
+  @Test
+  public void testForceBootstrap() throws Exception {
+    GenericTestUtils.setLogLevel(GrpcLogAppender.LOG, Level.ERROR);
+    GenericTestUtils.setLogLevel(FollowerInfo.LOG, Level.ERROR);
+    // Setup a 3 node cluster and stop 1 OM.
+    setupCluster(3);
+    OzoneManager downOM = cluster.getOzoneManager(2);
+    String downOMNodeId = downOM.getOMNodeId();
+    cluster.stopOzoneManager(downOMNodeId);
+
+    // Set smaller values for OM Admin protocol retry attempts and wait time
+    OzoneConfiguration config = cluster.getConf();
+    config.setInt(OMConfigKeys.OZONE_OM_ADMIN_PROTOCOL_MAX_RETRIES_KEY, 2);
+    config.setInt(
+        OMConfigKeys.OZONE_OM_ADMIN_PROTOCOL_WAIT_BETWEEN_RETRIES_KEY, 100);
+    cluster.setConf(config);
+
+    GenericTestUtils.LogCapturer omLog =
+        GenericTestUtils.LogCapturer.captureLogs(OzoneManager.LOG);
+    GenericTestUtils.LogCapturer miniOzoneClusterLog =
+        GenericTestUtils.LogCapturer.captureLogs(MiniOzoneHAClusterImpl.LOG);
+
+    /***************************************************************************
+     * 1. Stop 1 OM and update configs on rest, bootstrap new node -> fail
+     **************************************************************************/
+
+    // Update configs on all active OMs and Bootstrap a new node
+    String newNodeId = "omNode-bootstrap-1";
+    try {
+      cluster.bootstrapOzoneManager(newNodeId, true, false);
+      Assert.fail("Bootstrap should have failed as configs are not updated on" +
+          " all OMs.");
+    } catch (IOException e) {
+      Assert.assertEquals(OmUtils.getOMAddressListPrintString(
+          Lists.newArrayList(downOM.getNodeDetails())) + " do not have or " +
+          "have incorrect information of the bootstrapping OM. Update their " +
+          "ozone-site.xml before proceeding.", e.getMessage());
+      Assert.assertTrue(omLog.getOutput().contains("Remote OM " + downOMNodeId +
+          " configuration returned null"));
+      Assert.assertTrue(omLog.getOutput().contains("Remote OM config check " +
+          "failed on OM " + downOMNodeId));
+      Assert.assertTrue(miniOzoneClusterLog.getOutput().contains(newNodeId +
+          " - System Exit"));
+    }
+
+    /***************************************************************************
+     * 2. Force bootstrap (with 1 node down and updated configs on rest) -> pass
+     **************************************************************************/
+
+    miniOzoneClusterLog.clearOutput();
+    omLog.clearOutput();
+
+    // Update configs on all active OMs and Force Bootstrap a new node
+    newNodeId = "omNode-bootstrap-2";
+    cluster.bootstrapOzoneManager(newNodeId, true, true);
+    OzoneManager newOM = cluster.getOzoneManager(newNodeId);
+
+    // Verify that the newly bootstrapped OM is running
+    Assert.assertTrue(newOM.isRunning());
+  }
 }
diff --git a/hadoop-ozone/interface-client/src/main/proto/OMAdminProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OMAdminProtocol.proto
new file mode 100644
index 0000000..bfba946
--- /dev/null
+++ b/hadoop-ozone/interface-client/src/main/proto/OMAdminProtocol.proto
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *unstable* .proto interface.
+ */
+
+syntax = "proto2";
+option java_package = "org.apache.hadoop.ozone.protocol.proto";
+option java_outer_classname = "OzoneManagerAdminProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.ozone;
+
+/**
+This file contains the admin protocol for Ozone Manager(s). This involves
+getting the meta information about an individual OM.
+*/
+
+message OMConfigurationRequest {
+}
+
+message OMConfigurationResponse {
+    required bool success = 1;
+    optional string errorMsg = 2;
+    // OM nodes present in OM's memory
+    repeated OMNodeInfo nodesInMemory = 3;
+    // OM nodes reloaded from new config on disk
+    repeated OMNodeInfo nodesInNewConf = 4;
+
+}
+
+message OMNodeInfo {
+    required string nodeID = 1;
+    required string hostAddress = 2;
+    required uint32 rpcPort = 3;
+    required uint32 ratisPort = 4;
+}
+
+/**
+ The service for OM admin operations.
+*/
+service OzoneManagerAdminService {
+    // RPC request to OM to return its configuration - in memory OM nodes list
+    // and the anticipated nodes list from the config files (upon reloading).
+    rpc getOMConfiguration(OMConfigurationRequest)
+    returns(OMConfigurationResponse);
+}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPolicyProvider.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPolicyProvider.java
index f8f265e..36fe2c1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPolicyProvider.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMPolicyProvider.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience.Private;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.annotation.InterfaceStability.Unstable;
 import org.apache.hadoop.ozone.om.protocol.OMInterServiceProtocol;
+import org.apache.hadoop.ozone.om.protocol.OMAdminProtocol;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
@@ -59,7 +60,9 @@ public final class OMPolicyProvider extends PolicyProvider {
           new Service(OZONE_OM_SECURITY_CLIENT_PROTOCOL_ACL,
               OzoneManagerProtocol.class),
           new Service(OZONE_OM_SECURITY_ADMIN_PROTOCOL_ACL,
-              OMInterServiceProtocol.class)
+              OMInterServiceProtocol.class),
+          new Service(OZONE_OM_SECURITY_ADMIN_PROTOCOL_ACL,
+              OMAdminProtocol.class)
       };
 
   @SuppressFBWarnings("EI_EXPOSE_REP")
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
index bafeec3..cc12ce3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStarterInterface.java
@@ -30,7 +30,7 @@ public interface OMStarterInterface {
       AuthenticationException;
   boolean init(OzoneConfiguration conf) throws IOException,
       AuthenticationException;
-  void bootstrap(OzoneConfiguration conf) throws IOException,
+  void bootstrap(OzoneConfiguration conf, boolean force) throws IOException,
       AuthenticationException;
   void startAndCancelPrepare(OzoneConfiguration conf) throws IOException,
       AuthenticationException;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 37cf06c..32396c8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -98,7 +98,6 @@ import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.audit.AuditAction;
 import org.apache.hadoop.ozone.audit.AuditEventStatus;
@@ -126,9 +125,12 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.om.protocol.OMInterServiceProtocol;
+import org.apache.hadoop.ozone.om.protocol.OMConfiguration;
 import org.apache.hadoop.ozone.om.protocolPB.OMInterServiceProtocolClientSideImpl;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.protocolPB.OMInterServiceProtocolPB;
+import org.apache.hadoop.ozone.om.protocolPB.OMAdminProtocolClientSideImpl;
+import org.apache.hadoop.ozone.om.protocolPB.OMAdminProtocolPB;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.ozone.common.ha.ratis.RatisSnapshotInfo;
 import org.apache.hadoop.hdds.security.OzoneSecurityException;
@@ -139,6 +141,7 @@ import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.om.upgrade.OMUpgradeFinalizer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OzoneManagerAdminService;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
@@ -146,6 +149,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRoleI
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3Authentication;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
 import org.apache.hadoop.ozone.protocolPB.OMInterServiceProtocolServerSideImpl;
+import org.apache.hadoop.ozone.protocolPB.OMAdminProtocolServerSideImpl;
 import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
@@ -282,7 +286,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private CertificateClient certClient;
   private String caCertPem = null;
   private List<String> caCertPemList = new ArrayList<>();
-  private static boolean testSecureOmFlag = false;
   private final Text omRpcAddressTxt;
   private OzoneConfiguration configuration;
   private RPC.Server omRpcServer;
@@ -363,12 +366,20 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   private OzoneManagerPrepareState prepareState;
 
+  private boolean isBootstrapping = false;
+  private boolean isForcedBootstrapping = false;
+
+  // Test flags
+  private static boolean testReloadConfigFlag = false;
+  private static boolean testSecureOmFlag = false;
+
   /**
    * OM Startup mode.
    */
   public enum StartupOption {
     REGUALR,
-    BOOTSTRAP
+    BOOTSTRAP,
+    FORCE_BOOTSTRAP
   }
 
   private enum State {
@@ -378,8 +389,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     STOPPED
   }
 
-  private boolean isBootstrapping = false;
-
   // Used in MiniOzoneCluster testing
   private State omState;
   private Thread emptier;
@@ -413,7 +422,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     // In case of single OM Node Service there will be no OM Node ID
     // specified, set it to value from om storage
     if (this.omNodeDetails.getNodeId() == null) {
-      this.omNodeDetails = OMHANodeDetails.getOMNodeDetails(conf,
+      this.omNodeDetails = OMHANodeDetails.getOMNodeDetailsForNonHA(conf,
           omNodeDetails.getServiceId(),
           omStorage.getOmId(), omNodeDetails.getRpcAddress(),
           omNodeDetails.getRatisPort());
@@ -522,12 +531,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
     if (startupOption == StartupOption.BOOTSTRAP) {
       isBootstrapping = true;
+    } else if (startupOption == StartupOption.FORCE_BOOTSTRAP) {
+      isForcedBootstrapping = true;
     }
 
     this.omRatisSnapshotInfo = new RatisSnapshotInfo();
 
     initializeRatisDirs(conf);
-    initializeRatisServer(isBootstrapping);
+    initializeRatisServer(isBootstrapping || isForcedBootstrapping);
 
     metrics = OMMetrics.create();
     omClientProtocolMetrics = ProtocolMessageMetrics
@@ -544,7 +555,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     };
     ShutdownHookManager.get().addShutdownHook(shutdownHook,
         SHUTDOWN_HOOK_PRIORITY);
-    omState = State.INITIALIZED;
+
+    if (isBootstrapping || isForcedBootstrapping) {
+      omState = State.BOOTSTRAPPING;
+    } else {
+      omState = State.INITIALIZED;
+    }
   }
 
   /**
@@ -773,6 +789,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  public void shutdown(String errorMsg) throws IOException {
+    if (omState != State.STOPPED) {
+      stop();
+      exitManager.exitSystem(1, errorMsg, LOG);
+    }
+  }
+
   /**
    * Class which schedule saving metrics to a file.
    */
@@ -1010,8 +1033,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         OzoneManagerInterService.newReflectiveBlockingService(
             omInterServerProtocol);
 
+    OMAdminProtocolServerSideImpl omMetadataServerProtocol =
+        new OMAdminProtocolServerSideImpl(this);
+    BlockingService omAdminService =
+        OzoneManagerAdminService.newReflectiveBlockingService(
+            omMetadataServerProtocol);
+
     return startRpcServer(configuration, omNodeRpcAddr, omService,
-        omInterService, handlerCount);
+        omInterService, omAdminService, handlerCount);
   }
 
   /**
@@ -1028,7 +1057,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   private RPC.Server startRpcServer(OzoneConfiguration conf,
       InetSocketAddress addr, BlockingService clientProtocolService,
-      BlockingService interOMProtocolService, int handlerCount)
+      BlockingService interOMProtocolService,
+      BlockingService omMetadataProtocolService,
+      int handlerCount)
       throws IOException {
     RPC.Server rpcServer = new RPC.Builder(conf)
         .setProtocol(OzoneManagerProtocolPB.class)
@@ -1042,6 +1073,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
     HddsServerUtil.addPBProtocol(conf, OMInterServiceProtocolPB.class,
         interOMProtocolService, rpcServer);
+    HddsServerUtil.addPBProtocol(conf, OMAdminProtocolPB.class,
+        omMetadataProtocolService, rpcServer);
 
     if (conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
         false)) {
@@ -1317,6 +1350,16 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public void start() throws IOException {
     initFSOLayout();
 
+    if (omState == State.BOOTSTRAPPING) {
+      if (isBootstrapping) {
+        // Check that all OM configs have been updated with the new OM info.
+        checkConfigBeforeBootstrap();
+      } else if (isForcedBootstrapping) {
+        LOG.warn("Skipped checking whether existing OM configs have been " +
+            "updated with this OM information as force bootstrap is called.");
+      }
+    }
+
     omClientProtocolMetrics.register();
     HddsServerUtil.initializeMetrics(configuration, "OzoneManager");
 
@@ -1395,8 +1438,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     startJVMPauseMonitor();
     setStartTime();
 
-    if (isBootstrapping) {
-      omState = State.BOOTSTRAPPING;
+    if (omState == State.BOOTSTRAPPING) {
       bootstrap(omNodeDetails);
     }
 
@@ -1470,6 +1512,65 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     omState = State.RUNNING;
   }
 
+  private void checkConfigBeforeBootstrap() throws IOException {
+    List<OMNodeDetails> omsWihtoutNewConfig = new ArrayList<>();
+    for (Map.Entry<String, OMNodeDetails> entry : peerNodesMap.entrySet()) {
+      String remoteNodeId = entry.getKey();
+      OMNodeDetails remoteNodeDetails = entry.getValue();
+      try (OMAdminProtocolClientSideImpl omMetadataProtocolClient =
+               new OMAdminProtocolClientSideImpl(configuration,
+                   getRemoteUser(), entry.getValue())) {
+
+        OMConfiguration remoteOMConfiguration =
+            omMetadataProtocolClient.getOMConfiguration();
+        checkRemoteOMConfig(remoteNodeId, remoteOMConfiguration);
+      } catch (IOException ioe) {
+        LOG.error("Remote OM config check failed on OM {}", remoteNodeId, ioe);
+        omsWihtoutNewConfig.add(remoteNodeDetails);
+      }
+    }
+    if (!omsWihtoutNewConfig.isEmpty()) {
+      String errorMsg = OmUtils.getOMAddressListPrintString(omsWihtoutNewConfig)
+          + " do not have or have incorrect information of the bootstrapping " +
+          "OM. Update their ozone-site.xml before proceeding.";
+      exitManager.exitSystem(1, errorMsg, LOG);
+    }
+  }
+
+  /**
+   * Verify that the remote OM configuration is updated for the bootstrapping
+   * OM.
+   */
+  private void checkRemoteOMConfig(String remoteNodeId,
+      OMConfiguration remoteOMConfig) throws IOException {
+    if (remoteOMConfig == null) {
+      throw new IOException("Remote OM " + remoteNodeId + " configuration " +
+          "returned null");
+    }
+
+    if (remoteOMConfig.getCurrentPeerList().contains(this.getOMNodeId())) {
+      throw new IOException("Remote OM " + remoteNodeId + " already contains " +
+          "bootstrapping OM(" + getOMNodeId() + ") as part of its Raft group " +
+          "peers.");
+    }
+
+    OMNodeDetails omNodeDetailsInRemoteConfig = remoteOMConfig
+        .getOmNodesInNewConf().get(getOMNodeId());
+    if (omNodeDetailsInRemoteConfig == null) {
+      throw new IOException("Remote OM " + remoteNodeId + " does not have the" +
+          " bootstrapping OM(" + getOMNodeId() + ") information on reloading " +
+          "configs or it could not resolve the address.");
+    }
+
+    if (!omNodeDetailsInRemoteConfig.getRpcAddress().equals(
+        this.omNodeDetails.getRpcAddress())) {
+      throw new IOException("Remote OM " + remoteNodeId + " configuration has" +
+          " bootstrapping OM(" + getOMNodeId() + ") address as " +
+          omNodeDetailsInRemoteConfig.getRpcAddress() + " where the " +
+          "bootstrapping OM address is " + omNodeDetails.getRpcAddress());
+    }
+  }
+
   @Override
   public void bootstrap(OMNodeDetails newOMNode) throws IOException {
     // Create InterOmServiceProtocol client to send request to other OMs
@@ -1482,6 +1583,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
         LOG.info("Successfully bootstrapped OM {} and joined the Ratis group " +
             "{}", getOMNodeId(), omRatisServer.getRaftGroup());
+      } catch (Exception e) {
+        LOG.error("Failed to Bootstrap OM.");
+        throw e;
       }
     } else {
       throw new IOException("OzoneManager can be bootstrapped only when ratis" +
@@ -1500,7 +1604,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       // Check if the OM NodeID is already present in the peer list or its
       // the local NodeID.
       if (!peerNodesMap.containsKey(omNodeId) && !isCurrentNode(omNodeId)) {
-        addOMNodeToPeers(omNodeId);
+        try {
+          addOMNodeToPeers(omNodeId);
+        } catch (IOException e) {
+          LOG.error("Fatal Error: Shutting down the system as otherwise it " +
+              "could lead to OM state divergence.", e);
+          exitManager.forceExit(1, e, LOG);
+        }
       } else {
         // Check if the OMNodeID is present in the RatisServer's peer list
         if (!ratisServerPeerIdsList.contains(omNodeId)) {
@@ -1534,21 +1644,28 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    * after a SetConfiguration request has been successfully executed by the
    * Ratis server.
    */
-  public void addOMNodeToPeers(String newOMNodeId) {
-    OMNodeDetails newOMNodeDetails = OMNodeDetails.getOMNodeDetailsFromConf(
-        getConfiguration(), getOMServiceId(), newOMNodeId);
-    if (newOMNodeDetails == null) {
-      // Load new configuration object to read in new peer information
-      setConfiguration(new OzoneConfiguration());
+  private void addOMNodeToPeers(String newOMNodeId) throws IOException {
+    OMNodeDetails newOMNodeDetails = null;
+    try {
       newOMNodeDetails = OMNodeDetails.getOMNodeDetailsFromConf(
           getConfiguration(), getOMServiceId(), newOMNodeId);
-
       if (newOMNodeDetails == null) {
-        // If new node information is not present in the newly loaded
-        // configuration also, throw an exception
-        throw new OzoneIllegalArgumentException("There is no OM configuration "
-            + "for node ID " + newOMNodeId + " in ozone-site.xml.");
+        // Load new configuration object to read in new peer information
+        setConfiguration(reloadConfiguration());
+        newOMNodeDetails = OMNodeDetails.getOMNodeDetailsFromConf(
+            getConfiguration(), getOMServiceId(), newOMNodeId);
+
+        if (newOMNodeDetails == null) {
+          // If new node information is not present in the newly loaded
+          // configuration also, throw an exception
+          throw new IOException("There is no OM configuration for node ID "
+              + newOMNodeId + " in ozone-site.xml.");
+        }
       }
+    } catch (IOException e) {
+      LOG.error("{}: Couldn't add OM {} to peer list.", getOMNodeId(),
+          newOMNodeId);
+      exitManager.exitSystem(1, e.getLocalizedMessage(), e, LOG);
     }
 
     if (omSnapshotProvider == null) {
@@ -1579,6 +1696,26 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   /**
+   * Return list of all current OM peers plus this OM (does not reload
+   * configuration from disk to find newly configured OMs). The peers are
+   * copied so that appending this OM cannot alter the getPeerNodes() result.
+   */
+  public List<OMNodeDetails> getAllOMNodesInMemory() {
+    List<OMNodeDetails> allNodes = new java.util.ArrayList<>(getPeerNodes());
+    allNodes.add(omNodeDetails);
+    return allNodes;
+  }
+
+  /**
+   * Obtain a configuration through reloadConfiguration() and return all the
+   * OM nodes present in that conf under the current serviceId.
+   */
+  public List<OMNodeDetails> getAllOMNodesInNewConf() {
+    return OmUtils.getAllOMAddresses(reloadConfiguration(), getOMServiceId(),
+        getOMNodeId());
+  }
+
+  /**
    * Starts a Trash Emptier thread that does an fs.trashRoots and performs
    * checkpointing & deletion.
    * @param conf
@@ -1695,7 +1832,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    * Stop service.
    */
   public void stop() {
-    LOG.info("Stopping Ozone Manager");
+    LOG.info("{}: Stopping Ozone Manager", omNodeDetails.getOMPrintInfo());
     try {
       omState = State.STOPPED;
       // Cancel the metrics timer and set to null.
@@ -3146,10 +3283,26 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     this.configuration = conf;
   }
 
+  /**
+   * Returns the current in-memory configuration when the static test flag
+   * is set; otherwise returns a newly constructed OzoneConfiguration.
+   */
+  public OzoneConfiguration reloadConfiguration() {
+    return testReloadConfigFlag ? configuration : new OzoneConfiguration();
+  }
+
+  public static void setTestReloadConfigFlag(boolean testReloadConfigFlag) {
+    OzoneManager.testReloadConfigFlag = testReloadConfigFlag;
+  }
+
   public static void setTestSecureOmFlag(boolean testSecureOmFlag) {
     OzoneManager.testSecureOmFlag = testSecureOmFlag;
   }
 
+  public OMNodeDetails getNodeDetails() {
+    return omNodeDetails;
+  }
+
   public String getOMNodeId() {
     return omNodeDetails.getNodeId();
   }
@@ -3383,11 +3536,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   @VisibleForTesting
-  void setExitManagerForTesting(ExitManager exitManagerForTesting) {
-    this.exitManager = exitManagerForTesting;
+  public void setExitManagerForTesting(ExitManager exitManagerForTesting) {
+    exitManager = exitManagerForTesting;
   }
 
-
   public boolean getEnableFileSystemPaths() {
     return configuration.getBoolean(OZONE_OM_ENABLE_FILESYSTEM_PATHS,
         OZONE_OM_ENABLE_FILESYSTEM_PATHS_DEFAULT);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
index e087fdb..b945e88 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerStarter.java
@@ -125,22 +125,38 @@ public class OzoneManagerStarter extends GenericCli {
 
   /**
    * This function implements a sub-command to allow the OM to be bootstrapped
-   * initialized from the command line. After OM is initialized, it will
-   * contact the leader OM to add itself to the ring. Once the leader OM
-   * responds back affirmatively, bootstrap step is complete and the OM is
-   * functional.
+   * from the command line.
+   *
+   * During initialization, OM will get the metadata information from all the
+   * other OMs to check whether their on disk configs have been updated with
+   * this new OM information. If not, the bootstrap step will fail. This
+   * check is skipped with the --force option.
+   * Note that if an OM does not have updated configs, it can crash when a
+   * force bootstrap is initiated. The force option is provided for the
+   * scenario where one of the old OMs is down or not responding and the
+   * bootstrap needs to continue.
+   *
+   * Bootstrapping OM will request the leader OM to add itself to the ring.
+   * Once the leader OM responds back affirmatively, bootstrap step is
+   * complete and the OM is functional.
    */
   @CommandLine.Command(name = "--bootstrap",
-      customSynopsis = "ozone om [global options] --bootstrap",
+      customSynopsis = "ozone om [global options] --bootstrap [options]",
       hidden = false,
-      description = "Initialize if not already initialized and Bootstrap " +
-          "the Ozone Manager",
+      description = "Initializes and Bootstraps the Ozone Manager.",
       mixinStandardHelpOptions = true,
       versionProvider = HddsVersionProvider.class)
-  public void bootstrapOM()
+  public void bootstrapOM(@CommandLine.Option(names = {"--force"},
+      description = "This option will skip checking whether existing OMs " +
+          "configs have been updated with the new OM information. Force " +
+          "bootstrap can cause an existing OM to crash if it does not have " +
+          "updated configs. It should only be used if an existing OM is down " +
+          "or not responding and after manually checking that the ozone-site" +
+          ".xml config is updated on all OMs.",
+      defaultValue = "false") boolean force)
       throws Exception {
     commonInit();
-    receiver.bootstrap(conf);
+    receiver.bootstrap(conf, force);
   }
 
   /**
@@ -186,16 +202,21 @@ public class OzoneManagerStarter extends GenericCli {
     }
 
     @Override
-    public void bootstrap(OzoneConfiguration conf)
+    public void bootstrap(OzoneConfiguration conf, boolean force)
         throws IOException, AuthenticationException {
       // Initialize the Ozone Manager, if not already initialized.
       boolean initialize = OzoneManager.omInit(conf);
       if (!initialize) {
         throw new IOException("OM Init failed.");
       }
+      OzoneManager.StartupOption startupOption;
+      if (force) {
+        startupOption = OzoneManager.StartupOption.FORCE_BOOTSTRAP;
+      } else {
+        startupOption = OzoneManager.StartupOption.BOOTSTRAP;
+      }
       // Bootstrap the OM
-      OzoneManager om = OzoneManager.createOm(conf,
-          OzoneManager.StartupOption.BOOTSTRAP);
+      OzoneManager om = OzoneManager.createOm(conf, startupOption);
       om.start();
       om.join();
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMHANodeDetails.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMHANodeDetails.java
index 63fdf2b..41f33f1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMHANodeDetails.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMHANodeDetails.java
@@ -183,8 +183,6 @@ public class OMHANodeDetails {
         } else {
           // This OMNode belongs to same OM service as the current OMNode.
           // Add it to peerNodes list.
-          // This OMNode belongs to same OM service as the current OMNode.
-          // Add it to peerNodes list.
           peerNodesList.add(getHAOMNodeDetails(conf, serviceId,
               nodeId, addr, ratisPort));
         }
@@ -219,7 +217,7 @@ public class OMHANodeDetails {
       LOG.info("Configuration does not have {} set. Falling back to the " +
           "default OM address {}", OZONE_OM_ADDRESS_KEY, omAddress);
 
-      return new OMHANodeDetails(getOMNodeDetails(conf, null,
+      return new OMHANodeDetails(getOMNodeDetailsForNonHA(conf, null,
           null, omAddress, ratisPort), new ArrayList<>());
 
     } else {
@@ -237,7 +235,7 @@ public class OMHANodeDetails {
    * @param ratisPort - Ratis port of the OM.
    * @return OMNodeDetails
    */
-  public static OMNodeDetails getOMNodeDetails(OzoneConfiguration conf,
+  public static OMNodeDetails getOMNodeDetailsForNonHA(OzoneConfiguration conf,
       String serviceId, String nodeId, InetSocketAddress rpcAddress,
       int ratisPort) {
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 56b36f0..918d99b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -318,21 +318,15 @@ public final class OzoneManagerRatisServer {
     SetConfigurationRequest request = new SetConfigurationRequest(clientId,
         server.getId(), raftGroupId, nextCallId(), newPeersList);
 
-    try {
-      RaftClientReply raftClientReply = server.setConfiguration(request);
-      if (raftClientReply.isSuccess()) {
-        LOG.info("Added OM {} to Ratis group {}.", newOMNodeId, raftGroupId);
-      } else {
-        LOG.error("Failed to add OM {} to Ratis group {}. Ratis " +
-                "SetConfiguration reply: {}", newOMNodeId, raftGroupId,
-            raftClientReply);
-        throw new IOException("Failed to add OM " + newOMNodeId + " to Ratis " +
-            "ring.");
-      }
-    } catch (IOException e) {
-      LOG.error("Failed to update Ratis configuration and add OM {} to " +
-          "Ratis group {}", newOMNodeId, raftGroupId, e);
-      throw e;
+    RaftClientReply raftClientReply = server.setConfiguration(request);
+    if (raftClientReply.isSuccess()) {
+      LOG.info("Added OM {} to Ratis group {}.", newOMNodeId, raftGroupId);
+    } else {
+      LOG.error("Failed to add OM {} to Ratis group {}. Ratis " +
+              "SetConfiguration reply: {}", newOMNodeId, raftGroupId,
+          raftClientReply);
+      throw new IOException("Failed to add OM " + newOMNodeId + " to Ratis " +
+          "ring.");
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OMAdminProtocolServerSideImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OMAdminProtocolServerSideImpl.java
new file mode 100644
index 0000000..efc9f0b
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OMAdminProtocolServerSideImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
+import org.apache.hadoop.ozone.om.protocolPB.OMAdminProtocolPB;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OMConfigurationRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OMConfigurationResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OMNodeInfo;
+
+/**
+ * Server-side translator for {@link OMAdminProtocolPB}: answers protobuf
+ * requests using the state of the wrapped {@link OzoneManager}.
+ */
+public class OMAdminProtocolServerSideImpl implements OMAdminProtocolPB {
+
+  private final OzoneManager ozoneManager;
+
+  public OMAdminProtocolServerSideImpl(OzoneManager om) {
+    this.ozoneManager = om;
+  }
+
+  /**
+   * Builds the response for a configuration query: the OM nodes this OM
+   * reports as in memory, followed by the OM nodes it reports for the new
+   * configuration. Both lists are converted to their protobuf form and the
+   * success flag is always set.
+   */
+  @Override
+  public OMConfigurationResponse getOMConfiguration(RpcController controller,
+      OMConfigurationRequest request) throws ServiceException {
+    return OMConfigurationResponse.newBuilder()
+        .setSuccess(true)
+        .addAllNodesInMemory(toProto(ozoneManager.getAllOMNodesInMemory()))
+        .addAllNodesInNewConf(toProto(ozoneManager.getAllOMNodesInNewConf()))
+        .build();
+  }
+
+  /** Converts each node in the given list to its protobuf representation. */
+  private static List<OMNodeInfo> toProto(List<OMNodeDetails> nodes) {
+    List<OMNodeInfo> protos = new ArrayList<>(nodes.size());
+    for (OMNodeDetails node : nodes) {
+      protos.add(node.getProtobuf());
+    }
+    return protos;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerStarter.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerStarter.java
index e140fd5..fb42efa 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerStarter.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerStarter.java
@@ -178,8 +178,8 @@ public class TestOzoneManagerStarter {
     }
 
     @Override
-    public void bootstrap(OzoneConfiguration conf) throws IOException,
-        AuthenticationException {
+    public void bootstrap(OzoneConfiguration conf, boolean force)
+        throws IOException, AuthenticationException {
       //TODO: Add test for bootstrap
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org