You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2019/03/09 17:48:41 UTC

[hadoop] 01/02: HDDS-594. SCM CA: DN sends CSR and uses certificate issued by SCM. Contributed by Ajay Kumar. (#547)

This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 6465125a5acf148d069a2bdfce10d119c8dc4399
Author: Ajay Yadav <78...@users.noreply.github.com>
AuthorDate: Thu Mar 7 14:41:52 2019 -0800

    HDDS-594. SCM CA: DN sends CSR and uses certificate issued by SCM. Contributed by Ajay Kumar. (#547)
    
    
    (cherry picked from commit 064f38b3a51be5f99a0ba09d0d48c11286017017)
---
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |  34 +++
 .../certificate/authority/CertificateApprover.java |   7 +-
 .../certificate/authority/DefaultApprover.java     |  36 ++-
 .../certificate/authority/DefaultCAServer.java     |   2 +-
 .../certificates/utils/CertificateSignRequest.java |   4 -
 .../x509/certificate/authority/MockApprover.java   |   7 +-
 .../certificate/authority/TestDefaultCAServer.java |  84 ++++++-
 .../certificates/TestCertificateSignRequest.java   |  18 --
 .../apache/hadoop/ozone/HddsDatanodeService.java   | 130 +++++++++-
 .../hadoop/ozone/TestHddsSecureDatanodeInit.java   | 269 +++++++++++++++++++++
 10 files changed, 552 insertions(+), 39 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 1556a57..2ca42d5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds;
 
 import javax.management.ObjectName;
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -36,7 +37,15 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
@@ -48,6 +57,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_K
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED_DEFAULT;
+
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -162,6 +173,29 @@ public final class HddsUtils {
   }
 
   /**
+   * Create a scm security client.
+   * @param conf    - Ozone configuration.
+   * @param address - inet socket address of scm.
+   *
+   * @return {@link SCMSecurityProtocol}
+   * @throws IOException
+   */
+  public static SCMSecurityProtocol getScmSecurityClient(
+      OzoneConfiguration conf, InetSocketAddress address) throws IOException {
+    RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long scmVersion =
+        RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
+    SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
+        new SCMSecurityProtocolClientSideTranslatorPB(
+            RPC.getProxy(SCMSecurityProtocolPB.class, scmVersion,
+                address, UserGroupInformation.getCurrentUser(),
+                conf, NetUtils.getDefaultSocketFactory(conf),
+                Client.getRpcTimeout(conf)));
+    return scmSecurityClient;
+  }
+
+  /**
    * Retrieve the hostname, trying the supplied config keys in order.
    * Each config value may be absent, or if present in the format
    * host:port (the :port part is optional).
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateApprover.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateApprover.java
index f9adb61..31d0aea 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateApprover.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateApprover.java
@@ -60,17 +60,22 @@ public interface CertificateApprover {
    * @param validFrom - Begin Date
    * @param validTill - End Date
    * @param certificationRequest - Certification Request.
+   * @param scmId - SCM id.
+   * @param clusterId - Cluster id.
    * @return Signed Certificate.
    * @throws IOException - On Error
    * @throws OperatorCreationException - on Error.
    */
+  @SuppressWarnings("ParameterNumber")
   X509CertificateHolder sign(
       SecurityConfig config,
       PrivateKey caPrivate,
       X509CertificateHolder caCertificate,
       Date validFrom,
       Date validTill,
-      PKCS10CertificationRequest certificationRequest)
+      PKCS10CertificationRequest certificationRequest,
+      String scmId,
+      String clusterId)
       throws IOException, OperatorCreationException;
 
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
index 827c82f..c7f37c1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
@@ -22,7 +22,10 @@ package org.apache.hadoop.hdds.security.x509.certificate.authority;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.PKIProfiles.PKIProfile;
+import org.apache.hadoop.hdds.security.x509.keys.SecurityUtil;
 import org.apache.hadoop.util.Time;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.bouncycastle.asn1.x500.style.BCStyle;
 import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
 import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
 import org.bouncycastle.cert.X509CertificateHolder;
@@ -67,18 +70,22 @@ public class DefaultApprover extends BaseApprover {
    * @param validFrom - Begin Da te
    * @param validTill - End Date
    * @param certificationRequest - Certification Request.
+   * @param scmId - SCM id.
+   * @param clusterId - Cluster id.
    * @return Signed Certificate.
    * @throws IOException - On Error
    * @throws OperatorCreationException - on Error.
    */
+  @SuppressWarnings("ParameterNumber")
   public  X509CertificateHolder sign(
       SecurityConfig config,
       PrivateKey caPrivate,
       X509CertificateHolder caCertificate,
       Date validFrom,
       Date validTill,
-      PKCS10CertificationRequest certificationRequest)
-      throws IOException, OperatorCreationException {
+      PKCS10CertificationRequest certificationRequest,
+      String scmId,
+      String clusterId) throws IOException, OperatorCreationException {
 
     AlgorithmIdentifier sigAlgId = new
         DefaultSignatureAlgorithmIdentifierFinder().find(
@@ -91,6 +98,29 @@ public class DefaultApprover extends BaseApprover {
     SubjectPublicKeyInfo keyInfo =
         certificationRequest.getSubjectPublicKeyInfo();
 
+    // Get scmId and cluster Id from subject name.
+    X500Name x500Name = certificationRequest.getSubject();
+    String csrScmId = x500Name.getRDNs(BCStyle.OU)[0].getFirst().getValue().
+        toASN1Primitive().toString();
+    String csrClusterId = x500Name.getRDNs(BCStyle.O)[0].getFirst().getValue().
+        toASN1Primitive().toString();
+
+    if (!scmId.equals(csrScmId) || !clusterId.equals(csrClusterId)) {
+      if (csrScmId.equalsIgnoreCase("null") &&
+          csrClusterId.equalsIgnoreCase("null")) {
+        // Special case to handle DN certificate generation as DN might not know
+        // scmId and clusterId before registration. In secure mode registration
+        // will succeed only after datanode has a valid certificate.
+        String cn = x500Name.getRDNs(BCStyle.CN)[0].getFirst().getValue()
+            .toASN1Primitive().toString();
+        x500Name = SecurityUtil.getDistinguishedName(cn, scmId, clusterId);
+      } else {
+        // Throw exception if scmId and clusterId don't match.
+        throw new SCMSecurityException("ScmId and ClusterId in CSR subject" +
+            " are incorrect.");
+      }
+    }
+
     RSAKeyParameters rsa =
         (RSAKeyParameters) PublicKeyFactory.createKey(keyInfo);
     if (rsa.getModulus().bitLength() < config.getSize()) {
@@ -104,7 +134,7 @@ public class DefaultApprover extends BaseApprover {
             BigInteger.valueOf(Time.monotonicNowNanos()),
             validFrom,
             validTill,
-            certificationRequest.getSubject(), keyInfo);
+            x500Name, keyInfo);
 
     ContentSigner sigGen = new BcRSAContentSignerBuilder(sigAlgId, digAlgId)
         .build(asymmetricKP);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
index fffde90..901c86c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
@@ -227,7 +227,7 @@ public class DefaultCAServer implements CertificateServer {
         X509CertificateHolder xcert = approver.sign(config,
             getCAKeys().getPrivate(),
             getCACertificate(), java.sql.Date.valueOf(beginDate),
-            java.sql.Date.valueOf(endDate), csr);
+            java.sql.Date.valueOf(endDate), csr, scmID, clusterID);
         store.storeValidCertificate(xcert.getSerialNumber(),
             CertificateCodec.getX509Certificate(xcert));
         xcertHolder.complete(xcert);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
index 42db909..28f853a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
@@ -269,10 +269,6 @@ public final class CertificateSignRequest {
       Preconditions.checkNotNull(key, "KeyPair cannot be null");
       Preconditions.checkArgument(Strings.isNotBlank(subject), "Subject " +
           "cannot be blank");
-      Preconditions.checkArgument(Strings.isNotBlank(clusterID), "Cluster ID " +
-          "cannot be blank");
-      Preconditions.checkArgument(Strings.isNotBlank(scmID), "SCM ID cannot " +
-          "be blank");
 
       try {
         CertificateSignRequest csr = new CertificateSignRequest(subject, scmID,
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockApprover.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockApprover.java
index d1777e1..a8fa0af 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockApprover.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockApprover.java
@@ -47,9 +47,10 @@ public class MockApprover extends BaseApprover {
 
   @Override
   public X509CertificateHolder sign(SecurityConfig config, PrivateKey caPrivate,
-                                    X509CertificateHolder caCertificate,
-                                    Date validFrom, Date validTill,
-                                    PKCS10CertificationRequest request)
+      X509CertificateHolder caCertificate,
+      Date validFrom, Date validTill,
+      PKCS10CertificationRequest request,
+      String scmId, String clusterId)
       throws IOException, OperatorCreationException {
     return null;
   }
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
index 8d807b7..64eb4ba 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.bouncycastle.pkcs.PKCS10CertificationRequest;
 import org.junit.Before;
@@ -139,14 +140,57 @@ public class TestDefaultCAServer {
   public void testRequestCertificate() throws IOException,
       ExecutionException, InterruptedException,
       NoSuchProviderException, NoSuchAlgorithmException {
+    String scmId =  RandomStringUtils.randomAlphabetic(4);
+    String clusterId =  RandomStringUtils.randomAlphabetic(4);
+    KeyPair keyPair =
+        new HDDSKeyGenerator(conf).generateKey();
+    PKCS10CertificationRequest csr = new CertificateSignRequest.Builder()
+        .addDnsName("hadoop.apache.org")
+        .addIpAddress("8.8.8.8")
+        .setCA(false)
+        .setClusterID(clusterId)
+        .setScmID(scmId)
+        .setSubject("Ozone Cluster")
+        .setConfiguration(conf)
+        .setKey(keyPair)
+        .build();
+
+    // Let us convert this to a string to mimic the common use case.
+    String csrString = CertificateSignRequest.getEncodedString(csr);
+
+    CertificateServer testCA = new DefaultCAServer("testCA",
+        clusterId, scmId, caStore);
+    testCA.init(new SecurityConfig(conf),
+        CertificateServer.CAType.SELF_SIGNED_CA);
+
+    Future<X509CertificateHolder> holder = testCA.requestCertificate(csrString,
+        CertificateApprover.ApprovalType.TESTING_AUTOMATIC);
+    // Right now our calls are synchronous. Eventually this will have to wait.
+    assertTrue(holder.isDone());
+    assertNotNull(holder.get());
+  }
+
+  /**
+   * Tests that we are able
+   * to create a test CA, have it create its own self-signed CA and then issue
+   * a certificate based on a CSR when scmId and clusterId are not set in the
+   * CSR subject.
+   * @throws SCMSecurityException - on ERROR.
+   * @throws ExecutionException - on ERROR.
+   * @throws InterruptedException - on ERROR.
+   * @throws NoSuchProviderException - on ERROR.
+   * @throws NoSuchAlgorithmException - on ERROR.
+   */
+  @Test
+  public void testRequestCertificateWithInvalidSubject() throws IOException,
+      ExecutionException, InterruptedException,
+      NoSuchProviderException, NoSuchAlgorithmException {
     KeyPair keyPair =
         new HDDSKeyGenerator(conf).generateKey();
     PKCS10CertificationRequest csr = new CertificateSignRequest.Builder()
         .addDnsName("hadoop.apache.org")
         .addIpAddress("8.8.8.8")
         .setCA(false)
-        .setClusterID("ClusterID")
-        .setScmID("SCMID")
         .setSubject("Ozone Cluster")
         .setConfiguration(conf)
         .setKey(keyPair)
@@ -168,4 +212,40 @@ public class TestDefaultCAServer {
     assertNotNull(holder.get());
   }
 
+  @Test
+  public void testRequestCertificateWithInvalidSubjectFailure()
+      throws Exception {
+    KeyPair keyPair =
+        new HDDSKeyGenerator(conf).generateKey();
+    PKCS10CertificationRequest csr = new CertificateSignRequest.Builder()
+        .addDnsName("hadoop.apache.org")
+        .addIpAddress("8.8.8.8")
+        .setCA(false)
+        .setScmID("wrong one")
+        .setClusterID("223432rf")
+        .setSubject("Ozone Cluster")
+        .setConfiguration(conf)
+        .setKey(keyPair)
+        .build();
+
+    // Let us convert this to a string to mimic the common use case.
+    String csrString = CertificateSignRequest.getEncodedString(csr);
+
+    CertificateServer testCA = new DefaultCAServer("testCA",
+        RandomStringUtils.randomAlphabetic(4),
+        RandomStringUtils.randomAlphabetic(4), caStore);
+    testCA.init(new SecurityConfig(conf),
+        CertificateServer.CAType.SELF_SIGNED_CA);
+
+    LambdaTestUtils.intercept(ExecutionException.class, "ScmId and " +
+            "ClusterId in CSR subject are incorrect",
+        () -> {
+          Future<X509CertificateHolder> holder =
+              testCA.requestCertificate(csrString,
+                  CertificateApprover.ApprovalType.TESTING_AUTOMATIC);
+          holder.isDone();
+          holder.get();
+        });
+  }
+
 }
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestCertificateSignRequest.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestCertificateSignRequest.java
index 16b3214..5720d27 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestCertificateSignRequest.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificates/TestCertificateSignRequest.java
@@ -213,24 +213,6 @@ public class TestCertificateSignRequest {
       builder.setSubject(subject);
     }
 
-    // Now try with blank/null SCM ID
-    try {
-      builder.setScmID(null);
-      builder.build();
-      Assert.fail("Null/Blank SCM ID should have thrown.");
-    } catch (IllegalArgumentException e) {
-      builder.setScmID(scmID);
-    }
-
-    // Now try with blank/null SCM ID
-    try {
-      builder.setClusterID(null);
-      builder.build();
-      Assert.fail("Null/Blank Cluster ID should have thrown.");
-    } catch (IllegalArgumentException e) {
-      builder.setClusterID(clusterID);
-    }
-
     // Now try with invalid IP address
     try {
       builder.addIpAddress("255.255.255.*");
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 3c205e6..260f348 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsUtils;
@@ -27,17 +26,24 @@ import org.apache.hadoop.hdds.cli.GenericCli;
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
+import org.bouncycastle.pkcs.PKCS10CertificationRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import picocli.CommandLine.Command;
@@ -45,9 +51,13 @@ import picocli.CommandLine.Command;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.security.KeyPair;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.util.List;
 import java.util.UUID;
 
+import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
@@ -68,6 +78,8 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
   private DatanodeDetails datanodeDetails;
   private DatanodeStateMachine datanodeStateMachine;
   private List<ServicePlugin> plugins;
+  private CertificateClient dnCertClient;
+  private String component;
   private HddsDatanodeHttpServer httpServer;
 
   /**
@@ -135,6 +147,10 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
     }
   }
 
+  public static Logger getLogger() {
+    return LOG;
+  }
+
   /**
    * Starts HddsDatanode services.
    *
@@ -160,13 +176,15 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
                 .substring(0, 8));
         LOG.info("HddsDatanodeService host:{} ip:{}", hostname, ip);
         // Authenticate Hdds Datanode service if security is enabled
-        if (conf.getBoolean(OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY,
-            true)) {
+        if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+          component = "dn-" + datanodeDetails.getUuidString();
+
+          dnCertClient = new DNCertificateClient(new SecurityConfig(conf));
+
           if (SecurityUtil.getAuthenticationMethod(conf).equals(
               UserGroupInformation.AuthenticationMethod.KERBEROS)) {
-            LOG.debug("Ozone security is enabled. Attempting login for Hdds " +
-                    "Datanode user. "
-                    + "Principal: {},keytab: {}", conf.get(
+            LOG.info("Ozone security is enabled. Attempting login for Hdds " +
+                    "Datanode user. Principal: {},keytab: {}", conf.get(
                 DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY),
                 conf.get(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY));
 
@@ -191,6 +209,9 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
         startPlugins();
         // Starting HDDS Daemons
         datanodeStateMachine.startDaemon();
+        if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+          initializeCertificateClient(conf);
+        }
       } catch (IOException e) {
         throw new RuntimeException("Can't start the HDDS datanode plugin", e);
       } catch (AuthenticationException ex) {
@@ -201,6 +222,87 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
   }
 
   /**
+   * Initializes secure Datanode.
+   * */
+  @VisibleForTesting
+  public void initializeCertificateClient(OzoneConfiguration config)
+      throws IOException {
+    LOG.info("Initializing secure Datanode.");
+
+    CertificateClient.InitResponse response = dnCertClient.init();
+    LOG.info("Init response: {}", response);
+    switch (response) {
+    case SUCCESS:
+      LOG.info("Initialization successful, case:{}.", response);
+      break;
+    case GETCERT:
+      getSCMSignedCert(config);
+      LOG.info("Successfully stored SCM signed certificate, case:{}.",
+          response);
+      break;
+    case FAILURE:
+      LOG.error("DN security initialization failed, case:{}.", response);
+      throw new RuntimeException("DN security initialization failed.");
+    case RECOVER:
+      LOG.error("DN security initialization failed, case:{}. OM certificate " +
+          "is missing.", response);
+      throw new RuntimeException("DN security initialization failed.");
+    default:
+      LOG.error("DN security initialization failed. Init response: {}",
+          response);
+      throw new RuntimeException("DN security initialization failed.");
+    }
+  }
+
+  /**
+   * Get SCM signed certificate and store it using certificate client.
+   * @param config - Ozone configuration.
+   * */
+  private void getSCMSignedCert(OzoneConfiguration config) {
+    try {
+      PKCS10CertificationRequest csr = getCSR(config);
+      // TODO: For SCM CA we should fetch certificate from multiple SCMs.
+      SCMSecurityProtocol secureScmClient =
+          HddsUtils.getScmSecurityClient(config,
+              HddsUtils.getScmAddressForSecurityProtocol(config));
+
+      String pemEncodedCert = secureScmClient.getDataNodeCertificate(
+          datanodeDetails.getProtoBufMessage(), getEncodedString(csr));
+
+      X509Certificate x509Certificate =
+          CertificateCodec.getX509Certificate(pemEncodedCert);
+      dnCertClient.storeCertificate(x509Certificate);
+    } catch (IOException | CertificateException e) {
+      LOG.error("Error while storing SCM signed certificate.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates CSR for DN.
+   * @param config - configuration passed to the CSR builder.
+   * */
+  @VisibleForTesting
+  public PKCS10CertificationRequest getCSR(Configuration config)
+      throws IOException {
+    CertificateSignRequest.Builder builder = dnCertClient.getCSRBuilder();
+    KeyPair keyPair = new KeyPair(dnCertClient.getPublicKey(),
+        dnCertClient.getPrivateKey());
+
+    String hostname = InetAddress.getLocalHost().getCanonicalHostName();
+    String subject = UserGroupInformation.getCurrentUser()
+        .getShortUserName() + "@" + hostname;
+
+    builder.setCA(false)
+        .setKey(keyPair)
+        .setConfiguration(config)
+        .setSubject(subject);
+
+    LOG.info("Creating csr for DN-> subject:{}", subject);
+    return builder.build();
+  }
+
+  /**
    * Returns DatanodeDetails or null in case of Error.
    *
    * @return DatanodeDetails
@@ -324,4 +426,18 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
       }
     }
   }
+
+  @VisibleForTesting
+  public String getComponent() {
+    return component;
+  }
+
+  public CertificateClient getCertificateClient() {
+    return dnCertClient;
+  }
+
+  @VisibleForTesting
+  public void setCertificateClient(CertificateClient client) {
+    dnCertClient = client;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
new file mode 100644
index 0000000..c6f65c7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.ServicePlugin;
+import org.bouncycastle.cert.X509CertificateHolder;
+import org.bouncycastle.pkcs.PKCS10CertificationRequest;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.Callable;
+
+import static org.apache.hadoop.ozone.HddsDatanodeService.getLogger;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+
+/**
+ * Tests certificate client initialization of {@link HddsDatanodeService}.
+ */
+public class TestHddsSecureDatanodeInit {
+
+  private static File testDir;
+  private static OzoneConfiguration conf;
+  private static HddsDatanodeService service;
+  private static String[] args = new String[]{};
+  private static PrivateKey privateKey;
+  private static PublicKey publicKey;
+  private static GenericTestUtils.LogCapturer dnLogs;
+  private static CertificateClient client;
+  private static SecurityConfig securityConfig;
+  private static KeyCodec keyCodec;
+  private static CertificateCodec certCodec;
+  private static X509CertificateHolder certHolder;
+
+  /** Creates the service once and caches its key pair and a test cert. */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    testDir = GenericTestUtils.getRandomizedTestDir();
+    conf = new OzoneConfiguration();
+    conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getPath());
+    String volumeDir = testDir + "/disk1";
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, volumeDir);
+
+    conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    conf.setClass(OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY,
+        TestHddsDatanodeService.MockService.class,
+        ServicePlugin.class);
+    securityConfig = new SecurityConfig(conf);
+    // Failures of start/init are tolerated; the tests re-run init themselves.
+    service = HddsDatanodeService.createHddsDatanodeService(args, conf);
+    dnLogs = GenericTestUtils.LogCapturer.captureLogs(getLogger());
+    callQuietly(() -> {
+      service.start(null);
+      return null;
+    });
+    callQuietly(() -> {
+      service.initializeCertificateClient(conf);
+      return null;
+    });
+    certCodec = new CertificateCodec(securityConfig);
+    keyCodec = new KeyCodec(securityConfig);
+    dnLogs.clearOutput();
+    privateKey = service.getCertificateClient().getPrivateKey();
+    publicKey = service.getCertificateClient().getPublicKey();
+
+    // Certificate for CN=Test generated from the datanode's own key pair.
+    X509Certificate x509Certificate = KeyStoreTestUtil.generateCertificate(
+        "CN=Test", new KeyPair(publicKey, privateKey), 10,
+        securityConfig.getSignatureAlgo());
+    certHolder = new X509CertificateHolder(x509Certificate.getEncoded());
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    FileUtil.fullyDelete(testDir);
+  }
+
+  /** Installs a fresh client and deletes key/cert files before each test. */
+  @Before
+  public void setUpDNCertClient() {
+    client = new DNCertificateClient(securityConfig);
+    service.setCertificateClient(client);
+    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
+        .toString(), securityConfig.getPrivateKeyFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
+        .toString(), securityConfig.getPublicKeyFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(securityConfig
+        .getCertificateLocation().toString(),
+        securityConfig.getCertificateFileName()).toFile());
+    dnLogs.clearOutput();
+  }
+
+  @Test
+  public void testSecureDnStartupCase0() throws Exception {
+    // Case 0: Neither key pair nor certificate exists. Init generates the
+    // key pair and answers GETCERT; fetching the certificate then fails
+    // because no SCM is running.
+    LambdaTestUtils.intercept(Exception.class, "",
+        () -> service.initializeCertificateClient(conf));
+
+    Assert.assertNotNull(client.getPrivateKey());
+    Assert.assertNotNull(client.getPublicKey());
+    Assert.assertNull(client.getCertificate());
+    Assert.assertTrue(dnLogs.getOutput().contains("Init response: GETCERT"));
+  }
+
+  @Test
+  public void testSecureDnStartupCase1() throws Exception {
+    // Case 1: When only the certificate is present.
+
+    certCodec.writeCertificate(certHolder);
+    LambdaTestUtils.intercept(RuntimeException.class, "DN security" +
+            " initialization failed",
+        () -> service.initializeCertificateClient(conf));
+    Assert.assertNull(client.getPrivateKey());
+    Assert.assertNull(client.getPublicKey());
+    Assert.assertNotNull(client.getCertificate());
+    Assert.assertTrue(dnLogs.getOutput().contains("Init response: FAILURE"));
+  }
+
+  @Test
+  public void testSecureDnStartupCase2() throws Exception {
+    // Case 2: When private key and certificate are missing.
+    keyCodec.writePublicKey(publicKey);
+    LambdaTestUtils.intercept(RuntimeException.class, "DN security" +
+            " initialization failed",
+        () -> service.initializeCertificateClient(conf));
+    Assert.assertNull(client.getPrivateKey());
+    Assert.assertNotNull(client.getPublicKey());
+    Assert.assertNull(client.getCertificate());
+    Assert.assertTrue(dnLogs.getOutput().contains("Init response: FAILURE"));
+  }
+
+  @Test
+  public void testSecureDnStartupCase3() throws Exception {
+    // Case 3: When only public key and certificate are present.
+    keyCodec.writePublicKey(publicKey);
+    certCodec.writeCertificate(certHolder);
+    LambdaTestUtils.intercept(RuntimeException.class, "DN security" +
+            " initialization failed",
+        () -> service.initializeCertificateClient(conf));
+    Assert.assertNull(client.getPrivateKey());
+    Assert.assertNotNull(client.getPublicKey());
+    Assert.assertNotNull(client.getCertificate());
+    Assert.assertTrue(dnLogs.getOutput().contains("Init response: FAILURE"));
+  }
+
+  @Test
+  public void testSecureDnStartupCase4() throws Exception {
+    // Case 4: When public key as well as certificate are missing.
+    keyCodec.writePrivateKey(privateKey);
+    LambdaTestUtils.intercept(RuntimeException.class, " DN security" +
+            " initialization failed",
+        () -> service.initializeCertificateClient(conf));
+    Assert.assertNotNull(client.getPrivateKey());
+    Assert.assertNull(client.getPublicKey());
+    Assert.assertNull(client.getCertificate());
+    Assert.assertTrue(dnLogs.getOutput().contains("Init response: FAILURE"));
+    dnLogs.clearOutput();
+  }
+
+  @Test
+  public void testSecureDnStartupCase5() throws Exception {
+    // Case 5: If private key and certificate are present.
+    certCodec.writeCertificate(certHolder);
+    keyCodec.writePrivateKey(privateKey);
+    service.initializeCertificateClient(conf);
+    Assert.assertNotNull(client.getPrivateKey());
+    Assert.assertNotNull(client.getPublicKey());
+    Assert.assertNotNull(client.getCertificate());
+    Assert.assertTrue(dnLogs.getOutput().contains("Init response: SUCCESS"));
+  }
+
+  @Test
+  public void testSecureDnStartupCase6() throws Exception {
+    // Case 6: If key pair already exists then response should be GETCERT.
+    keyCodec.writePublicKey(publicKey);
+    keyCodec.writePrivateKey(privateKey);
+    LambdaTestUtils.intercept(Exception.class, "",
+        () -> service.initializeCertificateClient(conf));
+    Assert.assertNotNull(client.getPrivateKey());
+    Assert.assertNotNull(client.getPublicKey());
+    Assert.assertNull(client.getCertificate());
+    Assert.assertTrue(dnLogs.getOutput().contains("Init response: GETCERT"));
+  }
+
+  @Test
+  public void testSecureDnStartupCase7() throws Exception {
+    // Case 7: When keypair and certificate are present.
+    keyCodec.writePublicKey(publicKey);
+    keyCodec.writePrivateKey(privateKey);
+    certCodec.writeCertificate(certHolder);
+
+    service.initializeCertificateClient(conf);
+    Assert.assertNotNull(client.getPrivateKey());
+    Assert.assertNotNull(client.getPublicKey());
+    Assert.assertNotNull(client.getCertificate());
+    Assert.assertTrue(dnLogs.getOutput().contains("Init response: SUCCESS"));
+  }
+
+  /**
+   * Invokes a callable and ignores anything it throws (best effort).
+   * Used by {@link #setUp()}, where start-up is allowed to fail.
+   * @param closure closure to execute; its result is discarded
+   */
+  public static void callQuietly(Callable<?> closure) {
+    try {
+      closure.call();
+    } catch (Throwable ignored) {
+      // Deliberately swallowed: callers only want the side effects.
+    }
+  }
+
+  @Test
+  public void testGetCSR() throws Exception {
+    keyCodec.writePublicKey(publicKey);
+    keyCodec.writePrivateKey(privateKey);
+    service.setCertificateClient(client);
+    PKCS10CertificationRequest csr =
+        service.getCSR(conf);
+    Assert.assertNotNull(csr);
+    // Repeated calls with the same client must keep returning a CSR.
+    csr = service.getCSR(conf);
+    Assert.assertNotNull(csr);
+
+    csr = service.getCSR(conf);
+    Assert.assertNotNull(csr);
+
+    csr = service.getCSR(conf);
+    Assert.assertNotNull(csr);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org