You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by bh...@apache.org on 2021/02/26 16:54:43 UTC

[ozone] branch HDDS-2823 updated: HDDS-4861. [SCM HA Security] Implement generate SCM certificate. (#1958)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new 8e56906  HDDS-4861. [SCM HA Security] Implement generate SCM certificate. (#1958)
8e56906 is described below

commit 8e569062d94d2b827e49ad2ac0e4cb89e8339c93
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Fri Feb 26 22:24:29 2021 +0530

    HDDS-4861. [SCM HA Security] Implement generate SCM certificate. (#1958)
---
 .../hadoop/hdds/protocol/SCMSecurityProtocol.java  | 13 ++++
 .../SCMSecurityProtocolClientSideTranslatorPB.java | 37 +++++++++++
 .../certificate/authority/CertificateServer.java   | 10 +--
 .../certificate/authority/CertificateStore.java    |  9 +++
 .../certificate/authority/DefaultCAServer.java     | 31 +++++++---
 .../x509/certificate/authority/MockCAStore.java    |  7 +++
 .../certificate/authority/TestDefaultCAServer.java |  8 ++-
 .../interface-client/src/main/proto/hdds.proto     |  6 ++
 .../src/main/proto/ScmServerSecurityProtocol.proto |  7 +++
 .../hadoop/hdds/scm/metadata/SCMDBDefinition.java  | 11 +++-
 .../hadoop/hdds/scm/metadata/SCMMetadataStore.java |  9 +++
 .../hdds/scm/metadata/SCMMetadataStoreImpl.java    | 13 ++++
 .../SCMSecurityProtocolServerSideTranslatorPB.java | 32 ++++++++++
 .../hadoop/hdds/scm/server/SCMCertStore.java       | 37 +++++++++--
 .../hdds/scm/server/SCMSecurityProtocolServer.java | 71 ++++++++++++++++------
 .../hdds/scm/server/StorageContainerManager.java   |  2 +-
 ...TestSCMStoreImplWithOldPipelineIDKeyFormat.java |  5 ++
 .../scm/server/TestSCMSecurityProtocolServer.java  |  2 +-
 18 files changed, 265 insertions(+), 45 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
index 52dc033..4a59570 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.security.KerberosInfo;
 
@@ -62,6 +63,18 @@ public interface SCMSecurityProtocol {
   String getOMCertificate(OzoneManagerDetailsProto omDetails,
       String certSignReq) throws IOException;
 
+
+  /**
+   * Get signed certificate for SCM.
+   *
+   * @param scmNodeDetails  - SCM Node Details.
+   * @param certSignReq     - Certificate signing request.
+   * @return String         - pem encoded SCM signed
+   *                          certificate.
+   */
+  String getSCMCertificate(ScmNodeDetailsProto scmNodeDetails,
+      String certSignReq) throws IOException;
+
   /**
    * Get SCM signed certificate for given certificate serial id if it exists.
    * Throws exception if it's not found.
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
index aeef50e..672b95e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
@@ -25,7 +25,9 @@ import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSCMCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCACertificateRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
@@ -130,6 +132,41 @@ public class SCMSecurityProtocolClientSideTranslatorPB implements
   }
 
   /**
+   * Get signed certificate for SCM node.
+   *
+   * @param scmNodeDetails  - SCM Node Details.
+   * @param certSignReq     - Certificate signing request.
+   * @return String         - pem encoded SCM signed
+   *                          certificate.
+   */
+  public String getSCMCertificate(ScmNodeDetailsProto scmNodeDetails,
+      String certSignReq) throws IOException {
+    return getSCMCertChain(scmNodeDetails, certSignReq).getX509Certificate();
+  }
+
+
+  /**
+   * Get signed certificate for SCM node and root CA certificate.
+   *
+   * @param scmNodeDetails   - SCM Node Details.
+   * @param certSignReq      - Certificate signing request.
+   * @return SCMGetCertResponseProto  - SCMGetCertResponseProto which holds
+   * signed certificate and root CA certificate.
+   */
+  public SCMGetCertResponseProto getSCMCertChain(
+      ScmNodeDetailsProto scmNodeDetails, String certSignReq)
+      throws IOException {
+    SCMGetSCMCertRequestProto request =
+        SCMGetSCMCertRequestProto.newBuilder()
+            .setCSR(certSignReq)
+            .setScmDetails(scmNodeDetails)
+            .build();
+    return submitRequest(Type.GetSCMCertificate,
+        builder -> builder.setGetSCMCertificateRequest(request))
+        .getGetCertResponseProto();
+  }
+
+  /**
    * Get SCM signed certificate for OM.
    *
    * @param omDetails   - OzoneManager Details.
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
index 76512c5..80ad04d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
@@ -19,7 +19,7 @@
 
 package org.apache.hadoop.hdds.security.x509.certificate.authority;
 
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType;
@@ -76,13 +76,14 @@ public interface CertificateServer {
    *
    * @param csr  - Certificate Signing Request.
    * @param type - An Enum which says what kind of approval process to follow.
+   * @param nodeType: OM/SCM/DN
    * @return A future that will have this certificate when this request is
    * approved.
    * @throws SCMSecurityException - on Error.
    */
   Future<X509CertificateHolder> requestCertificate(
       PKCS10CertificationRequest csr,
-      CertificateApprover.ApprovalType type)
+      CertificateApprover.ApprovalType type, NodeType nodeType)
       throws SCMSecurityException;
 
 
@@ -91,12 +92,13 @@ public interface CertificateServer {
    *
    * @param csr - Certificate Signing Request as a PEM encoded String.
    * @param type - An Enum which says what kind of approval process to follow.
+   * @param nodeType: OM/SCM/DN
    * @return A future that will have this certificate when this request is
    * approved.
    * @throws SCMSecurityException - on Error.
    */
   Future<X509CertificateHolder> requestCertificate(String csr,
-      ApprovalType type) throws IOException;
+      ApprovalType type, NodeType nodeType) throws IOException;
 
   /**
    * Revokes a Certificate issued by this CertificateServer.
@@ -122,7 +124,7 @@ public interface CertificateServer {
    * @return
    * @throws IOException
    */
-  List<X509Certificate> listCertificate(HddsProtos.NodeType type,
+  List<X509Certificate> listCertificate(NodeType type,
       long startSerialId, int count, boolean isRevoked) throws IOException;
 
   /**
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
index 3ddb640..885d8e3 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateStore.java
@@ -47,6 +47,15 @@ public interface CertificateStore {
                              X509Certificate certificate) throws IOException;
 
   /**
+   * Writes a new SCM certificate that was issued to the persistent store.
+   * @param serialID - Certificate Serial Number.
+   * @param certificate - Certificate to persist.
+   * @throws IOException - on Failure.
+   */
+  void storeValidScmCertificate(BigInteger serialID,
+      X509Certificate certificate) throws IOException;
+
+  /**
    * Moves a certificate in a transactional manner from valid certificate to
    * revoked certificate state.
    * @param serialID - Serial ID of the certificate.
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
index 0523209..80c2d90 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
@@ -22,7 +22,7 @@ package org.apache.hadoop.hdds.security.x509.certificate.authority;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.validator.routines.DomainValidator;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.DefaultProfile;
@@ -206,7 +206,7 @@ public class DefaultCAServer implements CertificateServer {
   @Override
   public Future<X509CertificateHolder> requestCertificate(
       PKCS10CertificationRequest csr,
-      CertificateApprover.ApprovalType approverType) {
+      CertificateApprover.ApprovalType approverType, NodeType nodeType) {
     LocalDate beginDate = LocalDate.now().atStartOfDay().toLocalDate();
     LocalDateTime temp = LocalDateTime.of(beginDate, LocalTime.MIDNIGHT);
     LocalDate endDate =
@@ -231,12 +231,12 @@ public class DefaultCAServer implements CertificateServer {
       case TESTING_AUTOMATIC:
         X509CertificateHolder xcert;
         try {
-          xcert = signAndStoreCertificate(beginDate, endDate, csr);
+          xcert = signAndStoreCertificate(beginDate, endDate, csr, nodeType);
         } catch (SCMSecurityException e) {
           // Certificate with conflicting serial id, retry again may resolve
           // this issue.
           LOG.error("Certificate storage failed, retrying one more time.", e);
-          xcert = signAndStoreCertificate(beginDate, endDate, csr);
+          xcert = signAndStoreCertificate(beginDate, endDate, csr, nodeType);
         }
 
         xcertHolder.complete(xcert);
@@ -252,23 +252,34 @@ public class DefaultCAServer implements CertificateServer {
   }
 
   private X509CertificateHolder signAndStoreCertificate(LocalDate beginDate,
-      LocalDate endDate, PKCS10CertificationRequest csr) throws IOException,
+      LocalDate endDate, PKCS10CertificationRequest csr, NodeType nodeType)
+      throws IOException,
       OperatorCreationException, CertificateException {
     X509CertificateHolder xcert = approver.sign(config,
         getCAKeys().getPrivate(),
         getCACertificate(), java.sql.Date.valueOf(beginDate),
         java.sql.Date.valueOf(endDate), csr, scmID, clusterID);
-    store.storeValidCertificate(xcert.getSerialNumber(),
-        CertificateCodec.getX509Certificate(xcert));
+    if (nodeType.equals(NodeType.SCM)) {
+      // If the role is SCM, store certificate in scm cert table and valid cert
+      // table. This is to help to return scm certs during getCertificate call.
+      store.storeValidScmCertificate(xcert.getSerialNumber(),
+          CertificateCodec.getX509Certificate(xcert));
+    } else {
+      // As we don't have different table for other roles, other role
+      // certificates will go to validCertsTable.
+      store.storeValidCertificate(xcert.getSerialNumber(),
+          CertificateCodec.getX509Certificate(xcert));
+    }
     return xcert;
   }
 
   @Override
   public Future<X509CertificateHolder> requestCertificate(String csr,
-      CertificateApprover.ApprovalType type) throws IOException {
+      CertificateApprover.ApprovalType type, NodeType nodeType)
+      throws IOException {
     PKCS10CertificationRequest request =
         getCertificationRequest(csr);
-    return requestCertificate(request, type);
+    return requestCertificate(request, type, nodeType);
   }
 
   @Override
@@ -300,7 +311,7 @@ public class DefaultCAServer implements CertificateServer {
    * @throws IOException
    */
   @Override
-  public List<X509Certificate> listCertificate(HddsProtos.NodeType role,
+  public List<X509Certificate> listCertificate(NodeType role,
       long startSerialId, int count, boolean isRevoked) throws IOException {
     return store.listCertificate(role, BigInteger.valueOf(startSerialId), count,
         isRevoked? CertificateStore.CertType.REVOKED_CERTS :
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
index 633ae19..b5af29e 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockCAStore.java
@@ -39,6 +39,13 @@ public class MockCAStore implements CertificateStore {
   }
 
   @Override
+  public void storeValidScmCertificate(BigInteger serialID,
+      X509Certificate certificate)
+      throws IOException {
+
+  }
+
+  @Override
   public void revokeCertificate(BigInteger serialID) throws IOException {
 
   }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
index b203305..23eeaa5 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
@@ -43,6 +43,8 @@ import java.util.concurrent.Future;
 import java.util.function.Consumer;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType.OM;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType.SCM;
 import static org.junit.Assert.*;
 
 /**
@@ -165,7 +167,7 @@ public class TestDefaultCAServer {
         CertificateServer.CAType.SELF_SIGNED_CA);
 
     Future<X509CertificateHolder> holder = testCA.requestCertificate(csrString,
-        CertificateApprover.ApprovalType.TESTING_AUTOMATIC);
+        CertificateApprover.ApprovalType.TESTING_AUTOMATIC, SCM);
     // Right now our calls are synchronous. Eventually this will have to wait.
     assertTrue(holder.isDone());
     assertNotNull(holder.get());
@@ -207,7 +209,7 @@ public class TestDefaultCAServer {
         CertificateServer.CAType.SELF_SIGNED_CA);
 
     Future<X509CertificateHolder> holder = testCA.requestCertificate(csrString,
-        CertificateApprover.ApprovalType.TESTING_AUTOMATIC);
+        CertificateApprover.ApprovalType.TESTING_AUTOMATIC, OM);
     // Right now our calls are synchronous. Eventually this will have to wait.
     assertTrue(holder.isDone());
     assertNotNull(holder.get());
@@ -243,7 +245,7 @@ public class TestDefaultCAServer {
         () -> {
           Future<X509CertificateHolder> holder =
               testCA.requestCertificate(csrString,
-                  CertificateApprover.ApprovalType.TESTING_AUTOMATIC);
+                  CertificateApprover.ApprovalType.TESTING_AUTOMATIC, OM);
           holder.isDone();
           holder.get();
         });
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 3053e72..1636328 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -73,6 +73,12 @@ message OzoneManagerDetailsProto {
     repeated Port ports = 4;
 }
 
+message ScmNodeDetailsProto {
+    required string scmNodeId = 1;     // SCM Node Id.
+    required string clusterId = 2;     // Cluster Id of SCM cluster.
+    required string hostName = 3;      // Hostname of SCM.
+}
+
 message Port {
     required string name = 1;
     required uint32 value = 2;
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
index 114d215..0455952 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
@@ -49,6 +49,7 @@ message SCMSecurityRequest {
     optional SCMGetCertificateRequestProto getCertificateRequest = 5;
     optional SCMGetCACertificateRequestProto getCACertificateRequest = 6;
     optional SCMListCertificateRequestProto listCertificateRequest = 7;
+    optional SCMGetSCMCertRequestProto getSCMCertificateRequest = 8;
 
 }
 
@@ -77,6 +78,7 @@ enum Type {
     GetCertificate = 3;
     GetCACertificate = 4;
     ListCertificate = 5;
+    GetSCMCertificate = 6;
 }
 
 enum Status {
@@ -100,6 +102,11 @@ message SCMGetOMCertRequestProto {
     required string CSR = 2;
 }
 
+message SCMGetSCMCertRequestProto {
+    required ScmNodeDetailsProto scmDetails = 1;
+    required string CSR = 2;
+}
+
 /**
 * Proto request to get a certificate with given serial id.
 */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
index 6bf0d8b..df3d527 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
@@ -57,6 +57,15 @@ public class SCMDBDefinition implements DBDefinition {
           new X509CertificateCodec());
 
   public static final DBColumnFamilyDefinition<BigInteger, X509Certificate>
+      VALID_SCM_CERTS =
+      new DBColumnFamilyDefinition<>(
+          "validSCMCerts",
+          BigInteger.class,
+          new BigIntegerCodec(),
+          X509Certificate.class,
+          new X509CertificateCodec());
+
+  public static final DBColumnFamilyDefinition<BigInteger, X509Certificate>
       REVOKED_CERTS =
       new DBColumnFamilyDefinition<>(
           "revokedCerts",
@@ -105,6 +114,6 @@ public class SCMDBDefinition implements DBDefinition {
   @Override
   public DBColumnFamilyDefinition[] getColumnFamilies() {
     return new DBColumnFamilyDefinition[] {DELETED_BLOCKS, VALID_CERTS,
-        REVOKED_CERTS, PIPELINES, CONTAINERS, TRANSACTIONINFO};
+        VALID_SCM_CERTS, REVOKED_CERTS, PIPELINES, CONTAINERS, TRANSACTIONINFO};
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index 27727f6..9e99f89 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -77,6 +77,15 @@ public interface SCMMetadataStore extends DBStoreHAManager {
    */
   Table<BigInteger, X509Certificate> getValidCertsTable();
 
+
+  /**
+   * A table that maintains all the valid certificates of SCM nodes issued by
+   * the SCM CA.
+   *
+   * @return Table
+   */
+  Table<BigInteger, X509Certificate> getValidSCMCertsTable();
+
   /**
    * A Table that maintains all revoked certificates until they expire.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
index e352dbd..d21ffd2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
@@ -41,6 +41,8 @@ import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.TRANSACTIONINFO;
 import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.VALID_CERTS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.VALID_SCM_CERTS;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,8 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
 
   private Table<BigInteger, X509Certificate> validCertsTable;
 
+  private Table<BigInteger, X509Certificate> validSCMCertsTable;
+
   private Table<BigInteger, X509Certificate> revokedCertsTable;
 
   private Table<ContainerID, ContainerInfo> containerTable;
@@ -96,6 +100,10 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
 
       checkTableStatus(validCertsTable, VALID_CERTS.getName());
 
+      validSCMCertsTable = VALID_SCM_CERTS.getTable(store);
+
+      checkTableStatus(validSCMCertsTable, VALID_SCM_CERTS.getName());
+
       revokedCertsTable = REVOKED_CERTS.getTable(store);
 
       checkTableStatus(revokedCertsTable, REVOKED_CERTS.getName());
@@ -139,6 +147,11 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore {
   }
 
   @Override
+  public Table<BigInteger, X509Certificate> getValidSCMCertsTable() {
+    return validSCMCertsTable;
+  }
+
+  @Override
   public Table<BigInteger, X509Certificate> getRevokedCertsTable() {
     return revokedCertsTable;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
index 3f405dc..babc87b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCer
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetOMCertRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSCMCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest;
@@ -112,6 +113,13 @@ public class SCMSecurityProtocolServerSideTranslatorPB
             .setListCertificateResponseProto(
                 listCertificate(request.getListCertificateRequest()))
             .build();
+      case GetSCMCertificate:
+        return SCMSecurityResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setGetCertResponseProto(getSCMCertificate(
+                request.getGetSCMCertificateRequest()))
+            .build();
       default:
         throw new IllegalArgumentException(
             "Unknown request type: " + request.getCmdType());
@@ -147,6 +155,30 @@ public class SCMSecurityProtocolServerSideTranslatorPB
   }
 
   /**
+   * Get signed certificate for SCM.
+   *
+   * @param request - SCMGetSCMCertRequestProto
+   * @return SCMGetCertResponseProto.
+   */
+
+  public SCMGetCertResponseProto getSCMCertificate(
+      SCMGetSCMCertRequestProto request)
+      throws IOException {
+
+    String certificate = impl.getSCMCertificate(request.getScmDetails(),
+        request.getCSR());
+    SCMGetCertResponseProto.Builder builder =
+        SCMGetCertResponseProto
+            .newBuilder()
+            .setResponseCode(ResponseCode.success)
+            .setX509Certificate(certificate)
+            .setX509CACertificate(impl.getCACertificate());
+
+    return builder.build();
+
+  }
+
+  /**
    * Get SCM signed certificate for OzoneManager.
    *
    * @param request
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
index e2602ee..3d1cf51 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMCertStore.java
@@ -58,17 +58,38 @@ public class SCMCertStore implements CertificateStore {
     lock.lock();
     try {
       // This makes sure that no certificate IDs are reusable.
-      if ((getCertificateByID(serialID, CertType.VALID_CERTS) == null) &&
-          (getCertificateByID(serialID, CertType.REVOKED_CERTS) == null)) {
-        scmMetadataStore.getValidCertsTable().put(serialID, certificate);
-      } else {
-        throw new SCMSecurityException("Conflicting certificate ID");
-      }
+      checkValidCertID(serialID);
+      scmMetadataStore.getValidCertsTable().put(serialID, certificate);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void storeValidScmCertificate(BigInteger serialID,
+      X509Certificate certificate) throws IOException {
+    lock.lock();
+    try {
+      checkValidCertID(serialID);
+      BatchOperation batchOperation =
+          scmMetadataStore.getBatchHandler().initBatchOperation();
+      scmMetadataStore.getValidSCMCertsTable().putWithBatch(batchOperation,
+          serialID, certificate);
+      scmMetadataStore.getValidCertsTable().putWithBatch(batchOperation,
+          serialID, certificate);
+      scmMetadataStore.getStore().commitBatchOperation(batchOperation);
     } finally {
       lock.unlock();
     }
   }
 
+  private void checkValidCertID(BigInteger serialID) throws IOException {
+    if ((getCertificateByID(serialID, CertType.VALID_CERTS) != null) ||
+        (getCertificateByID(serialID, CertType.REVOKED_CERTS) != null)) {
+      throw new SCMSecurityException("Conflicting certificate ID");
+    }
+  }
+
   @Override
   public void revokeCertificate(BigInteger serialID) throws IOException {
     lock.lock();
@@ -93,6 +114,10 @@ public class SCMCertStore implements CertificateStore {
                scmMetadataStore.getStore().initBatchOperation();) {
         scmMetadataStore.getRevokedCertsTable()
             .putWithBatch(batch, serialID, cert);
+        if (scmMetadataStore.getValidSCMCertsTable().get(serialID) != null) {
+          scmMetadataStore.getValidSCMCertsTable().deleteWithBatch(batch,
+              serialID);
+        }
         scmMetadataStore.getValidCertsTable().deleteWithBatch(batch, serialID);
         scmMetadataStore.getStore().commitBatchOperation(batch);
       }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index 8b8eff4..3f3b360 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -31,9 +31,10 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.scm.protocol.SCMSecurityProtocolServerSideTranslatorPB;
@@ -69,9 +70,12 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
   private final RPC.Server rpcServer;
   private final InetSocketAddress rpcAddress;
   private final ProtocolMessageMetrics metrics;
+  private final StorageContainerManager storageContainerManager;
 
   SCMSecurityProtocolServer(OzoneConfiguration conf,
-      CertificateServer certificateServer) throws IOException {
+      CertificateServer certificateServer, StorageContainerManager scm)
+      throws IOException {
+    this.storageContainerManager = scm;
     this.certificateServer = certificateServer;
     final int handlerCount =
         conf.getInt(ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_KEY,
@@ -115,18 +119,7 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
     LOGGER.info("Processing CSR for dn {}, UUID: {}", dnDetails.getHostName(),
         dnDetails.getUuid());
     Objects.requireNonNull(dnDetails);
-    Future<X509CertificateHolder> future =
-        certificateServer.requestCertificate(certSignReq,
-            KERBEROS_TRUSTED);
-
-    try {
-      return CertificateCodec.getPEMEncodedString(future.get());
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException("getDataNodeCertificate operation failed. ", e);
-    } catch (ExecutionException e) {
-      throw new IOException("getDataNodeCertificate operation failed. ", e);
-    }
+    return getEncodedCertToString(certSignReq, NodeType.DATANODE);
   }
 
   /**
@@ -142,17 +135,57 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
     LOGGER.info("Processing CSR for om {}, UUID: {}", omDetails.getHostName(),
         omDetails.getUuid());
     Objects.requireNonNull(omDetails);
+    return getEncodedCertToString(certSignReq, NodeType.OM);
+  }
+
+
+  /**
+   * Get signed certificate for SCM Node.
+   *
+   * @param scmNodeDetails   - SCM Node Details.
+   * @param certSignReq - Certificate signing request.
+   * @return String         - SCM signed pem encoded certificate.
+   */
+  @Override
+  public String getSCMCertificate(ScmNodeDetailsProto scmNodeDetails,
+      String certSignReq) throws IOException {
+    Objects.requireNonNull(scmNodeDetails);
+    LOGGER.info("Processing CSR for scm {}, nodeId: {}",
+        scmNodeDetails.getHostName(), scmNodeDetails.getScmNodeId());
+
+    // Check clusterID
+    if (storageContainerManager.getClusterId().equals(
+        scmNodeDetails.getClusterId())) {
+      throw new IOException("SCM ClusterId mismatch. Peer SCM ClusterId " +
+          scmNodeDetails.getClusterId() + ", primary SCM ClusterId "
+          + storageContainerManager.getClusterId());
+    }
+
+    return getEncodedCertToString(certSignReq, NodeType.SCM);
+
+  }
+
+  /**
+   *  Request certificate for the specified role.
+   * @param certSignReq - Certificate signing request.
+   * @param nodeType - role OM/SCM/DATANODE
+   * @return String         - SCM signed pem encoded certificate.
+   * @throws IOException
+   */
+  private String getEncodedCertToString(String certSignReq, NodeType nodeType)
+      throws IOException {
     Future<X509CertificateHolder> future =
         certificateServer.requestCertificate(certSignReq,
-            KERBEROS_TRUSTED);
-
+            KERBEROS_TRUSTED, nodeType);
     try {
       return CertificateCodec.getPEMEncodedString(future.get());
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      throw new IOException("getOMCertificate operation failed. ", e);
+      throw new IOException("generate" + nodeType.toString() + "Certificate " +
+          "operation failed. ", e);
     } catch (ExecutionException e) {
-      throw new IOException("getOMCertificate operation failed. ", e);
+      throw new IOException("generate" + nodeType.toString() + "Certificate " +
+          "operation failed.", e);
     }
   }
 
@@ -205,7 +238,7 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
    * @throws IOException
    */
   @Override
-  public List<String> listCertificate(HddsProtos.NodeType role,
+  public List<String> listCertificate(NodeType role,
       long startSerialId, int count, boolean isRevoked) throws IOException {
     List<X509Certificate> certificates =
         certificateServer.listCertificate(role, startSerialId, count,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 6a4130d..24bb893 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -546,7 +546,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     certificateServer.init(new SecurityConfig(conf),
         CertificateServer.CAType.SELF_SIGNED_CA);
     securityProtocolServer = new SCMSecurityProtocolServer(conf,
-        certificateServer);
+        certificateServer, this);
 
     grpcTlsConfig = createTlsClientConfigForSCM(new SecurityConfig(conf),
             certificateServer);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
index 5409416..5366b17 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
@@ -93,6 +93,11 @@ public class TestSCMStoreImplWithOldPipelineIDKeyFormat
   }
 
   @Override
+  public Table<BigInteger, X509Certificate> getValidSCMCertsTable() {
+    return null;
+  }
+
+  @Override
   public Table<BigInteger, X509Certificate> getRevokedCertsTable() {
     return null;
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
index 8040cb4..fa4385d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMSecurityProtocolServer.java
@@ -41,7 +41,7 @@ public class TestSCMSecurityProtocolServer {
     config = new OzoneConfiguration();
     config.set(OZONE_SCM_SECURITY_SERVICE_ADDRESS_KEY,
         OZONE_SCM_SECURITY_SERVICE_BIND_HOST_DEFAULT + ":0");
-    securityProtocolServer = new SCMSecurityProtocolServer(config, null);
+    securityProtocolServer = new SCMSecurityProtocolServer(config, null, null);
   }
 
   @After


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org