You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@ozone.apache.org by bh...@apache.org on 2021/04/08 06:41:32 UTC

[ozone] branch master updated: HDDS-5052. [SCM HA Security] Handle leader changes between SCMInfo and getSCMSigned Cert in OM/SCM. (#2100)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b57765  HDDS-5052. [SCM HA Security] Handle leader changes between SCMInfo and getSCMSigned Cert in OM/SCM. (#2100)
6b57765 is described below

commit 6b57765fb3a90ae09b0ba4a42e9890b969ab2cfe
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Thu Apr 8 12:10:20 2021 +0530

    HDDS-5052. [SCM HA Security] Handle leader changes between SCMInfo and getSCMSigned Cert in OM/SCM. (#2100)
---
 .../certificate/authority/DefaultApprover.java     |  2 +-
 .../apache/hadoop/hdds/scm/ha/HASecurityUtils.java | 37 ++++++++-------
 .../hdds/scm/server/StorageContainerManager.java   | 55 +++++++++++-----------
 .../apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java  |  1 -
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  5 +-
 .../apache/hadoop/ozone/TestDelegationToken.java   |  1 -
 .../hadoop/ozone/TestSecureOzoneCluster.java       |  6 +--
 .../ozone/om/TestOzoneManagerConfiguration.java    |  1 -
 .../hadoop/ozone/om/TestSecureOzoneManager.java    |  4 +-
 .../java/org/apache/hadoop/ozone/om/OMStorage.java | 18 -------
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 55 ++++++++++++----------
 .../apache/hadoop/ozone/genesis/GenesisUtil.java   |  1 -
 12 files changed, 80 insertions(+), 106 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
index 35122c7..01cbd07 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
@@ -111,7 +111,7 @@ public class DefaultApprover extends BaseApprover {
     String csrClusterId = x500Name.getRDNs(BCStyle.O)[0].getFirst().getValue().
         toASN1Primitive().toString();
 
-    if (!scmId.equals(csrScmId) || !clusterId.equals(csrClusterId)) {
+    if (!clusterId.equals(csrClusterId)) {
       if (csrScmId.equalsIgnoreCase("null") &&
           csrClusterId.equalsIgnoreCase("null")) {
         // Special case to handle DN certificate generation as DN might not know
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
index a5d5ff7..70ff2b6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateSer
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultCAProfile;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.PKIProfile;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse;
 import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
@@ -78,13 +79,12 @@ public final class HASecurityUtils {
    * Initialize Security which generates public, private key pair and get SCM
    * signed certificate and persist to local disk.
    * @param scmStorageConfig
-   * @param fetchedScmId
    * @param conf
    * @param scmAddress
    * @throws IOException
    */
   public static void initializeSecurity(SCMStorageConfig scmStorageConfig,
-      String fetchedScmId, OzoneConfiguration conf,
+      OzoneConfiguration conf,
       InetSocketAddress scmAddress, boolean primaryscm)
       throws IOException {
     LOG.info("Initializing secure StorageContainerManager.");
@@ -99,11 +99,11 @@ public final class HASecurityUtils {
       break;
     case GETCERT:
       if (!primaryscm) {
-        getRootCASignedSCMCert(certClient, conf, fetchedScmId, scmStorageConfig,
+        getRootCASignedSCMCert(certClient, conf, scmStorageConfig,
             scmAddress);
       } else {
-        getPrimarySCMSelfSignedCert(certClient, conf, fetchedScmId,
-            scmStorageConfig, scmAddress);
+        getPrimarySCMSelfSignedCert(certClient, conf, scmStorageConfig,
+            scmAddress);
       }
       LOG.info("Successfully stored SCM signed certificate.");
       break;
@@ -127,18 +127,18 @@ public final class HASecurityUtils {
    * client.
    */
   private static void getRootCASignedSCMCert(CertificateClient client,
-      OzoneConfiguration config, String fetchedSCMId,
+      OzoneConfiguration config,
       SCMStorageConfig scmStorageConfig, InetSocketAddress scmAddress) {
     try {
       // Generate CSR.
       PKCS10CertificationRequest csr = generateCSR(client, scmStorageConfig,
-          config, scmAddress, fetchedSCMId);
+          config, scmAddress);
 
       ScmNodeDetailsProto scmNodeDetailsProto =
           ScmNodeDetailsProto.newBuilder()
               .setClusterId(scmStorageConfig.getClusterID())
               .setHostName(scmAddress.getHostName())
-              .setScmNodeId(fetchedSCMId).build();
+              .setScmNodeId(scmStorageConfig.getScmId()).build();
 
       // Create SCM security client.
       SCMSecurityProtocolClientSideTranslatorPB secureScmClient =
@@ -179,16 +179,17 @@ public final class HASecurityUtils {
    * root CA certificate server and store it using certificate client.
    */
   private static void getPrimarySCMSelfSignedCert(CertificateClient client,
-      OzoneConfiguration config, String fetchedSCMId,
-      SCMStorageConfig scmStorageConfig, InetSocketAddress scmAddress) {
+      OzoneConfiguration config, SCMStorageConfig scmStorageConfig,
+      InetSocketAddress scmAddress) {
 
     try {
 
       CertificateServer rootCAServer =
-          initializeRootCertificateServer(config, null, scmStorageConfig);
+          initializeRootCertificateServer(config, null, scmStorageConfig,
+              new DefaultCAProfile());
 
       PKCS10CertificationRequest csr = generateCSR(client, scmStorageConfig,
-          config, scmAddress, fetchedSCMId);
+          config, scmAddress);
 
       X509CertificateHolder subSCMCertHolder = rootCAServer.
           requestCertificate(csr, KERBEROS_TRUSTED, SCM).get();
@@ -231,14 +232,14 @@ public final class HASecurityUtils {
    */
   public static CertificateServer initializeRootCertificateServer(
       OzoneConfiguration config, CertificateStore scmCertStore,
-      SCMStorageConfig scmStorageConfig)
+      SCMStorageConfig scmStorageConfig, PKIProfile pkiProfile)
       throws IOException {
     String subject = SCM_ROOT_CA_PREFIX +
         InetAddress.getLocalHost().getHostName();
 
     DefaultCAServer rootCAServer = new DefaultCAServer(subject,
         scmStorageConfig.getClusterID(),
-        scmStorageConfig.getScmId(), scmCertStore, new DefaultCAProfile(),
+        scmStorageConfig.getScmId(), scmCertStore, pkiProfile,
         SCM_ROOT_CA_COMPONENT_NAME);
 
     rootCAServer.init(new SecurityConfig(config),
@@ -252,8 +253,8 @@ public final class HASecurityUtils {
    */
   private static PKCS10CertificationRequest generateCSR(
       CertificateClient client, SCMStorageConfig scmStorageConfig,
-      OzoneConfiguration config, InetSocketAddress scmAddress,
-      String fetchedSCMId) throws IOException {
+      OzoneConfiguration config, InetSocketAddress scmAddress)
+      throws IOException {
     CertificateSignRequest.Builder builder = client.getCSRBuilder();
     KeyPair keyPair = new KeyPair(client.getPublicKey(),
         client.getPrivateKey());
@@ -265,13 +266,13 @@ public final class HASecurityUtils {
 
     builder.setKey(keyPair)
         .setConfiguration(config)
-        .setScmID(fetchedSCMId)
+        .setScmID(scmStorageConfig.getScmId())
         .setClusterID(scmStorageConfig.getClusterID())
         .setSubject(subject);
 
 
     LOG.info("Creating csr for SCM->hostName:{},scmId:{},clusterId:{}," +
-            "subject:{}", hostname, fetchedSCMId,
+            "subject:{}", hostname, scmStorageConfig.getScmId(),
         scmStorageConfig.getClusterID(), subject);
 
     return builder.build();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 76c87b3..0e0f95c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -37,9 +37,29 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl;
+import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultCAProfile;
+import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile;
+import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
 import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImplV2;
@@ -49,8 +69,6 @@ import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
-import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
@@ -62,17 +80,6 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
-import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl;
-import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
-import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
-import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
-import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
-import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
-import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -94,16 +101,10 @@ import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.PipelineChoosePolic
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
-import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer;
-import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile;
-import org.apache.hadoop.hdds.security.x509.certificate.client.SCMCertificateClient;
-import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.hdds.utils.HAUtils;
-import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.HddsVersionInfo;
 import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
 import org.apache.hadoop.io.IOUtils;
@@ -622,7 +623,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         } else {
           rootCertificateServer =
               HASecurityUtils.initializeRootCertificateServer(
-              conf, certificateStore, scmStorageConfig);
+              conf, certificateStore, scmStorageConfig, new DefaultCAProfile());
         }
         persistPrimarySCMCerts();
       } else {
@@ -635,7 +636,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       // one CA server which is issuing certificates to DN and OM.
       rootCertificateServer =
           HASecurityUtils.initializeRootCertificateServer(conf,
-              certificateStore, scmStorageConfig);
+              certificateStore, scmStorageConfig, new DefaultProfile());
       scmCertificateServer = rootCertificateServer;
     }
 
@@ -868,9 +869,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         // to existing SCM ring post node regular start up.
 
         if(OzoneSecurityUtil.isSecurityEnabled(conf)) {
-          HASecurityUtils.initializeSecurity(scmStorageConfig,
-              scmInfo.getScmId(), config, getScmAddress(scmhaNodeDetails, conf),
-              false);
+          HASecurityUtils.initializeSecurity(scmStorageConfig, config,
+              getScmAddress(scmhaNodeDetails, conf), false);
         }
         scmStorageConfig.setPrimaryScmNodeId(scmInfo.getScmId());
         scmStorageConfig.initialize();
@@ -924,9 +924,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         }
 
         if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-          HASecurityUtils.initializeSecurity(scmStorageConfig,
-              scmStorageConfig.getScmId(), conf, getScmAddress(haDetails,
-                  conf), true);
+          HASecurityUtils.initializeSecurity(scmStorageConfig, conf,
+              getScmAddress(haDetails, conf), true);
         }
         scmStorageConfig.setPrimaryScmNodeId(scmStorageConfig.getScmId());
         scmStorageConfig.initialize();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
index e13b50e..6c4ead2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
@@ -102,7 +102,6 @@ public class TestOzoneFsHAURLs {
 
     OMStorage omStore = new OMStorage(conf);
     omStore.setClusterId(clusterId);
-    omStore.setScmId(scmId);
     // writes the version file properties
     omStore.initialize();
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ec51c42..b57d49e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -725,13 +725,10 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
         return;
       }
       omStorage.setClusterId(clusterId);
-      if (scmId.isPresent()) {
-        omStorage.setScmId(scmId.get());
-      }
       omStorage.setOmId(omId.orElse(UUID.randomUUID().toString()));
       // Initialize ozone certificate client if security is enabled.
       if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-        OzoneManager.initializeSecurity(conf, omStorage);
+        OzoneManager.initializeSecurity(conf, omStorage, scmId.get());
       }
       omStorage.initialize();
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
index 43221af..fc26a67 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestDelegationToken.java
@@ -391,7 +391,6 @@ public final class TestDelegationToken {
   private void setupOm(OzoneConfiguration config) throws Exception {
     OMStorage omStore = new OMStorage(config);
     omStore.setClusterId("testClusterId");
-    omStore.setScmId("testScmId");
     omStore.setOmCertSerialId(OM_CERT_SERIAL_ID);
     // writes the version file properties
     omStore.initialize();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index 33d1c1c..f2f5235 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -341,7 +341,7 @@ public final class TestSecureOzoneCluster {
     SCMStorageConfig scmStore = new SCMStorageConfig(conf);
     scmStore.setClusterId(clusterId);
     scmStore.setScmId(scmId);
-    HASecurityUtils.initializeSecurity(scmStore, scmId, conf,
+    HASecurityUtils.initializeSecurity(scmStore, conf,
         NetUtils.createSocketAddr(InetAddress.getLocalHost().getHostName(),
             OZONE_SCM_CLIENT_PORT_DEFAULT), true);
     scmStore.setPrimaryScmNodeId(scmId);
@@ -571,7 +571,6 @@ public final class TestSecureOzoneCluster {
   private void setupOm(OzoneConfiguration config) throws Exception {
     OMStorage omStore = new OMStorage(config);
     omStore.setClusterId("testClusterId");
-    omStore.setScmId("testScmId");
     omStore.setOmCertSerialId(OM_CERT_SERIAL_ID);
     // writes the version file properties
     omStore.initialize();
@@ -772,11 +771,10 @@ public final class TestSecureOzoneCluster {
       return;
     }
     omStorage.setClusterId(clusterId);
-    omStorage.setScmId(scmId);
     omStorage.setOmId(omId);
     // Initialize ozone certificate client if security is enabled.
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-      OzoneManager.initializeSecurity(conf, omStorage);
+      OzoneManager.initializeSecurity(conf, omStorage, scmId);
     }
     omStorage.initialize();
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java
index c06c504..46a9752 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java
@@ -84,7 +84,6 @@ public class TestOzoneManagerConfiguration {
 
     OMStorage omStore = new OMStorage(conf);
     omStore.setClusterId("testClusterId");
-    omStore.setScmId("testScmId");
     // writes the version file properties
     omStore.initialize();
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
index a177bae..457df3a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
@@ -116,7 +116,6 @@ public class TestSecureOzoneManager {
         LogCapturer.captureLogs(OzoneManager.getLogger());
     OMStorage omStorage = new OMStorage(conf);
     omStorage.setClusterId(clusterId);
-    omStorage.setScmId(scmId);
     omStorage.setOmId(omId);
     omLogs.clearOutput();
 
@@ -206,12 +205,11 @@ public class TestSecureOzoneManager {
     OzoneConfiguration config = new OzoneConfiguration(conf);
     OMStorage omStorage = new OMStorage(config);
     omStorage.setClusterId(clusterId);
-    omStorage.setScmId(scmId);
     omStorage.setOmId(omId);
     config.set(OZONE_OM_ADDRESS_KEY, "om-unknown");
     LambdaTestUtils.intercept(RuntimeException.class, "Can't get SCM signed" +
             " certificate",
-        () -> OzoneManager.initializeSecurity(config, omStorage));
+        () -> OzoneManager.initializeSecurity(config, omStorage, scmId));
   }
 
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
index c527af9..3f9d2de 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.ozone.common.Storage;
 
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
-
 /**
  * OMStorage is responsible for management of the StorageDirectories used by
  * the Ozone Manager.
@@ -48,14 +46,6 @@ public class OMStorage extends Storage {
     super(NodeType.OM, getOmDbDir(conf), STORAGE_DIR);
   }
 
-  public void setScmId(String scmId) throws IOException {
-    if (getState() == StorageState.INITIALIZED) {
-      throw new IOException("OM is already initialized.");
-    } else {
-      getStorageInfo().setProperty(SCM_ID, scmId);
-    }
-  }
-
   public void setOmCertSerialId(String certSerialId) throws IOException {
     getStorageInfo().setProperty(OM_CERT_SERIAL_ID, certSerialId);
   }
@@ -69,14 +59,6 @@ public class OMStorage extends Storage {
   }
 
   /**
-   * Retrieves the SCM ID from the version file.
-   * @return SCM_ID
-   */
-  public String getScmId() {
-    return getStorageInfo().getProperty(SCM_ID);
-  }
-
-  /**
    * Retrieves the OM ID from the version file.
    * @return OM_ID
    */
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index d56499b..74d0fbd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslator
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -186,11 +187,11 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERV
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
-import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
 import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
 import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+import static org.apache.hadoop.hdds.utils.HAUtils.getScmInfo;
 import static org.apache.hadoop.ozone.OmUtils.MAX_TRXN_ID;
 import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
@@ -403,9 +404,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     // For testing purpose only, not hit scm from om as Hadoop UGI can't login
     // two principals in the same JVM.
     if (!testSecureOmFlag) {
-      ScmInfo scmInfo = HAUtils.getScmInfo(configuration);
-      if (!(scmInfo.getClusterId().equals(omStorage.getClusterID()) && scmInfo
-          .getScmId().equals(omStorage.getScmId()))) {
+      ScmInfo scmInfo = getScmInfo(configuration);
+      if (!scmInfo.getClusterId().equals(omStorage.getClusterID())) {
         logVersionMismatch(conf, scmInfo);
         throw new OMException("SCM version info mismatch.",
             ResultCodes.SCM_VERSION_MISMATCH_ERROR);
@@ -495,18 +495,22 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   private void logVersionMismatch(OzoneConfiguration conf, ScmInfo scmInfo) {
-    InetSocketAddress scmBlockAddress =
-        getScmAddressForBlockClients(conf);
+    List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
+    StringBuilder scmBlockAddressBuilder = new StringBuilder("");
+    for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
+      scmBlockAddressBuilder.append(scmNodeInfo.getBlockClientAddress())
+          .append(",");
+    }
+    String scmBlockAddress = scmBlockAddressBuilder.toString();
+    if (!StringUtils.isBlank(scmBlockAddress)) {
+      scmBlockAddress = scmBlockAddress.substring(0,
+          scmBlockAddress.lastIndexOf(","));
+    }
     if (!scmInfo.getClusterId().equals(omStorage.getClusterID())) {
       LOG.error("clusterId from {} is {}, but is {} in {}",
           scmBlockAddress, scmInfo.getClusterId(),
           omStorage.getClusterID(), omStorage.getVersionFile());
     }
-    if (!scmInfo.getScmId().equals(omStorage.getScmId())) {
-      LOG.error("scmId from {} is {}, but is {} in {}",
-          scmBlockAddress, scmInfo.getScmId(),
-          omStorage.getScmId(), omStorage.getVersionFile());
-    }
   }
 
   /**
@@ -930,7 +934,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     StorageState state = omStorage.getState();
     if (state != StorageState.INITIALIZED) {
       try {
-        ScmInfo scmInfo = HAUtils.getScmInfo(conf);
+        ScmInfo scmInfo = getScmInfo(conf);
         String clusterId = scmInfo.getClusterId();
         String scmId = scmInfo.getScmId();
         if (clusterId == null || clusterId.isEmpty()) {
@@ -940,9 +944,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
           throw new IOException("Invalid SCM ID");
         }
         omStorage.setClusterId(clusterId);
-        omStorage.setScmId(scmId);
         if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-          initializeSecurity(conf, omStorage);
+          initializeSecurity(conf, omStorage, scmId);
         }
         omStorage.initialize();
         System.out.println(
@@ -959,8 +962,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     } else {
       if (OzoneSecurityUtil.isSecurityEnabled(conf) &&
           omStorage.getOmCertSerialId() == null) {
+        ScmInfo scmInfo = HAUtils.getScmInfo(conf);
+        String scmId = scmInfo.getScmId();
+        if (scmId == null || scmId.isEmpty()) {
+          throw new IOException("Invalid SCM ID");
+        }
         LOG.info("OM storage is already initialized. Initializing security");
-        initializeSecurity(conf, omStorage);
+        initializeSecurity(conf, omStorage, scmId);
         omStorage.persistCurrentState();
       }
       System.out.println(
@@ -977,7 +985,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @VisibleForTesting
   public static void initializeSecurity(OzoneConfiguration conf,
-      OMStorage omStore)
+      OMStorage omStore, String scmId)
       throws IOException {
     LOG.info("Initializing secure OzoneManager.");
 
@@ -991,7 +999,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       LOG.info("Initialization successful.");
       break;
     case GETCERT:
-      getSCMSignedCert(certClient, conf, omStore);
+      getSCMSignedCert(certClient, conf, omStore, scmId);
       LOG.info("Successfully stored SCM signed certificate.");
       break;
     case FAILURE:
@@ -1028,11 +1036,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   @VisibleForTesting
-  public ScmInfo getScmInfo() throws IOException {
-    return scmBlockClient.getScmInfo();
-  }
-
-  @VisibleForTesting
   public OMStorage getOmStorage() {
     return omStorage;
   }
@@ -1397,7 +1400,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    * Get SCM signed certificate and store it using certificate client.
    */
   private static void getSCMSignedCert(CertificateClient client,
-      OzoneConfiguration config, OMStorage omStore) throws IOException {
+      OzoneConfiguration config, OMStorage omStore, String scmId)
+      throws IOException {
     CertificateSignRequest.Builder builder = client.getCSRBuilder();
     KeyPair keyPair = new KeyPair(client.getPublicKey(),
         client.getPrivateKey());
@@ -1425,7 +1429,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     builder.setCA(false)
         .setKey(keyPair)
         .setConfiguration(config)
-        .setScmID(omStore.getScmId())
+        .setScmID(scmId)
         .setClusterID(omStore.getClusterID())
         .setSubject(subject);
 
@@ -1437,8 +1441,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
 
     LOG.info("Creating csr for OM->dns:{},ip:{},scmId:{},clusterId:{}," +
-            "subject:{}", hostname, ip,
-        omStore.getScmId(), omStore.getClusterID(), subject);
+            "subject:{}", hostname, ip, scmId, omStore.getClusterID(), subject);
 
     HddsProtos.OzoneManagerDetailsProto.Builder omDetailsProtoBuilder =
         HddsProtos.OzoneManagerDetailsProto.newBuilder()
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java
index 7071a6f..93e1d2d 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java
@@ -180,7 +180,6 @@ public final class GenesisUtil {
     SCMStorageConfig scmStore = new SCMStorageConfig(conf);
     if (omStorage.getState() != Storage.StorageState.INITIALIZED) {
       omStorage.setClusterId(scmStore.getClusterID());
-      omStorage.setScmId(scmStore.getScmId());
       omStorage.setOmId(UUID.randomUUID().toString());
       omStorage.initialize();
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org