You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by vi...@apache.org on 2021/12/14 19:08:01 UTC

[ozone] branch HDDS-6030 updated: HDDS-6031. Add configs to support externalization of root CA (#2878)

This is an automated email from the ASF dual-hosted git repository.

vivekratnavel pushed a commit to branch HDDS-6030
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-6030 by this push:
     new bb94b35  HDDS-6031. Add configs to support externalization of root CA (#2878)
bb94b35 is described below

commit bb94b357a2c6fff91ffb9f53528099d8c1b9ac79
Author: Vivek Ratnavel Subramanian <vi...@gmail.com>
AuthorDate: Tue Dec 14 11:07:45 2021 -0800

    HDDS-6031. Add configs to support externalization of root CA (#2878)
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |  17 +-
 .../hadoop/hdds/security/x509/SecurityConfig.java  |  79 +++-
 .../x509/certificate/utils/CertificateCodec.java   |  18 +-
 .../common/src/main/resources/ozone-default.xml    |  58 +++
 .../container/replication/ReplicationServer.java   |   9 +-
 .../hadoop/ozone/TestHddsSecureDatanodeInit.java   |   3 +-
 .../x509/certificate/client/CertificateClient.java |   4 +-
 .../certificate/client/DNCertificateClient.java    |   6 +-
 .../client/DefaultCertificateClient.java           | 398 ++++++++++++++-------
 .../certificate/client/OMCertificateClient.java    |  11 +-
 .../certificate/client/SCMCertificateClient.java   |  10 +-
 .../hadoop/hdds/security/x509/keys/KeyCodec.java   | 118 ++++--
 .../hdds/security/x509/keys/SecurityUtil.java      |  92 ++++-
 .../java/org/apache/hadoop/hdds/utils/HAUtils.java |  47 ++-
 .../client/TestDefaultCertificateClient.java       |   2 +-
 .../hdds/security/x509/keys/TestKeyCodec.java      |  27 ++
 .../apache/hadoop/hdds/scm/ha/HASecurityUtils.java |   2 +-
 .../hdds/scm/server/StorageContainerManager.java   |   9 +-
 .../container/server/TestContainerServer.java      |   2 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  39 +-
 .../OzoneDelegationTokenSecretManager.java         |   5 +-
 21 files changed, 733 insertions(+), 223 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index f538595..01c0aeb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership.  The ASF
@@ -130,6 +130,21 @@ public final class HddsConfigKeys {
   public static final String HDDS_DEFAULT_SECURITY_PROVIDER = "BC";
   public static final String HDDS_KEY_DIR_NAME = "hdds.key.dir.name";
   public static final String HDDS_KEY_DIR_NAME_DEFAULT = "keys";
+
+  public static final String HDDS_CUSTOM_ROOT_CA_ENABLED =
+      "hdds.custom.rootca.enabled";
+  public static final boolean HDDS_CUSTOM_ROOT_CA_ENABLED_DEFAULT = false;
+  public static final String HDDS_CUSTOM_KEYSTORE_FILE_PATH =
+      "hdds.custom.keystore.file.path";
+  public static final String HDDS_CUSTOM_KEYSTORE_FILE_PASSWORD =
+      "hdds.custom.keystore.file.password";
+  public static final String HDDS_CUSTOM_KEYSTORE_KEY_PASSWORD =
+      "hdds.custom.keystore.key.password";
+  public static final String HDDS_CUSTOM_TRUSTSTORE_FILE_PATH =
+      "hdds.custom.truststore.file.path";
+  public static final String HDDS_CUSTOM_TRUSTSTORE_PASSWORD =
+      "hdds.custom.truststore.password";
+
   // TODO : Talk to StorageIO classes and see if they can return a secure
   // storage location for each node.
   public static final String HDDS_METADATA_DIR_NAME = "hdds.metadata.dir";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
index b02ce1b..dd6da05 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
@@ -19,6 +19,16 @@
 
 package org.apache.hadoop.hdds.security.x509;
 
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.Provider;
@@ -26,10 +36,6 @@ import java.security.Security;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-
-import com.google.common.base.Preconditions;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED;
@@ -69,10 +75,13 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider;
-import org.bouncycastle.jce.provider.BouncyCastleProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CUSTOM_KEYSTORE_FILE_PASSWORD;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CUSTOM_KEYSTORE_FILE_PATH;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CUSTOM_KEYSTORE_KEY_PASSWORD;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CUSTOM_ROOT_CA_ENABLED;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CUSTOM_ROOT_CA_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CUSTOM_TRUSTSTORE_FILE_PATH;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CUSTOM_TRUSTSTORE_PASSWORD;
 
 /**
  * A class that deals with all Security related configs in HDDS.
@@ -103,6 +112,13 @@ public class SecurityConfig {
   private final boolean isSecurityEnabled;
   private final String crlName;
   private boolean grpcTlsUseTestCert;
+  private final boolean isCustomCAEnabled;
+  // The following configs are used only when custom Root CA is enabled
+  private final String keystoreFilePath;
+  private char[] keystoreFilePassword;
+  private char[] keystoreKeyPassword;
+  private final String truststoreFilePath;
+  private char[] truststorePassword;
 
   /**
    * Constructs a SecurityConfig.
@@ -117,6 +133,9 @@ public class SecurityConfig {
         HDDS_DEFAULT_KEY_ALGORITHM);
     this.providerString = this.configuration.get(HDDS_SECURITY_PROVIDER,
         HDDS_DEFAULT_SECURITY_PROVIDER);
+    this.isCustomCAEnabled =
+        this.configuration.getBoolean(HDDS_CUSTOM_ROOT_CA_ENABLED,
+            HDDS_CUSTOM_ROOT_CA_ENABLED_DEFAULT);
 
     // Please Note: To make it easy for our customers we will attempt to read
     // HDDS metadata dir and if that is not set, we will use Ozone directory.
@@ -175,6 +194,22 @@ public class SecurityConfig {
     this.crlName = this.configuration.get(HDDS_X509_CRL_NAME,
                                           HDDS_X509_CRL_NAME_DEFAULT);
 
+    this.keystoreFilePath =
+        this.configuration.get(HDDS_CUSTOM_KEYSTORE_FILE_PATH);
+    this.truststoreFilePath =
+        this.configuration.get(HDDS_CUSTOM_TRUSTSTORE_FILE_PATH);
+    try {
+      this.keystoreFilePassword =
+          this.configuration.getPassword(HDDS_CUSTOM_KEYSTORE_FILE_PASSWORD);
+      this.keystoreKeyPassword =
+          this.configuration.getPassword(HDDS_CUSTOM_KEYSTORE_KEY_PASSWORD);
+      this.truststorePassword =
+          this.configuration.getPassword(HDDS_CUSTOM_TRUSTSTORE_PASSWORD);
+    } catch (IOException ioException) {
+      LOG.error("Error while getting custom Keystore / Truststore password.",
+          ioException);
+    }
+
     // First Startup -- if the provider is null, check for the provider.
     if (SecurityConfig.provider == null) {
       synchronized (SecurityConfig.class) {
@@ -406,4 +441,32 @@ public class SecurityConfig {
         OzoneConfigKeys.OZONE_S3_AUTHINFO_MAX_LIFETIME_KEY_DEFAULT,
         TimeUnit.MICROSECONDS);
   }
+
+  /**
+   * Returns true if custom Root CA is enabled.
+   * @return true if custom Root CA is enabled.
+   */
+  public boolean isCustomCAEnabled() {
+    return isCustomCAEnabled;
+  }
+
+  public String getKeystoreFilePath() {
+    return keystoreFilePath;
+  }
+
+  public char[] getKeystoreFilePassword() {
+    return keystoreFilePassword.clone();
+  }
+
+  public char[] getKeystoreKeyPassword() {
+    return keystoreKeyPassword.clone();
+  }
+
+  public String getTruststoreFilePath() {
+    return truststoreFilePath;
+  }
+
+  public char[] getTruststorePassword() {
+    return truststorePassword.clone();
+  }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
index 03e4c53..6f0a1c5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
@@ -265,6 +265,20 @@ public class CertificateCodec {
   }
 
   /**
+   * Returns the certificate from the specific PEM encoded file.
+   *
+   * @param certFile - Full path to the certificate file.
+   * @return X509 Certificate
+   * @throws IOException          - on Error.
+   * @throws SCMSecurityException - on Error.
+   * @throws CertificateException - on Error.
+   */
+  public static X509CertificateHolder readCertificate(File certFile)
+      throws IOException, CertificateException {
+    return getX509CertificateHolder(certFile);
+  }
+
+  /**
    * Helper function to read certificate.
    *
    * @param certificateFile - Full path to certificate file.
@@ -272,8 +286,8 @@ public class CertificateCodec {
    * @throws IOException          - On Error.
    * @throws CertificateException - On Error.
    */
-  private X509CertificateHolder getX509CertificateHolder(File certificateFile)
-      throws IOException, CertificateException {
+  private static X509CertificateHolder getX509CertificateHolder(
+      File certificateFile) throws IOException, CertificateException {
     if (!certificateFile.exists()) {
       throw new IOException("Unable to find the requested certificate. Path: "
           + certificateFile.toString());
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 6a97c85..ee29d3d 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3028,4 +3028,62 @@
       will create intermediate directories.
     </description>
   </property>
+
+  <property>
+    <name>hdds.custom.rootca.enabled</name>
+    <value>false</value>
+    <tag>OZONE, SECURITY</tag>
+    <description>
+      This configuration is used to enable external root CA for Ozone.
+      If this is set to true, "hdds.custom.keystore.file.path",
+      "hdds.custom.keystore.file.password", "hdds.custom.truststore.file.path", "hdds.custom.truststore.password"
+      are also required to be configured to be able to successfully start Ozone.
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.custom.keystore.file.path</name>
+    <value></value>
+    <tag>OZONE, SECURITY</tag>
+    <description>
+      The keystore file in JCEKS format that contains the key and the certificate signed by the custom Root CA.
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.custom.keystore.file.password</name>
+    <value></value>
+    <tag>OZONE, SECURITY</tag>
+    <description>
+      The keystore password to access the keystore file specified in "hdds.custom.keystore.file.path"
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.custom.keystore.key.password</name>
+    <value>false</value>
+    <tag>OZONE, SECURITY</tag>
+    <description>
+      The keystore key password to access the keys stored in the keystore file specified in "hdds.custom.keystore.file.path"
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.custom.truststore.file.path</name>
+    <value></value>
+    <tag>OZONE, SECURITY</tag>
+    <description>
+      The truststore file that contains the trusted root CA and intermediate CA certificates.
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.custom.truststore.password</name>
+    <value></value>
+    <tag>OZONE, SECURITY</tag>
+    <description>
+      The password to access the truststore file specified in "hdds.custom.truststore.file.path"
+    </description>
+  </property>
+
 </configuration>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index dd5f4c4..5af0b90 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
@@ -62,7 +63,7 @@ public class ReplicationServer {
       ReplicationConfig replicationConfig,
       SecurityConfig secConf,
       CertificateClient caClient
-  ) {
+  ) throws SCMSecurityException {
     this.secConf = secConf;
     this.caClient = caClient;
     this.controller = controller;
@@ -70,7 +71,7 @@ public class ReplicationServer {
     init();
   }
 
-  public void init() {
+  public void init() throws IllegalArgumentException {
     NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE)
         .addService(ServerInterceptors.intercept(new GrpcReplicationService(
@@ -93,7 +94,7 @@ public class ReplicationServer {
       } catch (IOException ex) {
         throw new IllegalArgumentException(
             "Unable to setup TLS for secure datanode replication GRPC "
-                + "endpoint.", ex);
+                + "endpoint. Error while getting Certificate.", ex);
       }
     }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
index 08ca4c9..4dcd4ed 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Paths;
 import java.security.KeyPair;
 import java.security.PrivateKey;
@@ -114,7 +115,7 @@ public class TestHddsSecureDatanodeInit {
   }
 
   @Before
-  public void setUpDNCertClient(){
+  public void setUpDNCertClient() throws IOException {
 
     FileUtils.deleteQuietly(Paths.get(
         securityConfig.getKeyLocation(DN_COMPONENT).toString(),
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
index 396452f..6aba6f6 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
@@ -73,7 +73,7 @@ public interface CertificateClient {
    *
    * @return certificate or Null if there is no data.
    */
-  X509Certificate getCertificate();
+  X509Certificate getCertificate() throws IOException;
 
   /**
    * Return the latest CA certificate known to the client.
@@ -188,7 +188,7 @@ public interface CertificateClient {
    * Initialize certificate client.
    *
    * */
-  InitResponse init() throws CertificateException;
+  InitResponse init() throws IOException;
 
   /**
    * Represents initialization response of client.
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java
index 40c5b0a..056bc00 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java
@@ -26,6 +26,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 
+import java.io.IOException;
+
 /**
  * Certificate client for DataNodes.
  */
@@ -37,11 +39,11 @@ public class DNCertificateClient extends DefaultCertificateClient {
   public static final String COMPONENT_NAME = "dn";
 
   public DNCertificateClient(SecurityConfig securityConfig,
-      String certSerialId) {
+      String certSerialId) throws IOException {
     super(securityConfig, LOG, certSerialId, COMPONENT_NAME);
   }
 
-  public DNCertificateClient(SecurityConfig securityConfig) {
+  public DNCertificateClient(SecurityConfig securityConfig) throws IOException {
     super(securityConfig, LOG, null, COMPONENT_NAME);
   }
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
index d831c83..c751f1f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
@@ -35,6 +35,8 @@ import java.security.PublicKey;
 import java.security.Signature;
 import java.security.SignatureException;
 import java.security.cert.CertStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
 import java.security.cert.X509Certificate;
 import java.security.spec.InvalidKeySpecException;
 import java.util.ArrayList;
@@ -47,6 +49,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRe
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
+import org.apache.hadoop.hdds.security.x509.keys.SecurityUtil;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 
 import com.google.common.base.Preconditions;
@@ -104,7 +108,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
   private final Lock lock;
 
   DefaultCertificateClient(SecurityConfig securityConfig, Logger log,
-      String certSerialId, String component) {
+      String certSerialId, String component) throws IOException {
     Objects.requireNonNull(securityConfig);
     this.securityConfig = securityConfig;
     keyCodec = new KeyCodec(securityConfig, component);
@@ -117,71 +121,137 @@ public abstract class DefaultCertificateClient implements CertificateClient {
     loadAllCertificates();
   }
 
+  private enum CertType {
+    X509_CERT, CA_CERT, ROOT_CA_CERT
+  }
+
+  private void cacheCertificate(File certFile, CertType certType) {
+    if (certFile.isFile() && certFile.exists()) {
+      try {
+        X509CertificateHolder x509CertificateHolder = CertificateCodec
+            .readCertificate(certFile);
+        X509Certificate cert =
+            CertificateCodec.getX509Certificate(x509CertificateHolder);
+        if (cert != null && cert.getSerialNumber() != null) {
+          switch (certType) {
+          case X509_CERT:
+            x509Certificate = cert;
+            break;
+          case CA_CERT:
+            caCertId = cert.getSerialNumber().toString();
+            break;
+          case ROOT_CA_CERT:
+            rootCaCertId = cert.getSerialNumber().toString();
+            break;
+          default:
+            break;
+          }
+          certificateMap.putIfAbsent(cert.getSerialNumber().toString(),
+              cert);
+          getLogger().info("Added certificate from file:{}.",
+              certFile.getAbsolutePath());
+        } else {
+          getLogger().error("Error reading certificate from file:{}.",
+              certFile.getAbsolutePath());
+        }
+      } catch (java.security.cert.CertificateException | IOException e) {
+        getLogger().error("Error reading certificate from file:{}.",
+            certFile.getAbsolutePath(), e);
+      }
+    } else {
+      getLogger().error("Error reading certificate from file:{}. Check if the" +
+              " certificate file exists in this path.",
+          certFile.getAbsolutePath());
+    }
+  }
+
   /**
    * Load all certificates from configured location.
    * */
-  private void loadAllCertificates() {
-    // See if certs directory exists in file system.
-    Path certPath = securityConfig.getCertificateLocation(component);
-    if (Files.exists(certPath) && Files.isDirectory(certPath)) {
-      getLogger().info("Loading certificate from location:{}.",
-          certPath);
-      File[] certFiles = certPath.toFile().listFiles();
-
-      if (certFiles != null) {
-        CertificateCodec certificateCodec =
-            new CertificateCodec(securityConfig, component);
-        long latestCaCertSerailId = -1L;
-        long latestRootCaCertSerialId = -1L;
-        for (File file : certFiles) {
-          if (file.isFile()) {
-            try {
-              X509CertificateHolder x509CertificateHolder = certificateCodec
-                  .readCertificate(certPath, file.getName());
-              X509Certificate cert =
-                  CertificateCodec.getX509Certificate(x509CertificateHolder);
-              if (cert != null && cert.getSerialNumber() != null) {
-                if (cert.getSerialNumber().toString().equals(certSerialId)) {
-                  x509Certificate = cert;
-                }
-                certificateMap.putIfAbsent(cert.getSerialNumber().toString(),
-                    cert);
-                if (file.getName().startsWith(CA_CERT_PREFIX)) {
-                  String certFileName = FilenameUtils.getBaseName(
-                      file.getName());
-                  long tmpCaCertSerailId = NumberUtils.toLong(
-                      certFileName.substring(CA_CERT_PREFIX_LEN));
-                  if (tmpCaCertSerailId > latestCaCertSerailId) {
-                    latestCaCertSerailId = tmpCaCertSerailId;
+  private void loadAllCertificates() throws IOException {
+    CertificateCodec certificateCodec =
+        new CertificateCodec(securityConfig, component);
+    if (securityConfig.isCustomCAEnabled()) {
+
+      Certificate cert = SecurityUtil.getCustomCertificate(securityConfig);
+      // String caCertsPath = securityConfig.getCaCertsPath();
+      if (cert != null) {
+        CertificateFactory cf = null;
+        try {
+          cf = CertificateFactory.getInstance("X.509");
+          ByteArrayInputStream bais =
+              new ByteArrayInputStream(cert.getEncoded());
+          x509Certificate = (X509Certificate) cf.generateCertificate(bais);
+        } catch (java.security.cert.CertificateException e) {
+          throw new SCMSecurityException("Error while getting " +
+              "the Certificate from Key Store.", e);
+        }
+      } else {
+        throw new SCMSecurityException("Error while getting " +
+            "the Certificate from Key Store.");
+      }
+    } else {
+      // See if certs directory exists in file system.
+      Path certPath = securityConfig.getCertificateLocation(component);
+      if (Files.exists(certPath) && Files.isDirectory(certPath)) {
+        getLogger().info("Loading certificates from location:{}.",
+            certPath);
+        File[] certFiles = certPath.toFile().listFiles();
+
+        if (certFiles != null) {
+          long latestCaCertSerialId = -1L;
+          long latestRootCaCertSerialId = -1L;
+          for (File file : certFiles) {
+            if (file.isFile()) {
+              try {
+                X509CertificateHolder x509CertificateHolder = certificateCodec
+                    .readCertificate(certPath, file.getName());
+                X509Certificate cert =
+                    CertificateCodec.getX509Certificate(x509CertificateHolder);
+                if (cert != null && cert.getSerialNumber() != null) {
+                  if (cert.getSerialNumber().toString().equals(certSerialId)) {
+                    x509Certificate = cert;
+                  }
+                  certificateMap.putIfAbsent(cert.getSerialNumber().toString(),
+                      cert);
+                  if (file.getName().startsWith(CA_CERT_PREFIX)) {
+                    String certFileName = FilenameUtils.getBaseName(
+                        file.getName());
+                    long tmpCaCertSerialId = NumberUtils.toLong(
+                        certFileName.substring(CA_CERT_PREFIX_LEN));
+                    if (tmpCaCertSerialId > latestCaCertSerialId) {
+                      latestCaCertSerialId = tmpCaCertSerialId;
+                    }
                   }
-                }
 
-                if (file.getName().startsWith(ROOT_CA_CERT_PREFIX)) {
-                  String certFileName = FilenameUtils.getBaseName(
-                      file.getName());
-                  long tmpRootCaCertSerailId = NumberUtils.toLong(
-                      certFileName.substring(ROOT_CA_PREFIX_LEN));
-                  if (tmpRootCaCertSerailId > latestRootCaCertSerialId) {
-                    latestRootCaCertSerialId = tmpRootCaCertSerailId;
+                  if (file.getName().startsWith(ROOT_CA_CERT_PREFIX)) {
+                    String certFileName = FilenameUtils.getBaseName(
+                        file.getName());
+                    long tmpRootCaCertSerailId = NumberUtils.toLong(
+                        certFileName.substring(ROOT_CA_PREFIX_LEN));
+                    if (tmpRootCaCertSerailId > latestRootCaCertSerialId) {
+                      latestRootCaCertSerialId = tmpRootCaCertSerailId;
+                    }
                   }
+                  getLogger().info("Added certificate from file:{}.",
+                      file.getAbsolutePath());
+                } else {
+                  getLogger().error("Error reading certificate from file:{}",
+                      file);
                 }
-                getLogger().info("Added certificate from file:{}.",
-                    file.getAbsolutePath());
-              } else {
-                getLogger().error("Error reading certificate from file:{}",
-                    file);
+              } catch (java.security.cert.CertificateException |
+                  IOException e) {
+                getLogger().error("Error reading certificate from file:{}.",
+                    file.getAbsolutePath(), e);
               }
-            } catch (java.security.cert.CertificateException | IOException e) {
-              getLogger().error("Error reading certificate from file:{}.",
-                  file.getAbsolutePath(), e);
             }
           }
-        }
-        if (latestCaCertSerailId != -1) {
-          caCertId = Long.toString(latestCaCertSerailId);
-        }
-        if (latestRootCaCertSerialId != -1) {
-          rootCaCertId = Long.toString(latestRootCaCertSerialId);
+          if (latestCaCertSerialId != -1) {
+            caCertId = Long.toString(latestCaCertSerialId);
+          }
+          if (latestRootCaCertSerialId != -1) {
+            rootCaCertId = Long.toString(latestRootCaCertSerialId);
+          }
         }
       }
     }
@@ -199,15 +269,26 @@ public abstract class DefaultCertificateClient implements CertificateClient {
       return privateKey;
     }
 
-    Path keyPath = securityConfig.getKeyLocation(component);
-    if (OzoneSecurityUtil.checkIfFileExist(keyPath,
-        securityConfig.getPrivateKeyFileName())) {
+    if (securityConfig.isCustomCAEnabled()) {
+      KeyPair customKeyPair = null;
       try {
-        privateKey = keyCodec.readPrivateKey();
-      } catch (InvalidKeySpecException | NoSuchAlgorithmException
-          | IOException e) {
+        customKeyPair = SecurityUtil.getCustomKeyPair(securityConfig);
+        privateKey = customKeyPair.getPrivate();
+        return privateKey;
+      } catch (Exception e) {
         getLogger().error("Error while getting private key.", e);
       }
+    } else {
+      Path keyPath = securityConfig.getKeyLocation(component);
+      if (OzoneSecurityUtil.checkIfFileExist(keyPath,
+          securityConfig.getPrivateKeyFileName())) {
+        try {
+          privateKey = keyCodec.readPrivateKey();
+        } catch (InvalidKeySpecException | NoSuchAlgorithmException
+            | IOException e) {
+          getLogger().error("Error while getting private key.", e);
+        }
+      }
     }
     return privateKey;
   }
@@ -223,15 +304,26 @@ public abstract class DefaultCertificateClient implements CertificateClient {
       return publicKey;
     }
 
-    Path keyPath = securityConfig.getKeyLocation(component);
-    if (OzoneSecurityUtil.checkIfFileExist(keyPath,
-        securityConfig.getPublicKeyFileName())) {
+    if (securityConfig.isCustomCAEnabled()) {
+      KeyPair customKeyPair = null;
       try {
-        publicKey = keyCodec.readPublicKey();
-      } catch (InvalidKeySpecException | NoSuchAlgorithmException
-          | IOException e) {
+        customKeyPair = SecurityUtil.getCustomKeyPair(securityConfig);
+        publicKey = customKeyPair.getPublic();
+        return publicKey;
+      } catch (Exception e) {
         getLogger().error("Error while getting public key.", e);
       }
+    } else {
+      Path keyPath = securityConfig.getKeyLocation(component);
+      if (OzoneSecurityUtil.checkIfFileExist(keyPath,
+          securityConfig.getPublicKeyFileName())) {
+        try {
+          publicKey = keyCodec.readPublicKey();
+        } catch (InvalidKeySpecException | NoSuchAlgorithmException
+            | IOException e) {
+          getLogger().error("Error while getting public key.", e);
+        }
+      }
     }
     return publicKey;
   }
@@ -242,20 +334,23 @@ public abstract class DefaultCertificateClient implements CertificateClient {
    * @return certificate or Null if there is no data.
    */
   @Override
-  public X509Certificate getCertificate() {
+  public X509Certificate getCertificate() throws IOException {
     if (x509Certificate != null) {
       return x509Certificate;
     }
 
-    if (certSerialId == null) {
-      getLogger().error("Default certificate serial id is not set. Can't " +
-          "locate the default certificate for this client.");
-      return null;
-    }
     // Refresh the cache from file system.
     loadAllCertificates();
-    if (certificateMap.containsKey(certSerialId)) {
-      x509Certificate = certificateMap.get(certSerialId);
+    if (!securityConfig.isCustomCAEnabled()) {
+      if (certSerialId == null) {
+        getLogger().error("Default certificate serial id is not set. Can't " +
+            "locate the default certificate for this client.");
+        return null;
+      }
+
+      if (certificateMap.containsKey(certSerialId)) {
+        x509Certificate = certificateMap.get(certSerialId);
+      }
     }
     return x509Certificate;
   }
@@ -275,7 +370,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
   /**
    * Returns the certificate  with the specified certificate serial id if it
    * exists else try to get it from SCM.
-   * @param  certId
+   * @param certId Certificate Serial ID
    *
    * @return certificate or Null if there is no data.
    */
@@ -286,8 +381,17 @@ public abstract class DefaultCertificateClient implements CertificateClient {
     if (certificateMap.containsKey(certId)) {
       return certificateMap.get(certId);
     }
-    // Try to get it from SCM.
-    return this.getCertificateFromScm(certId);
+
+    if (!securityConfig.isCustomCAEnabled()) {
+      // Try to get it from SCM.
+      if (getLogger().isDebugEnabled()) {
+        getLogger().debug("Certificate {} not found in cache. Getting it " +
+            "from SCM.", certId);
+      }
+      return this.getCertificateFromScm(certId);
+    }
+    throw new CertificateException(String.format("Certificate for serial id " +
+        "%s not found in cache.", certId));
   }
 
   @Override
@@ -686,7 +790,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
    *
    */
   @Override
-  public synchronized InitResponse init() throws CertificateException {
+  public synchronized InitResponse init() throws IOException {
     int initCase = 0;
     PrivateKey pvtKey= getPrivateKey();
     PublicKey pubKey = getPublicKey();
@@ -711,62 +815,92 @@ public abstract class DefaultCertificateClient implements CertificateClient {
   /**
    * Default handling of each {@link InitCase}.
    * */
-  protected InitResponse handleCase(InitCase init)
-      throws CertificateException {
-    switch (init) {
-    case NONE:
-      getLogger().info("Creating keypair for client as keypair and " +
-          "certificate not found.");
-      bootstrapClientKeys();
-      return GETCERT;
-    case CERT:
-      getLogger().error("Private key not found, while certificate is still" +
-          " present. Delete keypair and try again.");
-      return FAILURE;
-    case PUBLIC_KEY:
-      getLogger().error("Found public key but private key and certificate " +
-          "missing.");
-      return FAILURE;
-    case PRIVATE_KEY:
-      getLogger().info("Found private key but public key and certificate " +
-          "is missing.");
-      // TODO: Recovering public key from private might be possible in some
-      //  cases.
-      return FAILURE;
-    case PUBLICKEY_CERT:
-      getLogger().error("Found public key and certificate but private " +
-          "key is missing.");
-      return FAILURE;
-    case PRIVATEKEY_CERT:
-      getLogger().info("Found private key and certificate but public key" +
-          " missing.");
-      if (recoverPublicKey()) {
-        return SUCCESS;
-      } else {
-        getLogger().error("Public key recovery failed.");
+  protected InitResponse handleCase(InitCase init) throws IOException {
+    if (securityConfig.isCustomCAEnabled()) {
+      // For customCA, we can only handle the case when
+      // private key and certificate is available, but public key is missing.
+      // For all other cases, we cannot recover and should fail.
+      switch (init) {
+      case PRIVATEKEY_CERT:
+        getLogger().info("Found private key and certificate but public key" +
+            " missing.");
+        if (recoverPublicKey()) {
+          getLogger().info("Successfully recovered the Public Key.");
+          return SUCCESS;
+        } else {
+          getLogger().error("Public key recovery failed.");
+          return FAILURE;
+        }
+      case ALL:
+        getLogger().info("Found certificate file along with KeyPair.");
+        if (validateKeyPairAndCertificate()) {
+          return SUCCESS;
+        } else {
+          return FAILURE;
+        }
+      default:
+        getLogger().error("Private key or Certificate is missing. Cannot " +
+            "recover from this state when custom CA is enabled.");
+
         return FAILURE;
       }
-    case PUBLICKEY_PRIVATEKEY:
-      getLogger().info("Found private and public key but certificate is" +
-          " missing.");
-      if (validateKeyPair(getPublicKey())) {
+
+    } else {
+      switch (init) {
+      case NONE:
+        getLogger().info("Creating keypair for client as keypair and " +
+            "certificate not found.");
+        bootstrapClientKeys();
         return GETCERT;
-      } else {
-        getLogger().info("Keypair validation failed.");
+      case CERT:
+        getLogger().error("Private key not found, while certificate is still" +
+            " present. Delete keypair and try again.");
         return FAILURE;
-      }
-    case ALL:
-      getLogger().info("Found certificate file along with KeyPair.");
-      if (validateKeyPairAndCertificate()) {
-        return SUCCESS;
-      } else {
+      case PUBLIC_KEY:
+        getLogger().error("Found public key but private key and certificate " +
+            "missing.");
         return FAILURE;
-      }
-    default:
-      getLogger().error("Unexpected case: {} (private/public/cert)",
-          Integer.toBinaryString(init.ordinal()));
+      case PRIVATE_KEY:
+        getLogger().info("Found private key but public key and certificate " +
+            "is missing.");
+        // TODO: Recovering public key from private might be possible in some
+        //  cases.
+        return FAILURE;
+      case PUBLICKEY_CERT:
+        getLogger().error("Found public key and certificate but private " +
+            "key is missing.");
+        return FAILURE;
+      case PRIVATEKEY_CERT:
+        getLogger().info("Found private key and certificate but public key" +
+            " missing.");
+        if (recoverPublicKey()) {
+          return SUCCESS;
+        } else {
+          getLogger().error("Public key recovery failed.");
+          return FAILURE;
+        }
+      case PUBLICKEY_PRIVATEKEY:
+        getLogger().info("Found private and public key but certificate is" +
+            " missing.");
+        if (validateKeyPair(getPublicKey())) {
+          return GETCERT;
+        } else {
+          getLogger().info("Keypair validation failed.");
+          return FAILURE;
+        }
+      case ALL:
+        getLogger().info("Found certificate file along with KeyPair.");
+        if (validateKeyPairAndCertificate()) {
+          return SUCCESS;
+        } else {
+          return FAILURE;
+        }
+      default:
+        getLogger().error("Unexpected case: {} (private/public/cert)",
+            Integer.toBinaryString(init.ordinal()));
 
-      return FAILURE;
+        return FAILURE;
+      }
     }
   }
 
@@ -774,7 +908,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
    * Validate keypair and certificate.
    * */
   protected boolean validateKeyPairAndCertificate() throws
-      CertificateException {
+      IOException {
     if (validateKeyPair(getPublicKey())) {
       getLogger().info("Keypair validated.");
       // TODO: Certificates cryptographic validity can be checked as well.
@@ -796,7 +930,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
    * Tries to recover public key from certificate. Also validates recovered
    * public key.
    * */
-  protected boolean recoverPublicKey() throws CertificateException {
+  protected boolean recoverPublicKey() throws IOException {
     PublicKey pubKey = getCertificate().getPublicKey();
     try {
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
index 7aea596..6f563eb 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
@@ -26,6 +26,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 
+import java.io.IOException;
+
 import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.FAILURE;
 import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.GETCERT;
 import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.RECOVER;
@@ -42,24 +44,23 @@ public class OMCertificateClient extends DefaultCertificateClient {
   public static final String COMPONENT_NAME = "om";
 
   public OMCertificateClient(SecurityConfig securityConfig,
-      String certSerialId, String localCrlId) {
+      String certSerialId, String localCrlId) throws IOException {
     super(securityConfig, LOG, certSerialId, COMPONENT_NAME);
     this.setLocalCrlId(localCrlId!=null ?
         Long.parseLong(localCrlId): 0);
   }
 
   public OMCertificateClient(SecurityConfig securityConfig,
-      String certSerialId) {
+      String certSerialId) throws IOException{
     this(securityConfig, certSerialId, null);
   }
 
-  public OMCertificateClient(SecurityConfig securityConfig) {
+  public OMCertificateClient(SecurityConfig securityConfig) throws IOException {
     this(securityConfig, null, null);
   }
 
   @Override
-  protected InitResponse handleCase(InitCase init) throws
-      CertificateException {
+  protected InitResponse handleCase(InitCase init) throws IOException {
     switch (init) {
     case NONE:
       LOG.info("Creating keypair for client as keypair and certificate not " +
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
index d941d88..ec33e85 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.nio.file.Paths;
 
 import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.FAILURE;
@@ -47,22 +48,23 @@ public class SCMCertificateClient extends DefaultCertificateClient {
           OzoneConsts.SCM_SUB_CA_PATH).toString();
 
   public SCMCertificateClient(SecurityConfig securityConfig,
-      String certSerialId) {
+      String certSerialId) throws IOException {
     super(securityConfig, LOG, certSerialId, COMPONENT_NAME);
   }
 
-  public SCMCertificateClient(SecurityConfig securityConfig) {
+  public SCMCertificateClient(SecurityConfig securityConfig)
+      throws IOException {
     super(securityConfig, LOG, null, COMPONENT_NAME);
   }
 
   public SCMCertificateClient(SecurityConfig securityConfig,
-      String certSerialId, String component) {
+      String certSerialId, String component) throws IOException {
     super(securityConfig, LOG, certSerialId, component);
   }
 
   @Override
   protected InitResponse handleCase(InitCase init)
-      throws CertificateException {
+      throws IOException {
     // This is similar to OM.
     switch (init) {
     case NONE:
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/keys/KeyCodec.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/keys/KeyCodec.java
index e57510c..de48a4e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/keys/KeyCodec.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/keys/KeyCodec.java
@@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.output.FileWriterWithEncoding;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.bouncycastle.util.io.pem.PemObject;
 import org.bouncycastle.util.io.pem.PemReader;
@@ -41,9 +43,12 @@ import java.nio.file.Paths;
 import java.nio.file.attribute.PosixFilePermission;
 import java.security.KeyFactory;
 import java.security.KeyPair;
+import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.security.PrivateKey;
 import java.security.PublicKey;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
 import java.security.spec.InvalidKeySpecException;
 import java.security.spec.PKCS8EncodedKeySpec;
 import java.security.spec.X509EncodedKeySpec;
@@ -76,6 +81,8 @@ public class KeyCodec {
       Stream.of(OWNER_READ, OWNER_WRITE)
           .collect(Collectors.toSet());
   private Supplier<Boolean> isPosixFileSystem;
+  private PrivateKey customPrivateKey;
+  private PublicKey customPublicKey;
 
   /**
    * Creates a KeyCodec with component name.
@@ -178,11 +185,15 @@ public class KeyCodec {
    * @throws IOException - On I/O failure.
    */
   public void writePublicKey(PublicKey key) throws IOException {
+    if (securityConfig.isCustomCAEnabled()) {
+      throw new SCMSecurityException("Cannot write public key when " +
+          HddsConfigKeys.HDDS_CUSTOM_ROOT_CA_ENABLED + " is set to true.");
+    }
     File publicKeyFile = Paths.get(location.toString(),
         securityConfig.getPublicKeyFileName()).toFile();
 
     if (Files.exists(publicKeyFile.toPath())) {
-      throw new IOException("Private key already exist.");
+      throw new IOException("Public key already exists. Cannot overwrite.");
     }
 
     try (PemWriter keyWriter = new PemWriter(new
@@ -230,30 +241,21 @@ public class KeyCodec {
   private PKCS8EncodedKeySpec readKey(Path basePath, String keyFileName)
       throws IOException {
     File fileName = Paths.get(basePath.toString(), keyFileName).toFile();
-    String keyData = FileUtils.readFileToString(fileName, DEFAULT_CHARSET);
-    final byte[] pemContent;
-    try (PemReader pemReader = new PemReader(new StringReader(keyData))) {
-      PemObject keyObject = pemReader.readPemObject();
-      pemContent = keyObject.getContent();
-    }
-    return new PKCS8EncodedKeySpec(pemContent);
+    return readKey(fileName);
   }
 
   /**
-   * Returns a Private Key from a PEM encoded file.
+   * Returns a Private Key from a PKCS8EncodedKeySpec.
    *
-   * @param basePath - base path
-   * @param privateKeyFileName - private key file name.
+   * @param encodedKeySpec - PKCS8EncodedKeySpec of the Private Key.
    * @return PrivateKey
    * @throws InvalidKeySpecException  - on Error.
    * @throws NoSuchAlgorithmException - on Error.
    * @throws IOException              - on Error.
    */
-  public PrivateKey readPrivateKey(Path basePath, String privateKeyFileName)
+  public PrivateKey readPrivateKey(PKCS8EncodedKeySpec encodedKeySpec)
       throws InvalidKeySpecException, NoSuchAlgorithmException, IOException {
-    PKCS8EncodedKeySpec encodedKeySpec = readKey(basePath, privateKeyFileName);
-    final KeyFactory keyFactory =
-        KeyFactory.getInstance(securityConfig.getKeyAlgo());
+    KeyFactory keyFactory = KeyFactory.getInstance(securityConfig.getKeyAlgo());
     return
         keyFactory.generatePrivate(encodedKeySpec);
   }
@@ -267,34 +269,73 @@ public class KeyCodec {
    */
   public PublicKey readPublicKey() throws InvalidKeySpecException,
       NoSuchAlgorithmException, IOException {
-    return readPublicKey(this.location.toAbsolutePath(),
-        securityConfig.getPublicKeyFileName());
+    PKCS8EncodedKeySpec encodedKeySpec =
+        readKey(this.location.toAbsolutePath(),
+            securityConfig.getPublicKeyFileName());
+    return readPublicKey(encodedKeySpec);
   }
 
   /**
-   * Returns a public key from a PEM encoded file.
+   * Returns a Public Key from a PKCS8EncodedKeySpec.
    *
-   * @param basePath - base path.
-   * @param publicKeyFileName - public key file name.
+   * @param encodedKeySpec - PKCS8EncodedKeySpec of the Public Key.
    * @return PublicKey
    * @throws NoSuchAlgorithmException - on Error.
    * @throws InvalidKeySpecException  - on Error.
    * @throws IOException              - on Error.
    */
-  public PublicKey readPublicKey(Path basePath, String publicKeyFileName)
+  public PublicKey readPublicKey(PKCS8EncodedKeySpec encodedKeySpec)
       throws NoSuchAlgorithmException, InvalidKeySpecException, IOException {
-    PKCS8EncodedKeySpec encodedKeySpec = readKey(basePath, publicKeyFileName);
-    final KeyFactory keyFactory =
-        KeyFactory.getInstance(securityConfig.getKeyAlgo());
+    KeyFactory keyFactory = KeyFactory.getInstance(securityConfig.getKeyAlgo());
     return
         keyFactory.generatePublic(
             new X509EncodedKeySpec(encodedKeySpec.getEncoded()));
 
   }
 
+  /**
+   * Loads the key pair from the custom keystore and caches its private and
+   * public keys in this codec.
+   *
+   * @throws KeyStoreException - On Error, or if the keystore yields no key
+   * pair.
+   * @throws NoSuchAlgorithmException - On Error.
+   * @throws IOException - On Error.
+   * @throws CertificateException - On Error.
+   * @throws UnrecoverableKeyException - On Error.
+   */
+  private void loadCustomKeys() throws NoSuchAlgorithmException,
+      IOException, KeyStoreException, CertificateException,
+      UnrecoverableKeyException {
+    KeyPair keyPair = SecurityUtil.getCustomKeyPair(securityConfig);
+    if (keyPair == null) {
+      // An assert is skipped unless the JVM runs with -ea, so check
+      // explicitly rather than dereferencing a null key pair below.
+      throw new KeyStoreException("No key pair found in keystore " +
+          securityConfig.getKeystoreFilePath());
+    }
+    customPrivateKey = keyPair.getPrivate();
+    customPublicKey = keyPair.getPublic();
+  }
+  /**
+   * Returns the custom public key for custom Root CA setup.
+   * @return PublicKey.
+   * @throws InvalidKeySpecException - On Error.
+   * @throws NoSuchAlgorithmException - On Error.
+   * @throws IOException - On Error.
+   */
+  public PublicKey readCustomPublicKey() throws InvalidKeySpecException,
+      NoSuchAlgorithmException, IOException, KeyStoreException,
+      CertificateException, UnrecoverableKeyException {
+    if (customPublicKey == null) {
+      loadCustomKeys();
+    }
+
+    return customPublicKey;
+  }
 
   /**
-   * Returns the private key  using defaults.
+   * Returns the private key using defaults.
    * @return PrivateKey.
    * @throws InvalidKeySpecException - On Error.
    * @throws NoSuchAlgorithmException - On Error.
@@ -302,11 +327,11 @@ public class KeyCodec {
    */
   public PrivateKey readPrivateKey() throws InvalidKeySpecException,
       NoSuchAlgorithmException, IOException {
-    return readPrivateKey(this.location.toAbsolutePath(),
+    PKCS8EncodedKeySpec encodedKeySpec = readKey(this.location.toAbsolutePath(),
         securityConfig.getPrivateKeyFileName());
+    return readPrivateKey(encodedKeySpec);
   }
 
-
   /**
    * Helper function that actually writes data to the files.
    *
@@ -317,8 +342,10 @@ public class KeyCodec {
    * @param force - forces overwriting the keys.
    * @throws IOException - On I/O failure.
    */
-  private synchronized void writeKey(Path basePath, KeyPair keyPair,
-      String privateKeyFileName, String publicKeyFileName, boolean force)
+  @VisibleForTesting
+  synchronized void writeKey(Path basePath, KeyPair keyPair,
+                             String privateKeyFileName,
+                             String publicKeyFileName, boolean force)
       throws IOException {
     checkPreconditions(basePath);
 
@@ -406,4 +433,43 @@ public class KeyCodec {
     }
   }
 
+  /**
+   * Returns the custom private key for custom Root CA setup.
+   * @return PrivateKey.
+   * @throws NoSuchAlgorithmException - On Error.
+   * @throws IOException - On Error.
+   * @throws UnrecoverableKeyException - On Error.
+   * @throws CertificateException - On Error.
+   * @throws KeyStoreException - On Error.
+   */
+  public PrivateKey readCustomPrivateKey() throws NoSuchAlgorithmException,
+      IOException, UnrecoverableKeyException, CertificateException,
+      KeyStoreException {
+    if (customPrivateKey == null) {
+      loadCustomKeys();
+    }
+
+    return customPrivateKey;
+  }
+
+  /**
+   * Reads a Key from the PEM Encoded Store.
+   *
+   * @param path - Path, Location where the Key file is stored.
+   * @return PKCS8EncodedKeySpec wrapping the content of the first PEM object.
+   * @throws IOException - on Error, or if the file holds no PEM object.
+   */
+  protected PKCS8EncodedKeySpec readKey(File path)
+      throws IOException {
+    String keyData = FileUtils.readFileToString(path, DEFAULT_CHARSET);
+    try (PemReader pemReader = new PemReader(new StringReader(keyData))) {
+      PemObject keyObject = pemReader.readPemObject();
+      if (keyObject == null) {
+        // readPemObject returns null when no PEM block is left to read; fail
+        // with an IOException instead of a NullPointerException.
+        throw new IOException("No PEM encoded key found in " + path);
+      }
+      return new PKCS8EncodedKeySpec(keyObject.getContent());
+    }
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/keys/SecurityUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/keys/SecurityUtil.java
index 6147d3a..d4e1eb4 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/keys/SecurityUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/keys/SecurityUtil.java
@@ -18,14 +18,7 @@
  */
 package org.apache.hadoop.hdds.security.x509.keys;
 
-import java.security.KeyFactory;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
-import java.security.PrivateKey;
-import java.security.PublicKey;
-import java.security.spec.InvalidKeySpecException;
-import java.security.spec.PKCS8EncodedKeySpec;
-import java.security.spec.X509EncodedKeySpec;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 import org.bouncycastle.asn1.ASN1ObjectIdentifier;
@@ -37,6 +30,23 @@ import org.bouncycastle.asn1.x500.X500Name;
 import org.bouncycastle.asn1.x509.Extensions;
 import org.bouncycastle.pkcs.PKCS10CertificationRequest;
 
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.Key;
+import java.security.KeyFactory;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.Certificate;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.security.spec.X509EncodedKeySpec;
+
 /**
  * Utility functions for Security modules for Ozone.
  */
@@ -118,12 +128,12 @@ public final class SecurityUtil {
   public static PublicKey getPublicKey(byte[] encodedKey,
       SecurityConfig secureConfig) {
     PublicKey key = null;
+    KeyFactory kf = null;
     if (encodedKey == null || encodedKey.length == 0) {
       return null;
     }
 
     try {
-      KeyFactory kf = null;
       kf = KeyFactory.getInstance(secureConfig.getKeyAlgo(),
           secureConfig.getProvider());
       key = kf.generatePublic(new X509EncodedKeySpec(encodedKey));
@@ -135,4 +145,122 @@ public final class SecurityUtil {
     return key;
   }
 
+  /**
+   * Loads the keystore configured for the custom root CA setup.
+   *
+   * @param securityConfig - supplies the keystore file path and password.
+   * @return KeyStore of the default type loaded from the keystore file.
+   * @throws IOException - on Error.
+   * @throws KeyStoreException - on Error.
+   * @throws java.security.cert.CertificateException - on Error.
+   * @throws NoSuchAlgorithmException - on Error.
+   */
+  public static KeyStore getCustomKeystore(SecurityConfig securityConfig) throws
+      IOException, KeyStoreException, java.security.cert.CertificateException,
+      NoSuchAlgorithmException {
+    return loadKeyStore(securityConfig.getKeystoreFilePath(),
+        securityConfig.getKeystoreFilePassword());
+  }
+
+  /**
+   * Loads the truststore configured for the custom root CA setup.
+   *
+   * @param securityConfig - supplies the truststore file path and password.
+   * @return KeyStore of the default type loaded from the truststore file.
+   * @throws IOException - on Error.
+   * @throws KeyStoreException - on Error.
+   * @throws NoSuchAlgorithmException - on Error.
+   * @throws java.security.cert.CertificateException - on Error.
+   */
+  public static KeyStore getCustomTruststore(SecurityConfig securityConfig)
+      throws IOException, KeyStoreException, NoSuchAlgorithmException,
+      java.security.cert.CertificateException {
+    return loadKeyStore(securityConfig.getTruststoreFilePath(),
+        securityConfig.getTruststorePassword());
+  }
+
+  /**
+   * Returns the key pair held under the first alias that the custom keystore
+   * reports.
+   *
+   * @param securityConfig - supplies the keystore location and passwords.
+   * @return KeyPair built from the entry's private key and the public key of
+   * its certificate, or null if the entry's key is not a PrivateKey.
+   * @throws KeyStoreException - on Error, or if the keystore has no entries.
+   * @throws IOException - on Error.
+   * @throws java.security.cert.CertificateException - on Error.
+   * @throws NoSuchAlgorithmException - on Error.
+   * @throws UnrecoverableKeyException - on Error.
+   */
+  public static KeyPair getCustomKeyPair(SecurityConfig securityConfig) throws
+      IOException, KeyStoreException, java.security.cert.CertificateException,
+      NoSuchAlgorithmException, UnrecoverableKeyException {
+    KeyStore keystore = getCustomKeystore(securityConfig);
+    char[] keystoreKeyPassword = securityConfig.getKeystoreKeyPassword();
+    String keyAlias = getFirstAlias(keystore, securityConfig);
+
+    Key key = keystore.getKey(keyAlias, keystoreKeyPassword);
+    if (key instanceof PrivateKey) {
+      // Get certificate of public key
+      Certificate cert = keystore.getCertificate(keyAlias);
+      return new KeyPair(cert.getPublicKey(), (PrivateKey) key);
+    }
+    return null;
+  }
+
+  /**
+   * Returns the certificate held under the first alias that the custom
+   * keystore reports.
+   *
+   * @param securityConfig - supplies the keystore location and passwords.
+   * @return Certificate of the entry, or null if the entry's key is not a
+   * PrivateKey.
+   * @throws IOException - SCMSecurityException wrapping any failure,
+   * including a keystore without entries.
+   */
+  public static Certificate getCustomCertificate(SecurityConfig securityConfig)
+      throws IOException {
+    try {
+      KeyStore keystore = getCustomKeystore(securityConfig);
+      char[] keystoreKeyPassword = securityConfig.getKeystoreKeyPassword();
+      String keyAlias = getFirstAlias(keystore, securityConfig);
+
+      Key key = keystore.getKey(keyAlias, keystoreKeyPassword);
+      if (key instanceof PrivateKey) {
+        // Get certificate of public key
+        return keystore.getCertificate(keyAlias);
+      }
+    } catch (Exception ex) {
+      throw new SCMSecurityException("Error while getting Certificate from " +
+          "keystore in " + securityConfig.getKeystoreFilePath(), ex);
+    }
+    return null;
+  }
+
+  /**
+   * Opens the given file and loads it into a KeyStore of the default type.
+   */
+  private static KeyStore loadKeyStore(String path, char[] password)
+      throws IOException, KeyStoreException, NoSuchAlgorithmException,
+      java.security.cert.CertificateException {
+    try (FileInputStream is = new FileInputStream(path)) {
+      KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
+      keystore.load(is, password);
+      return keystore;
+    }
+  }
+
+  /**
+   * Returns the first alias of the keystore, failing with a descriptive
+   * KeyStoreException rather than NoSuchElementException when it is empty.
+   */
+  private static String getFirstAlias(KeyStore keystore,
+      SecurityConfig securityConfig) throws KeyStoreException {
+    java.util.Enumeration<String> aliases = keystore.aliases();
+    if (!aliases.hasMoreElements()) {
+      throw new KeyStoreException("No entries found in keystore " +
+          securityConfig.getKeystoreFilePath());
+    }
+    return aliases.nextElement();
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
index 1c39892..37005d6 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership.  The ASF
@@ -34,15 +34,17 @@ import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
 import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.security.x509.keys.SecurityUtil;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
-import org.apache.hadoop.hdds.utils.db.DBDefinition;
 import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
 import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.hdds.utils.db.RocksDBConfiguration;
 import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
@@ -57,10 +59,13 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Enumeration;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -362,7 +367,17 @@ public final class HAUtils {
     long waitDuration =
         configuration.getTimeDuration(OZONE_SCM_CA_LIST_RETRY_INTERVAL,
             OZONE_SCM_CA_LIST_RETRY_INTERVAL_DEFAULT, TimeUnit.SECONDS);
-    if (certClient != null) {
+    SecurityConfig securityConfig = new SecurityConfig(configuration);
+
+    if (securityConfig.isCustomCAEnabled()) {
+      List<String> caCertsPem = new ArrayList<>();
+      List<X509Certificate> caCerts = buildCAX509List(certClient,
+          configuration);
+      for (X509Certificate cert: caCerts) {
+        caCertsPem.add(CertificateCodec.getPEMEncodedString(cert));
+      }
+      return caCertsPem;
+    } else if (certClient != null) {
       if (!SCMHAUtils.isSCMHAEnabled(configuration)) {
         return generateCAList(certClient);
       } else {
@@ -474,11 +489,32 @@ public final class HAUtils {
   public static List<X509Certificate> buildCAX509List(
       CertificateClient certClient,
       ConfigurationSource conf) throws IOException {
+    SecurityConfig securityConfig = new SecurityConfig(conf);
+    List<X509Certificate> x509Certificates = new ArrayList<>();
+    if (securityConfig.isCustomCAEnabled()) {
+      // Build CA list from trust store
+      try {
+        KeyStore truststore = SecurityUtil.getCustomTruststore(securityConfig);
+        for (Enumeration<String> e = truststore.aliases();
+             e.hasMoreElements();) {
+          String alias = e.nextElement();
+          if (truststore.isCertificateEntry(alias)) {
+            Certificate cert = truststore.getCertificate(alias);
+            if (cert instanceof X509Certificate) {
+              x509Certificates.add((X509Certificate)cert);
+            }
+          }
+        }
+        return x509Certificates;
+      } catch (Exception ex) {
+        throw new SCMSecurityException("Error while getting Truststore in " +
+            securityConfig.getTruststoreFilePath(), ex);
+      }
+    }
     if (certClient != null) {
       // Do this here to avoid extra conversion of X509 to pem and again to
       // X509 by buildCAList.
       if (!SCMHAUtils.isSCMHAEnabled(conf)) {
-        List<X509Certificate> x509Certificates = new ArrayList<>();
         if (certClient.getRootCACertificate() != null) {
           x509Certificates.add(certClient.getRootCACertificate());
         }
@@ -489,5 +525,4 @@ public final class HAUtils {
     List<String> pemEncodedCerts = HAUtils.buildCAList(certClient, conf);
     return OzoneSecurityUtil.convertToX509(pemEncodedCerts);
   }
-
 }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDefaultCertificateClient.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDefaultCertificateClient.java
index 99dc67e..569014e 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDefaultCertificateClient.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDefaultCertificateClient.java
@@ -111,7 +111,7 @@ public class TestDefaultCertificateClient {
     getCertClient();
   }
 
-  private void getCertClient() {
+  private void getCertClient() throws IOException {
     omCertClient = new OMCertificateClient(omSecurityConfig, certSerialId);
     dnCertClient = new DNCertificateClient(dnSecurityConfig, certSerialId);
   }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/keys/TestKeyCodec.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/keys/TestKeyCodec.java
index 2540956..a883667 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/keys/TestKeyCodec.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/keys/TestKeyCodec.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hdds.security.x509.keys;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_METADATA_DIR_NAME;
 import static org.junit.Assert.assertNotNull;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -171,6 +172,32 @@ public class TestKeyCodec {
   }
 
   /**
+   * Assert that a written private key can be read back and is unchanged.
+   *
+   * @throws NoSuchProviderException - On Error, due to missing Java
+   * dependencies.
+   * @throws NoSuchAlgorithmException - On Error, due to missing Java
+   * dependencies.
+   * @throws IOException - On I/O failure.
+   * @throws InvalidKeySpecException - On invalid key spec.
+   */
+  @Test
+  public void testReadWritePrivateKey() throws NoSuchAlgorithmException,
+      NoSuchProviderException, IOException, InvalidKeySpecException {
+    KeyPair kp = keyGenerator.generateKey();
+    KeyCodec keyCodec = new KeyCodec(securityConfig, component);
+    Path location = securityConfig.getKeyLocation(component);
+    keyCodec.writeKey(location, kp, "prk", "pbk", false);
+
+    // Read the private key back and rebuild it from its PKCS#8 encoding.
+    File prkLocation = location.resolve("prk").toFile();
+    PKCS8EncodedKeySpec pkcs8EncodedKeySpec = keyCodec.readKey(prkLocation);
+    KeyFactory kf = KeyFactory.getInstance("RSA");
+    PrivateKey pk = kf.generatePrivate(pkcs8EncodedKeySpec);
+    Assert.assertEquals(kp.getPrivate(), pk);
+  }
+
+  /**
    * Assert key rewrite fails without force option.
    *
    * @throws IOException - on I/O failure.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
index 5023e93..54ed61e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
@@ -326,7 +326,7 @@ public final class HASecurityUtils {
    * @return
    */
   public static GrpcTlsConfig createSCMRatisTLSConfig(SecurityConfig conf,
-      CertificateClient certificateClient) {
+      CertificateClient certificateClient) throws IOException {
     if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
       return new GrpcTlsConfig(
           certificateClient.getPrivateKey(), certificateClient.getCertificate(),
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 0db807c..5d52b95 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -356,7 +356,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     // Authenticate SCM if security is enabled, this initialization can only
     // be done after the metadata store is initialized.
     if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-      initializeCAnSecurityProtocol(conf, configurator);
+      if (!securityConfig.isCustomCAEnabled()) {
+        initializeCAnSecurityProtocol(conf, configurator);
+      }
     } else {
       // if no Security, we do not create a Certificate Server at all.
       // This allows user to boot SCM without security temporarily
@@ -464,10 +466,11 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
   }
 
-  private void initializeCertificateClient() {
+  private void initializeCertificateClient() throws IOException {
     securityConfig = new SecurityConfig(configuration);
     if (OzoneSecurityUtil.isSecurityEnabled(configuration) &&
-        scmStorageConfig.checkPrimarySCMIdInitialized()) {
+        scmStorageConfig.checkPrimarySCMIdInitialized() &&
+        !securityConfig.isCustomCAEnabled()) {
       scmCertificateClient = new SCMCertificateClient(
           securityConfig, scmStorageConfig.getScmCertSerialId());
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 29f19eb..b068a69 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -84,7 +84,7 @@ public class TestContainerServer {
   private static CertificateClient caClient;
 
   @BeforeClass
-  public static void setup() {
+  public static void setup() throws IOException {
     CONF.set(HddsConfigKeys.HDDS_METADATA_DIR_NAME, TEST_DIR);
     caClient = new DNCertificateClient(new SecurityConfig(CONF));
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 77bec6e..cedfa5e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCer
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -511,10 +512,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       LOG.error("Fail to create Key Provider");
     }
     if (secConfig.isSecurityEnabled()) {
-      omComponent = OM_DAEMON + "-" + omId;
-      if (omStorage.getOmCertSerialId() == null) {
-        throw new RuntimeException("OzoneManager started in secure mode but " +
-            "doesn't have SCM signed certificate.");
+      if (!secConfig.isCustomCAEnabled()) {
+        omComponent = OM_DAEMON + "-" + omId;
+        if (omStorage.getOmCertSerialId() == null) {
+          throw new RuntimeException("OzoneManager started in secure mode " +
+              "but doesn't have SCM signed certificate.");
+        }
       }
       certClient = new OMCertificateClient(new SecurityConfig(conf),
           omStorage.getOmCertSerialId());
@@ -879,7 +882,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
   private OzoneBlockTokenSecretManager createBlockTokenSecretManager(
-      OzoneConfiguration conf) {
+      OzoneConfiguration conf) throws SCMSecurityException {
 
     long expiryTime = conf.getTimeDuration(
         HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME,
@@ -890,8 +893,15 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       return new OzoneBlockTokenSecretManager(secConfig, expiryTime, "1");
     }
     Objects.requireNonNull(certClient);
-    return new OzoneBlockTokenSecretManager(secConfig, expiryTime,
-        certClient.getCertificate().getSerialNumber().toString());
+    OzoneBlockTokenSecretManager ozoneBlockTokenSecretManager = null;
+    try {
+      ozoneBlockTokenSecretManager = new OzoneBlockTokenSecretManager(secConfig,
+          expiryTime, certClient.getCertificate().getSerialNumber().toString());
+    } catch (Exception ex) {
+      throw new SCMSecurityException("Error while getting Certificate of " +
+          "Ozone Manager. Cannot create OzoneBlockTokenSecretManager.", ex);
+    }
+    return ozoneBlockTokenSecretManager;
   }
 
   private void stopSecretManager() {
@@ -1181,7 +1191,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     CertificateClient certClient =
         new OMCertificateClient(new SecurityConfig(conf),
             omStore.getOmCertSerialId());
-    CertificateClient.InitResponse response = certClient.init();
+    CertificateClient.InitResponse response;
+    try {
+      response = certClient.init();
+    } catch (Exception ex) {
+      throw new RuntimeException("OM security initialization failed.", ex);
+    }
     LOG.info("Init response: {}", response);
     switch (response) {
     case SUCCESS:
@@ -1390,8 +1405,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
     // Perform this to make it work with old clients.
     if (certClient != null) {
-      caCertPem =
-          CertificateCodec.getPEMEncodedString(certClient.getCACertificate());
+      // When custom CA is enabled, the single CA certificate is not
+      // needed: caCertPem is intended for the SCM root CA only.
+      if (!secConfig.isCustomCAEnabled()) {
+        caCertPem =
+            CertificateCodec.getPEMEncodedString(certClient.getCACertificate());
+      }
       caCertPemList = HAUtils.buildCAList(certClient, configuration);
     }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java
index 5d34f6a..5a988a5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java
@@ -245,7 +245,8 @@ public class OzoneDelegationTokenSecretManager
    *
    * @param identifier the identifier to validate
    */
-  private void updateIdentifierDetails(OzoneTokenIdentifier identifier) {
+  private void updateIdentifierDetails(OzoneTokenIdentifier identifier)
+      throws IOException {
     int sequenceNum;
     long now = Time.now();
     sequenceNum = incrementDelegationTokenSeqNum();
@@ -260,7 +261,7 @@ public class OzoneDelegationTokenSecretManager
   /**
    * Get OM certificate serial id.
    * */
-  private String getOmCertificateSerialId() {
+  private String getOmCertificateSerialId() throws IOException {
     if (omCertificateSerialId == null) {
       omCertificateSerialId =
           getCertClient().getCertificate().getSerialNumber().toString();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org