You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2021/11/13 00:52:33 UTC

[GitHub] [cassandra] jonmeredith commented on a change in pull request #1316: CASSANDRA-17031: Add support for PEM formatted key material for the SSL context

jonmeredith commented on a change in pull request #1316:
URL: https://github.com/apache/cassandra/pull/1316#discussion_r748648536



##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ * <p>
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ * <p>
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                               "values do not match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);

Review comment:
       What is the expected behavior if the user supplies both a `private_key` and a `keystore`? I think the two options should be mutually exclusive to avoid security-sensitive misconfiguration.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ * <p>
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ * <p>
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                               "values do not match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     *
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     *
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certitificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can not build KeyManagerFactory in absence of the Private Key");

Review comment:
       Could this be rewritten as something actionable for the end user? For example: `"Must provide keystore or private_key in configuration for PEMBasedSslContextFactory"`

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ * <p>
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ * <p>
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                               "values do not match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     *
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     *
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certitificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can not build KeyManagerFactory in absence of the Private Key");
+            }
+        }
+        catch (Exception e)
+        {
+            throw new SSLException("Failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds TrustManagerFactory from the PEM based truststore.
+     *
+     * @return TrustManagerFactory from the PEM based truststore
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasTruststore())
+            {
+                if (maybeFileBasedTrustedCertificates)
+                {
+                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                }
+
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+                algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ts = buildTrustStore();
+                tmf.init(ts);
+                return tmf;
+            }
+            else
+            {
+                throw new SSLException("Can not build KeyManagerFactory in absence of the trusted certificates");
+            }
+        }
+        catch (Exception e)
+        {
+            throw new SSLException("Failed to build trust manager store for secure connections", e);
+        }
+    }
+
+    private String readPEMFile(String file) throws IOException
+    {
+        return new String(Files.readAllBytes(Paths.get(file)));
+    }
+
+    /**
+     * Builds KeyStore object given the {@link #targetStoreType} out of the PEM formatted private key material.
+     * It uses {@code cassandra-ssl-keystore} as the alias for the created key-entry.
+     */
+    private KeyStore buildKeyStore() throws GeneralSecurityException, IOException
+    {
+        boolean encryptedKey = keyPassword != null;
+        char[] keyPasswordArray = encryptedKey ? keyPassword.toCharArray() : null;
+        PrivateKey privateKey = encryptedKey ? PEMReader.extractPrivateKey(pemEncodedKey, keyPassword) :

Review comment:
       Why use the ternary here? `PEMReader.extractPrivateKey(pemEncodedKey)` is only called when `keyPassword` is `null`, and that method itself just calls `PEMReader.extractPrivateKey(pemEncodedKey, null)`.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ * <p>
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ * <p>
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                               "values do not match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     *
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     *
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certitificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can not build KeyManagerFactory in absence of the Private Key");
+            }
+        }
+        catch (Exception e)
+        {
+            throw new SSLException("Failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds TrustManagerFactory from the PEM based truststore.
+     *
+     * @return TrustManagerFactory from the PEM based truststore
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasTruststore())
+            {
+                if (maybeFileBasedTrustedCertificates)
+                {
+                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                }
+
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+                algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ts = buildTrustStore();
+                tmf.init(ts);
+                return tmf;
+            }
+            else
+            {
+                throw new SSLException("Can not build KeyManagerFactory in absence of the trusted certificates");
+            }
+        }
+        catch (Exception e)
+        {
+            throw new SSLException("Failed to build trust manager store for secure connections", e);
+        }
+    }
+
+    private String readPEMFile(String file) throws IOException
+    {
+        return new String(Files.readAllBytes(Paths.get(file)));
+    }
+
+    /**
+     * Builds KeyStore object given the {@link #targetStoreType} out of the PEM formatted private key material.
+     * It uses {@code cassandra-ssl-keystore} as the alias for the created key-entry.
+     */
+    private KeyStore buildKeyStore() throws GeneralSecurityException, IOException
+    {
+        boolean encryptedKey = keyPassword != null;
+        char[] keyPasswordArray = encryptedKey ? keyPassword.toCharArray() : null;
+        PrivateKey privateKey = encryptedKey ? PEMReader.extractPrivateKey(pemEncodedKey, keyPassword) :
+                                PEMReader.extractPrivateKey(pemEncodedKey);
+        Certificate[] certChainArray = PEMReader.extractCertificates(pemEncodedKey);
+        if (certChainArray == null || certChainArray.length == 0)
+        {
+            throw new SSLException("Could not read any certificates for the certChain for the private key");
+        }
+
+        KeyStore keyStore = KeyStore.getInstance(targetStoreType);
+        keyStore.load(null, null);
+        keyStore.setKeyEntry("cassandra-ssl-keystore", privateKey, keyPasswordArray, certChainArray);
+        return keyStore;
+    }
+
+    /**
+     * Builds KeyStore object given the {@link #targetStoreType} out of the PEM formatted certificates/public-key
+     * material.
+     * <p>
+     * It uses {@code cassandra-ssl-trusted-cert-<numeric-id>} as the alias for the created certificate-entry.
+     */
+    private KeyStore buildTrustStore() throws GeneralSecurityException, IOException
+    {
+        Certificate[] certChainArray = PEMReader.extractCertificates(pemEncodedCertificates);
+        if (certChainArray == null || certChainArray.length == 0)
+        {
+            throw new SSLException("Could not read any certificates from the given PEM");
+        }
+
+        KeyStore keyStore = KeyStore.getInstance(targetStoreType);
+        keyStore.load(null, null);
+        for (int i = 0; i < certChainArray.length; i++)
+        {
+            keyStore.setCertificateEntry("cassandra-ssl-trusted-cert-" + (i + 1), certChainArray[i]);
+        }
+        return keyStore;
+    }
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName = keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for (ConfigKey key : values)
+            {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }

Review comment:
       The IDE flagged this method (`asSet()`) as unused.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ * <p>
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ * <p>
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                               "values do not match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     *
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     *
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certitificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can not build KeyManagerFactory in absence of the Private Key");
+            }
+        }
+        catch (Exception e)
+        {
+            throw new SSLException("Failed to build key manager store for secure connections", e);

Review comment:
       Maybe add `Check PEMBasedSslContextFactory parameters in configuration.` to the message to help operators, although that may already be obvious from the stack trace. The only information that may not be clear is which file it was trying to access. Perhaps it would be better to convert any `IOException` inside `readPEMFile` instead, where a more actionable exception can be thrown, and remove this outer try block.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ * <p>
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store

Review comment:
       Perhaps put `if it is file based` in bold to underscore that hot reloading only works in the file-based case.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ * <p>
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ * <p>
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+    private String targetStoreType;

Review comment:
       Why not get rid of `targetStoreType` and just use `DEFAULT_TARGET_STORETYPE`, since it cannot be configured or overridden?

##########
File path: src/java/org/apache/cassandra/security/PEMReader.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.security.AlgorithmParameters;
+import java.security.GeneralSecurityException;
+import java.security.Key;
+import java.security.KeyFactory;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.crypto.Cipher;
+import javax.crypto.EncryptedPrivateKeyInfo;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.PBEKeySpec;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a helper class to read private keys and X509 certifificates encoded based on <a href="https://datatracker.ietf.org/doc/html/rfc1421">PEM (RFC 1421)</a>
+ * format. It can read Password Based Encrypted (PBE henceforth) private keys as well as non-encrypred private keys
+ * along with the X509 certificates/cert-chain based on the textual encoding defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>
+ *
+ * The input private key must be in PKCS#8 format.
+ *
+ * It returns PKCS#8 formatted private key and X509 certificates.
+ */
+public final class PEMReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMReader.class);
+
+    private static final Pattern CERT_PATTERN = Pattern.compile("-+BEGIN\\s+.*CERTIFICATE[^-]*-+(?:\\s|\\r|\\n)+([a-z0-9+/=\\r\\n]+)-+END\\s+.*CERTIFICATE[^-]*-+", 2);
+    private static final Pattern KEY_PATTERN = Pattern.compile("-+BEGIN\\s+.*PRIVATE\\s+KEY[^-]*-+(?:\\s|\\r|\\n)+([a-z0-9+/=\\r\\n]+)-+END\\s+.*PRIVATE\\s+KEY[^-]*-+", 2);
+
+    /**
+     * The private key can be with any of these algorithms in order for this read to successfully parse it.
+     * Currently supported algorithms are,
+     * <pre>
+     *     RSA, DSA or EC
+     * </pre>
+     * The first one to be evaluated is RSA, being the most common for private keys.
+     */
+    public static final String[] SUPPORTED_PRIVATE_KEY_ALGORITHMS = new String[]{"RSA", "DSA", "EC"};
+
+    /**
+     * Extracts private key from the PEM content for the private key, assuming its not PBE.
+     * @param unencryptedPEMKey private key stored as PEM content
+     * @return {@link PrivateKey} upon successful reading of the private key
+     * @throws IOException in case PEM reading fails
+     * @throws GeneralSecurityException in case any issue encountered while reading the private key
+     */
+    public static PrivateKey extractPrivateKey(String unencryptedPEMKey) throws IOException, GeneralSecurityException
+    {
+        return extractPrivateKey(unencryptedPEMKey, null);
+    }
+
+    /**
+     * Extracts private key from the Password Based Encrypted PEM content for the private key.
+     * @param pemKey PBE private key stored as PEM content
+     * @param keyPassword password to be used for the private key decryption
+     * @return {@link PrivateKey} upon successful reading of the private key
+     * @throws IOException in case PEM reading fails
+     * @throws GeneralSecurityException in case any issue encountered while reading the private key
+     */
+    public static PrivateKey extractPrivateKey(String pemKey, String keyPassword) throws IOException,
+                                                                                         GeneralSecurityException
+    {
+        PKCS8EncodedKeySpec keySpec;
+        String base64EncodedKey = extractBase64EncodedKey(pemKey);
+        byte[] derKeyBytes = Base64.getDecoder().decode(base64EncodedKey);
+
+        if (keyPassword != null)
+        {
+            logger.debug("Encrypted key's length: {}",derKeyBytes.length);
+            logger.debug("Key's password length: {}", keyPassword.length());
+
+            EncryptedPrivateKeyInfo epki = new EncryptedPrivateKeyInfo(derKeyBytes);
+            logger.debug("Encrypted private key info's algorithm name: {}", epki.getAlgName());
+
+            AlgorithmParameters params = epki.getAlgParameters();
+            PBEKeySpec pbeKeySpec = new PBEKeySpec(keyPassword.toCharArray());
+            Key encryptionKey = SecretKeyFactory.getInstance(epki.getAlgName()).generateSecret(pbeKeySpec);
+            pbeKeySpec.clearPassword();
+            logger.debug("Key algorithm: {}", encryptionKey.getAlgorithm());
+            logger.debug("Key format: {}", encryptionKey.getFormat());
+
+            Cipher cipher = Cipher.getInstance(epki.getAlgName());
+            cipher.init(Cipher.DECRYPT_MODE, encryptionKey, params);
+            byte[] rawKeyBytes = cipher.doFinal(epki.getEncryptedData());
+            logger.debug("Decrypted private key's length: {}", rawKeyBytes.length);
+
+            keySpec = new PKCS8EncodedKeySpec(rawKeyBytes);
+        }
+        else
+        {
+            logger.debug("Key length: {}",derKeyBytes.length);
+            keySpec = new PKCS8EncodedKeySpec(derKeyBytes);
+        }
+
+        PrivateKey privateKey = null;
+
+        /*
+         * Ideally we can inspect the OID (Object Identifier) from the private key with ASN.1 parser and identify the
+         * actual algorithm of the private key. For doing that, we have to use some special library like BouncyCastle.
+         * However in the absence of that, below brute-force approach can work- that is to try out all the supported
+         * private key algorithms given that there are only three major algorithms to verify against.
+         */
+        for(int i=0;i<SUPPORTED_PRIVATE_KEY_ALGORITHMS.length;i++)
+        {
+            try
+            {
+                privateKey = KeyFactory.getInstance(SUPPORTED_PRIVATE_KEY_ALGORITHMS[i]).generatePrivate(keySpec);
+                logger.info("Parsing for the private key finished with {} algorithm.",
+                            SUPPORTED_PRIVATE_KEY_ALGORITHMS[i]);
+                return privateKey;
+            }
+            catch(Exception e)
+            {
+                logger.debug("Failed to parse the private key with {} algorithm. Will try the other supported " +
+                             "algorithms.", SUPPORTED_PRIVATE_KEY_ALGORITHMS[i]);
+            }
+        }
+
+       if(privateKey==null)
+       {
+           throw new GeneralSecurityException("The given private key could not be parsed with any of the supported " +
+                                              "algorithms. Please see PEMReader#SUPPORTED_PRIVATE_KEY_ALGORITHMS.");
+       }
+       // Must never come here
+       return null;

Review comment:
       Why the test on `privateKey`? The loop returns as soon as a key is parsed, so `privateKey` is always `null` here - you could just always `throw`, and then you would never return `null`.

##########
File path: src/java/org/apache/cassandra/security/PEMReader.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.security.AlgorithmParameters;
+import java.security.GeneralSecurityException;
+import java.security.Key;
+import java.security.KeyFactory;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.crypto.Cipher;
+import javax.crypto.EncryptedPrivateKeyInfo;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.PBEKeySpec;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a helper class to read private keys and X509 certifificates encoded based on <a href="https://datatracker.ietf.org/doc/html/rfc1421">PEM (RFC 1421)</a>
+ * format. It can read Password Based Encrypted (PBE henceforth) private keys as well as non-encrypred private keys

Review comment:
       typo: encrypred -> encrypted

##########
File path: src/java/org/apache/cassandra/security/PEMReader.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.security.AlgorithmParameters;
+import java.security.GeneralSecurityException;
+import java.security.Key;
+import java.security.KeyFactory;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.crypto.Cipher;
+import javax.crypto.EncryptedPrivateKeyInfo;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.PBEKeySpec;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a helper class to read private keys and X509 certifificates encoded based on <a href="https://datatracker.ietf.org/doc/html/rfc1421">PEM (RFC 1421)</a>
+ * format. It can read Password Based Encrypted (PBE henceforth) private keys as well as non-encrypred private keys
+ * along with the X509 certificates/cert-chain based on the textual encoding defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>
+ *
+ * The input private key must be in PKCS#8 format.
+ *
+ * It returns PKCS#8 formatted private key and X509 certificates.
+ */
+public final class PEMReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMReader.class);
+
+    private static final Pattern CERT_PATTERN = Pattern.compile("-+BEGIN\\s+.*CERTIFICATE[^-]*-+(?:\\s|\\r|\\n)+([a-z0-9+/=\\r\\n]+)-+END\\s+.*CERTIFICATE[^-]*-+", 2);

Review comment:
       Replacing `2` with `Pattern.CASE_INSENSITIVE` is more readable - is there a reason to use the integer flag?

##########
File path: src/java/org/apache/cassandra/security/PEMReader.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.security.AlgorithmParameters;
+import java.security.GeneralSecurityException;
+import java.security.Key;
+import java.security.KeyFactory;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.crypto.Cipher;
+import javax.crypto.EncryptedPrivateKeyInfo;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.PBEKeySpec;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a helper class to read private keys and X509 certifificates encoded based on <a href="https://datatracker.ietf.org/doc/html/rfc1421">PEM (RFC 1421)</a>
+ * format. It can read Password Based Encrypted (PBE henceforth) private keys as well as non-encrypred private keys
+ * along with the X509 certificates/cert-chain based on the textual encoding defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>
+ *
+ * The input private key must be in PKCS#8 format.
+ *
+ * It returns PKCS#8 formatted private key and X509 certificates.
+ */
+public final class PEMReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMReader.class);
+
+    private static final Pattern CERT_PATTERN = Pattern.compile("-+BEGIN\\s+.*CERTIFICATE[^-]*-+(?:\\s|\\r|\\n)+([a-z0-9+/=\\r\\n]+)-+END\\s+.*CERTIFICATE[^-]*-+", 2);
+    private static final Pattern KEY_PATTERN = Pattern.compile("-+BEGIN\\s+.*PRIVATE\\s+KEY[^-]*-+(?:\\s|\\r|\\n)+([a-z0-9+/=\\r\\n]+)-+END\\s+.*PRIVATE\\s+KEY[^-]*-+", 2);
+
+    /**
+     * The private key can be with any of these algorithms in order for this read to successfully parse it.
+     * Currently supported algorithms are,
+     * <pre>
+     *     RSA, DSA or EC
+     * </pre>
+     * The first one to be evaluated is RSA, being the most common for private keys.
+     */
+    public static final String[] SUPPORTED_PRIVATE_KEY_ALGORITHMS = new String[]{"RSA", "DSA", "EC"};
+
+    /**
+     * Extracts private key from the PEM content for the private key, assuming its not PBE.
+     * @param unencryptedPEMKey private key stored as PEM content
+     * @return {@link PrivateKey} upon successful reading of the private key
+     * @throws IOException in case PEM reading fails
+     * @throws GeneralSecurityException in case any issue encountered while reading the private key
+     */
+    public static PrivateKey extractPrivateKey(String unencryptedPEMKey) throws IOException, GeneralSecurityException
+    {
+        return extractPrivateKey(unencryptedPEMKey, null);
+    }
+
+    /**
+     * Extracts private key from the Password Based Encrypted PEM content for the private key.
+     * @param pemKey PBE private key stored as PEM content
+     * @param keyPassword password to be used for the private key decryption
+     * @return {@link PrivateKey} upon successful reading of the private key
+     * @throws IOException in case PEM reading fails
+     * @throws GeneralSecurityException in case any issue encountered while reading the private key
+     */
+    public static PrivateKey extractPrivateKey(String pemKey, String keyPassword) throws IOException,
+                                                                                         GeneralSecurityException
+    {
+        PKCS8EncodedKeySpec keySpec;
+        String base64EncodedKey = extractBase64EncodedKey(pemKey);
+        byte[] derKeyBytes = Base64.getDecoder().decode(base64EncodedKey);
+
+        if (keyPassword != null)
+        {
+            logger.debug("Encrypted key's length: {}",derKeyBytes.length);
+            logger.debug("Key's password length: {}", keyPassword.length());
+
+            EncryptedPrivateKeyInfo epki = new EncryptedPrivateKeyInfo(derKeyBytes);
+            logger.debug("Encrypted private key info's algorithm name: {}", epki.getAlgName());
+
+            AlgorithmParameters params = epki.getAlgParameters();
+            PBEKeySpec pbeKeySpec = new PBEKeySpec(keyPassword.toCharArray());
+            Key encryptionKey = SecretKeyFactory.getInstance(epki.getAlgName()).generateSecret(pbeKeySpec);
+            pbeKeySpec.clearPassword();
+            logger.debug("Key algorithm: {}", encryptionKey.getAlgorithm());
+            logger.debug("Key format: {}", encryptionKey.getFormat());
+
+            Cipher cipher = Cipher.getInstance(epki.getAlgName());
+            cipher.init(Cipher.DECRYPT_MODE, encryptionKey, params);
+            byte[] rawKeyBytes = cipher.doFinal(epki.getEncryptedData());
+            logger.debug("Decrypted private key's length: {}", rawKeyBytes.length);
+
+            keySpec = new PKCS8EncodedKeySpec(rawKeyBytes);
+        }
+        else
+        {
+            logger.debug("Key length: {}",derKeyBytes.length);
+            keySpec = new PKCS8EncodedKeySpec(derKeyBytes);
+        }
+
+        PrivateKey privateKey = null;
+
+        /*
+         * Ideally we can inspect the OID (Object Identifier) from the private key with ASN.1 parser and identify the
+         * actual algorithm of the private key. For doing that, we have to use some special library like BouncyCastle.
+         * However in the absence of that, below brute-force approach can work- that is to try out all the supported
+         * private key algorithms given that there are only three major algorithms to verify against.
+         */
+        for(int i=0;i<SUPPORTED_PRIVATE_KEY_ALGORITHMS.length;i++)
+        {
+            try
+            {
+                privateKey = KeyFactory.getInstance(SUPPORTED_PRIVATE_KEY_ALGORITHMS[i]).generatePrivate(keySpec);
+                logger.info("Parsing for the private key finished with {} algorithm.",
+                            SUPPORTED_PRIVATE_KEY_ALGORITHMS[i]);
+                return privateKey;
+            }
+            catch(Exception e)
+            {
+                logger.debug("Failed to parse the private key with {} algorithm. Will try the other supported " +
+                             "algorithms.", SUPPORTED_PRIVATE_KEY_ALGORITHMS[i]);
+            }
+        }
+
+       if(privateKey==null)
+       {
+           throw new GeneralSecurityException("The given private key could not be parsed with any of the supported " +
+                                              "algorithms. Please see PEMReader#SUPPORTED_PRIVATE_KEY_ALGORITHMS.");
+       }
+       // Must never come here
+       return null;
+    }
+
+    /**
+     * Extracts the certificates/cert-chain from the PEM content.
+     * @param pemCerts certificates/cert-chain stored as PEM content
+     * @return X509 certiificate list
+     * @throws GeneralSecurityException in case any issue encountered while reading the certificates
+     */
+    public static Certificate[] extractCertificates(String pemCerts) throws GeneralSecurityException {
+        List<Certificate> certificateList = new ArrayList<>();
+        List<String> base64EncodedCerts = extractBase64EncodedCerts(pemCerts);
+        for (String base64EncodedCertificate : base64EncodedCerts)
+        {
+            certificateList.add(generateCertificate(base64EncodedCertificate));
+        }
+        Certificate[] certificates = certificateList.toArray(new Certificate[0]);
+        return certificates;
+    }
+
+    /**
+     * Generates the X509 certificate object given the base64 encoded PEM content.
+     * @param base64Certificate base64 encoded PEM content for the certificate
+     * @return X509 certificate
+     * @throws GeneralSecurityException in case any issue encountered while reading the certificate
+     */
+    private static Certificate generateCertificate(String base64Certificate) throws GeneralSecurityException {
+        byte[] decodedCertificateBytes = Base64.getDecoder().decode(base64Certificate);
+        CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
+        X509Certificate certificate =
+        (X509Certificate) certificateFactory.generateCertificate(new ByteArrayInputStream(decodedCertificateBytes));
+        printCertificateDetails(certificate);
+        return certificate;
+    }
+
+    /**
+     * Prints X509 certificate details for the debugging purpose with {@code DEBUG} level log.
+     * Namely, it prints- Subject DN, Issuer DN, Certificate serial number and the certificate expiry date which
+     * could be very valuable for debugging any certificate related issues.
+     *
+     * @param certificate
+     */
+    private static void printCertificateDetails(X509Certificate certificate)

Review comment:
       Maybe rename to `logCertificateDetails`?

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ * <p>
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ * <p>
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                               "values do not match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     *
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     *
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certitificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can not build KeyManagerFactory in absence of the Private Key");
+            }
+        }
+        catch (Exception e)
+        {
+            throw new SSLException("Failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds TrustManagerFactory from the PEM based truststore.
+     *
+     * @return TrustManagerFactory from the PEM based truststore
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasTruststore())
+            {
+                if (maybeFileBasedTrustedCertificates)
+                {
+                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                }
+
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+                algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ts = buildTrustStore();
+                tmf.init(ts);
+                return tmf;
+            }
+            else
+            {
+                throw new SSLException("Can not build KeyManagerFactory in absence of the trusted certificates");
+            }
+        }
+        catch (Exception e)
+        {
+            throw new SSLException("Failed to build trust manager store for secure connections", e);

Review comment:
       Same comments about the messages shown to operators as for the key store.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org