You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2021/11/05 21:34:46 UTC

[GitHub] [cassandra] smiklosovic commented on a change in pull request #1267: CASSANDRA-17031: Add support for PEM formatted key material for the SSL context

smiklosovic commented on a change in pull request #1267:
URL: https://github.com/apache/cassandra/pull/1267#discussion_r743480953



##########
File path: examples/ssl-factory/test/unit/org/apache/cassandra/security/KubernetesSecretsPEMSslContextFactoryTest.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.security.KubernetesSecretsPEMSslContextFactory.DEFAULT_PRIVATE_KEY_ENV_VAR_NAME;
+import static org.apache.cassandra.security.KubernetesSecretsPEMSslContextFactory.DEFAULT_PRIVATE_KEY_PASSWORD_ENV_VAR_NAME;
+import static org.apache.cassandra.security.KubernetesSecretsPEMSslContextFactory.PEMConfigKey.PRIVATE_KEY_PASSWORD_ENV_VAR;
+import static org.apache.cassandra.security.KubernetesSecretsPEMSslContextFactory.PEMConfigKey.TRUSTED_CERTIFICATE_ENV_VAR;
+import static org.apache.cassandra.security.KubernetesSecretsSslContextFactory.ConfigKeys.KEYSTORE_UPDATED_TIMESTAMP_PATH;
+import static org.apache.cassandra.security.KubernetesSecretsSslContextFactory.ConfigKeys.TRUSTSTORE_UPDATED_TIMESTAMP_PATH;
+
+public class KubernetesSecretsPEMSslContextFactoryTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(KubernetesSecretsPEMSslContextFactoryTest.class);
+
+    private Map<String, Object> commonConfig = new HashMap<>();
+    private final static String truststoreUpdatedTimestampFilepath = "build/test/conf/cassandra_truststore_last_updatedtime";
+    private final static String keystoreUpdatedTimestampFilepath = "build/test/conf/cassandra_keystore_last_updatedtime";
+
+    private static class KubernetesSecretsPEMSslContextFactoryForTestOnly extends KubernetesSecretsPEMSslContextFactory
+    {
+
+        public KubernetesSecretsPEMSslContextFactoryForTestOnly()
+        {
+        }
+
+        public KubernetesSecretsPEMSslContextFactoryForTestOnly(Map<String, Object> config)
+        {
+            super(config);
+        }
+
+        /*
+         * This is overriden to first give priority to the input map configuration since we should not be setting env
+         * variables from the unit tests. However, if the input map configuration doesn't have the value for the
+         * given key then fallback to loading from the real environment variables.
+         */
+        @Override
+        String getValueFromEnv(String envVarName, String defaultValue)
+        {
+            String envVarValue = parameters.get(envVarName) != null ? parameters.get(envVarName).toString() : null;
+            if (StringUtils.isEmpty(envVarValue))
+            {
+                logger.info("Configuration doesn't have env variable {}. Will use parent's implementation", envVarName);
+                return super.getValueFromEnv(envVarName, defaultValue);
+            }
+            else
+            {
+                logger.info("Configuration has env variable {} with value {}. Will use that.",

Review comment:
       Nit: in this log message, spell out "env" as "environment" (env -> environment).

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;

Review comment:
       Please pay attention to the formatting; here, there should be spaces around "=". I am not sure whether your IDE is formatting it this way, but I see this as a recurring issue when reviewing your patches. The same goes for things like "for(String s : strings)": there needs to be a space between "for" and the opening parenthesis, and similarly elsewhere.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {

Review comment:
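       See? This is the same formatting issue again: "for(" is missing the space between "for" and the opening parenthesis.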

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                   "values don't match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certitificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the Private Key");
+            }
+        } catch(Exception e) {

Review comment:
       Formatting: missing space between "catch" and the opening parenthesis.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                   "values don't match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the Private Key");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds TrustManagerFactory from the PEM based truststore.
+     *
+     * @return TrustManagerFactory from the PEM based truststore
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasTruststore())
+            {
+                if (maybeFileBasedTrustedCertificates)
+                {
+                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                }
+
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+                algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ts = buildTrustStore();
+                tmf.init(ts);
+                return tmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the trusted certificates");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build trust manager store for secure connections", e);
+        }
+    }
+
+    private String readPEMFile(String file) throws IOException

Review comment:
       Can't the logic in this method just be replaced by this?
   
   ```
   return new String(Files.readAllBytes(Paths.get(file)));
   ```
   
   I would also catch the exception, log what happened, and then rethrow it if necessary.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                   "values don't match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the Private Key");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds TrustManagerFactory from the PEM based truststore.
+     *
+     * @return TrustManagerFactory from the PEM based truststore
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasTruststore())
+            {
+                if (maybeFileBasedTrustedCertificates)
+                {
+                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                }
+
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+                algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ts = buildTrustStore();
+                tmf.init(ts);
+                return tmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the trusted certificates");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build trust manager store for secure connections", e);
+        }
+    }
+
+    private String readPEMFile(String file) throws IOException
+    {
+        StringBuilder fileData = new StringBuilder();
+        try(BufferedReader br = new BufferedReader(new FileReader(file)))
+        {
+            String line = null;
+            while( (line = br.readLine()) != null)
+            {
+                fileData.append(line).append(System.lineSeparator());
+            }
+        }
+        return fileData.toString();
+    }
+
+    /**
+     * Builds KeyStore object given the {@link #targetStoreType} out of the PEM formatted private key material.
+     * It uses {@code cassandra-ssl-keystore} as the alias for the created key-entry.
+     */
+    private KeyStore buildKeyStore() throws GeneralSecurityException, IOException
+    {
+        boolean encryptedKey = keyPassword != null;
+        char[] keyPasswordArray = encryptedKey ? keyPassword.toCharArray() : null;
+        PrivateKey privateKey = encryptedKey ? PEMReader.extractPrivateKey(pemEncodedKey, keyPassword) :
+                                PEMReader.extractPrivateKey(pemEncodedKey);
+        Certificate[] certChainArray = PEMReader.extractCertificates(pemEncodedKey);
+        if (certChainArray == null || certChainArray.length == 0)
+        {
+            throw new SSLException("Could not read any certificates for the certChain for the private key");
+        }
+
+        KeyStore keyStore = KeyStore.getInstance(targetStoreType);
+        keyStore.load(null, null);
+        keyStore.setKeyEntry("cassandra-ssl-keystore", privateKey, keyPasswordArray, certChainArray);
+        return keyStore;
+    }
+
+    /**
+     * Builds KeyStore object given the {@link #targetStoreType} out of the PEM formatted certificates/public-key
+     * material.
+     *
+     * It uses {@code cassandra-ssl-trusted-cert-<numeric-id>} as the alias for the created certificate-entry.
+     */
+    private KeyStore buildTrustStore() throws GeneralSecurityException, IOException
+    {
+        Certificate[] certChainArray = PEMReader.extractCertificates(pemEncodedCertificates);
+        if (certChainArray == null || certChainArray.length == 0)
+        {
+            throw new SSLException("Could not read any certificates from the given PEM");
+        }
+
+        KeyStore keyStore = KeyStore.getInstance(targetStoreType);
+        keyStore.load(null, null);
+        for (int i=0; i < certChainArray.length; i++) {

Review comment:
       Mostly spacing and brace placement, for example:
   
   ```
   for (int i = 0; i < certChainArray.length; i++)
   {
       keyStore.setCertificateEntry("blabla" + (i + 1), ...);
   }
   ```

##########
File path: examples/ssl-factory/test/unit/org/apache/cassandra/security/KubernetesSecretsPEMSslContextFactoryTest.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.cassandra.security.KubernetesSecretsPEMSslContextFactory.DEFAULT_PRIVATE_KEY_ENV_VAR_NAME;
+import static org.apache.cassandra.security.KubernetesSecretsPEMSslContextFactory.DEFAULT_PRIVATE_KEY_PASSWORD_ENV_VAR_NAME;
+import static org.apache.cassandra.security.KubernetesSecretsPEMSslContextFactory.PEMConfigKey.PRIVATE_KEY_PASSWORD_ENV_VAR;
+import static org.apache.cassandra.security.KubernetesSecretsPEMSslContextFactory.PEMConfigKey.TRUSTED_CERTIFICATE_ENV_VAR;
+import static org.apache.cassandra.security.KubernetesSecretsSslContextFactory.ConfigKeys.KEYSTORE_UPDATED_TIMESTAMP_PATH;
+import static org.apache.cassandra.security.KubernetesSecretsSslContextFactory.ConfigKeys.TRUSTSTORE_UPDATED_TIMESTAMP_PATH;
+
+public class KubernetesSecretsPEMSslContextFactoryTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(KubernetesSecretsPEMSslContextFactoryTest.class);
+
+    private Map<String, Object> commonConfig = new HashMap<>();
+    private final static String truststoreUpdatedTimestampFilepath = "build/test/conf/cassandra_truststore_last_updatedtime";
+    private final static String keystoreUpdatedTimestampFilepath = "build/test/conf/cassandra_keystore_last_updatedtime";
+
+    private static class KubernetesSecretsPEMSslContextFactoryForTestOnly extends KubernetesSecretsPEMSslContextFactory
+    {
+
+        public KubernetesSecretsPEMSslContextFactoryForTestOnly()
+        {
+        }
+
+        public KubernetesSecretsPEMSslContextFactoryForTestOnly(Map<String, Object> config)
+        {
+            super(config);
+        }
+
+        /*
+         * This is overriden to first give priority to the input map configuration since we should not be setting env
+         * variables from the unit tests. However, if the input map configuration doesn't have the value for the
+         * given key then fallback to loading from the real environment variables.
+         */
+        @Override
+        String getValueFromEnv(String envVarName, String defaultValue)
+        {
+            String envVarValue = parameters.get(envVarName) != null ? parameters.get(envVarName).toString() : null;
+            if (StringUtils.isEmpty(envVarValue))
+            {
+                logger.info("Configuration doesn't have env variable {}. Will use parent's implementation", envVarName);
+                return super.getValueFromEnv(envVarName, defaultValue);
+            }
+            else
+            {
+                logger.info("Configuration has env variable {} with value {}. Will use that.",
+                            envVarName, envVarValue);
+                return envVarValue;
+            }
+        }
+    }
+
+    private static final String private_key =

Review comment:
       Can't this be fetched from a helper file in the test resources somehow?

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;

Review comment:
       Why is this named "keystore_password"? I think we only use camel case here.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                   "values don't match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certitificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the Private Key");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds TrustManagerFactory from the PEM based truststore.
+     *
+     * @return TrustManagerFactory from the PEM based truststore
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasTruststore())
+            {
+                if (maybeFileBasedTrustedCertificates)
+                {
+                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                }
+
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+                algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ts = buildTrustStore();
+                tmf.init(ts);
+                return tmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the trusted certificates");
+            }
+        } catch(Exception e) {

Review comment:
       same

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                   "values don't match");

Review comment:
       "don't" -> "do not"; likewise, in the warning on the following lines, "it's" -> "it is".
   
   I think it is better to stick to "proper" English. It does not hurt anybody to read "it is" instead of "it's". There is no hurry :)

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))

Review comment:
       Why do we actually need two variables with seemingly the same names, with logic around their possible equality? That is quite confusing to go through, if you ask me. Or are the "keystore" and "key" passwords meant for completely different things? I would welcome more Javadoc on top of the relevant properties / methods so that we know without any ambiguity which is which and what each is for (if they are meant to stay).

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                   "values don't match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the Private Key");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds TrustManagerFactory from the PEM based truststore.
+     *
+     * @return TrustManagerFactory from the PEM based truststore
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasTruststore())
+            {
+                if (maybeFileBasedTrustedCertificates)
+                {
+                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                }
+
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+                algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ts = buildTrustStore();
+                tmf.init(ts);
+                return tmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the trusted certificates");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build trust manager store for secure connections", e);

Review comment:
       Some messages start with a lower-case letter and some with an upper-case one; can't this be the same style everywhere, one or the other?

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                   "values don't match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the Private Key");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds TrustManagerFactory from the PEM based truststore.
+     *
+     * @return TrustManagerFactory from the PEM based truststore
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasTruststore())
+            {
+                if (maybeFileBasedTrustedCertificates)
+                {
+                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                }
+
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+                algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ts = buildTrustStore();

Review comment:
       Maybe this makes perfect sense from the implementation perspective; I am just saying that if I am _building a truststore_, I do not expect to assign it to a variable of type `KeyStore`.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                   "values don't match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the Private Key");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds TrustManagerFactory from the PEM based truststore.
+     *
+     * @return TrustManagerFactory from the PEM based truststore
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasTruststore())
+            {
+                if (maybeFileBasedTrustedCertificates)
+                {
+                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                }
+
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+                algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ts = buildTrustStore();
+                tmf.init(ts);
+                return tmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the trusted certificates");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build trust manager store for secure connections", e);
+        }
+    }
+
+    private String readPEMFile(String file) throws IOException
+    {
+        StringBuilder fileData = new StringBuilder();
+        try(BufferedReader br = new BufferedReader(new FileReader(file)))
+        {
+            String line = null;
+            while( (line = br.readLine()) != null)
+            {
+                fileData.append(line).append(System.lineSeparator());
+            }
+        }
+        return fileData.toString();
+    }
+
+    /**
+     * Builds KeyStore object given the {@link #targetStoreType} out of the PEM formatted private key material.
+     * It uses {@code cassandra-ssl-keystore} as the alias for the created key-entry.
+     */
+    private KeyStore buildKeyStore() throws GeneralSecurityException, IOException
+    {
+        boolean encryptedKey = keyPassword != null;
+        char[] keyPasswordArray = encryptedKey ? keyPassword.toCharArray() : null;
+        PrivateKey privateKey = encryptedKey ? PEMReader.extractPrivateKey(pemEncodedKey, keyPassword) :
+                                PEMReader.extractPrivateKey(pemEncodedKey);
+        Certificate[] certChainArray = PEMReader.extractCertificates(pemEncodedKey);
+        if (certChainArray == null || certChainArray.length == 0)
+        {
+            throw new SSLException("Could not read any certificates for the certChain for the private key");
+        }
+
+        KeyStore keyStore = KeyStore.getInstance(targetStoreType);
+        keyStore.load(null, null);
+        keyStore.setKeyEntry("cassandra-ssl-keystore", privateKey, keyPasswordArray, certChainArray);

Review comment:
       Is the alias "cassandra-ssl-keystore" going to be extracted into a constant here?

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                   "values don't match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the Private Key");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds TrustManagerFactory from the PEM based truststore.
+     *
+     * @return TrustManagerFactory from the PEM based truststore
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasTruststore())
+            {
+                if (maybeFileBasedTrustedCertificates)
+                {
+                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                }
+
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+                algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ts = buildTrustStore();
+                tmf.init(ts);
+                return tmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the trusted certificates");

Review comment:
       Please write "Can not" instead of the contraction "Can't" in this exception message.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS# 8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and the truststore managers that are used for the SSL context creation.
+ * Internally it builds Java {@link KeyStore} with <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS# 12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType;
+    private String pemEncodedKey;
+    private String keyPassword;
+    private String pemEncodedCertificates;
+    private boolean maybeFileBasedPrivateKey;
+    private boolean maybeFileBasedTrustedCertificates;
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
+
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName=keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        static Set<String> asSet()
+        {
+            Set<String> valueSet = new HashSet<>();
+            ConfigKey[] values = values();
+            for(ConfigKey key: values) {
+                valueSet.add(key.getKeyName().toLowerCase());
+            }
+            return valueSet;
+        }
+    }
+
+    public PEMBasedSslContextFactory()
+    {
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        {
+            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
+                                   "values don't match");
+        }
+        else
+        {
+            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Decides if this factory has a keystore defined - key material specified in files or inline to the configuration.
+     * @return {@code true} if there is a keystore defined; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        return maybeFileBasedPrivateKey ? (keystore != null && new File(keystore).exists()) :
+               !StringUtils.isEmpty(pemEncodedKey);
+    }
+
+    /**
+     * Decides if this factory has a truststore defined - key material specified in files or inline to the
+     * configuration.
+     * @return {@code true} if there is a truststore defined; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        return maybeFileBasedTrustedCertificates ? (truststore != null && new File(truststore).exists()) :
+               !StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * This enables 'hot' reloading of the key/trust stores based on the last updated timestamps if they are file based.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> fileList = new ArrayList<>();
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystore));
+        }
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            fileList.add(new HotReloadableFile(truststore));
+        }
+        if (!fileList.isEmpty())
+        {
+            hotReloadableFiles = fileList;
+        }
+    }
+
+    /**
+     * Builds required KeyManagerFactory from the PEM based keystore. It also checks for the PrivateKey's certificate's
+     * expiry and logs {@code warning} for each expired PrivateKey's certificate.
+     *
+     * @return KeyManagerFactory built from the PEM based keystore.
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasKeystore())
+            {
+                if (maybeFileBasedPrivateKey)
+                {
+                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                }
+
+                KeyManagerFactory kmf = KeyManagerFactory.getInstance(
+                algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ks = buildKeyStore();
+                if (!checkedExpiry)
+                {
+                    checkExpiredCerts(ks);
+                    checkedExpiry = true;
+                }
+                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                return kmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the Private Key");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds TrustManagerFactory from the PEM based truststore.
+     *
+     * @return TrustManagerFactory from the PEM based truststore
+     * @throws SSLException if any issues encountered during the build process
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            if (hasTruststore())
+            {
+                if (maybeFileBasedTrustedCertificates)
+                {
+                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                }
+
+                TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+                algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+                KeyStore ts = buildTrustStore();
+                tmf.init(ts);
+                return tmf;
+            }
+            else
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the trusted certificates");
+            }
+        } catch(Exception e) {
+            throw new SSLException("failed to build trust manager store for secure connections", e);
+        }
+    }
+
+    private String readPEMFile(String file) throws IOException
+    {
+        StringBuilder fileData = new StringBuilder();
+        try(BufferedReader br = new BufferedReader(new FileReader(file)))

Review comment:
       Missing space between "try" and the opening parenthesis.

##########
File path: test/conf/cassandra-pem-jks-sslcontextfactory.yaml
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Testing for pluggable ssl_context_factory option for client and server encryption options with a valid and a missing
+# implementation classes.
+#
+cluster_name: Test Cluster
+# memtable_allocation_type: heap_buffers
+memtable_allocation_type: offheap_objects
+commitlog_sync: batch
+commitlog_sync_batch_window_in_ms: 1.0
+commitlog_segment_size_in_mb: 5
+commitlog_directory: build/test/cassandra/commitlog
+# commitlog_compression:
+# - class_name: LZ4Compressor
+cdc_raw_directory: build/test/cassandra/cdc_raw
+cdc_enabled: false
+hints_directory: build/test/cassandra/hints
+partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
+listen_address: 127.0.0.1
+storage_port: 7012
+ssl_storage_port: 17012
+start_native_transport: true
+native_transport_port: 9042
+column_index_size_in_kb: 4
+saved_caches_directory: build/test/cassandra/saved_caches
+data_file_directories:
+    - build/test/cassandra/data
+disk_access_mode: mmap
+seed_provider:
+    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
+      parameters:
+          - seeds: "127.0.0.1:7012"
+endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
+dynamic_snitch: true
+client_encryption_options:
+    ssl_context_factory:
+        class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+        parameters:
+          private_key: |
+            -----BEGIN ENCRYPTED PRIVATE KEY-----

Review comment:
       What is in these keys? What is their content, and what do they look like in plain text? A dump of them would help for debugging purposes.

##########
File path: src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.security;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * SslContextFactory for the <a href="https://datatracker.ietf.org/doc/html/rfc7468">PEM standard</a> encoded PKCS#8 private keys and X509 certificates/public-keys.
+ * It parses the key material based on the standard defined in the <a href="https://datatracker.ietf.org/doc/html/rfc7468">RFC 7468</a>.
+ * It creates a <a href="https://datatracker.ietf.org/doc/html/rfc5208">PKCS#8</a> based private key and X509 certificate(s)
+ * for the public key to build the required keystore and truststore managers that are used for the SSL context creation.
+ * Internally it builds a Java {@link KeyStore} with the <a href="https://datatracker.ietf.org/doc/html/rfc7292">PKCS#12</a> <a href="https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#keystore-types">store type</a>
+ * to be used for the keystore and the truststore managers.
+ *
+ * This factory also supports 'hot reloading' of the key material, the same way as defined by {@link FileBasedSslContextFactory},
+ * if it is file based. This factory ignores the existing 'store_type' configuration used for other file based store
+ * types like JKS.
+ *
+ * You can configure this factory with either inline PEM data or with the files having the required PEM data as shown
+ * below,
+ *
+ * <b>Configuration: PEM keys/certs defined inline (mind the spaces in the YAML!)</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *         parameters:
+ *             private_key: |
+ *              -----BEGIN ENCRYPTED PRIVATE KEY----- OR -----BEGIN PRIVATE KEY-----
+ *              <your base64 encoded private key>
+ *              -----END ENCRYPTED PRIVATE KEY----- OR -----END PRIVATE KEY-----
+ *              -----BEGIN CERTIFICATE-----
+ *              <your base64 encoded certificate chain>
+ *              -----END CERTIFICATE-----
+ *
+ *             private_key_password: "<your password if the private key is encrypted with a password>"
+ *
+ *             trusted_certificates: |
+ *               -----BEGIN CERTIFICATE-----
+ *               <your base64 encoded certificate>
+ *               -----END CERTIFICATE-----
+ * </pre>
+ *
+ * <b>Configuration: PEM keys/certs defined in files</b>
+ * <pre>
+ *     client/server_encryption_options:
+ *      ssl_context_factory:
+ *         class_name: org.apache.cassandra.security.PEMBasedSslContextFactory
+ *      keystore: <file path to the keystore file in the PEM format with the private key and the certificate chain>
+ *      keystore_password: "<your password if the private key is encrypted with a password>"
+ *      truststore: <file path to the truststore file in the PEM format>
+ * </pre>
+ *
+ */
+public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
+
+    private String targetStoreType; // set to DEFAULT_TARGET_STORETYPE by both constructors
+    private String pemEncodedKey; // inline 'private_key' value; overwritten with the keystore file's contents when the key is file based
+    private String keyPassword; // 'private_key_password' value; falls back to keystore_password when it is empty
+    private String pemEncodedCertificates; // inline 'trusted_certificates' value; overwritten with the truststore file's contents when file based
+    private boolean maybeFileBasedPrivateKey; // true when no inline private key was configured
+    private boolean maybeFileBasedTrustedCertificates; // true when no inline trusted certificates were configured
+
+    public static final String DEFAULT_TARGET_STORETYPE = "PKCS12"; // store type assigned to targetStoreType
+
+    /**
+     * Names of the entries this factory looks up in the {@code parameters} of the
+     * {@code ssl_context_factory} configuration.
+     */
+    public enum ConfigKey
+    {
+        ENCODED_KEY("private_key"),
+        KEY_PASSWORD("private_key_password"),
+        ENCODED_CERTIFICATES("trusted_certificates");
+
+        final String keyName;
+
+        ConfigKey(String keyName)
+        {
+            this.keyName = keyName;
+        }
+
+        String getKeyName()
+        {
+            return keyName;
+        }
+
+        /**
+         * @return a freshly created set holding the lower-cased name of every configuration key
+         */
+        static Set<String> asSet()
+        {
+            Set<String> keyNames = new HashSet<>();
+            for (ConfigKey configKey : values())
+            {
+                keyNames.add(configKey.getKeyName().toLowerCase());
+            }
+            return keyNames;
+        }
+    }
+
+    /**
+     * Creates a factory without any configuration parameters; only the target store type is initialised.
+     */
+    public PEMBasedSslContextFactory()
+    {
+        this.targetStoreType = DEFAULT_TARGET_STORETYPE;
+    }
+
+    /**
+     * Creates the factory from the given configuration parameters.
+     *
+     * The private key password is taken from {@code private_key_password}; when that is empty the value of
+     * {@code keystore_password} is used instead. If both are given they must be identical. A configured
+     * {@code truststore_password} is ignored with a warning.
+     *
+     * @param parameters configuration parameters, passed on to the superclass constructor
+     * @throws IllegalArgumentException if both {@code private_key_password} and {@code keystore_password} are given
+     *                                  and their values differ
+     */
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        targetStoreType = DEFAULT_TARGET_STORETYPE;
+        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
+        if (StringUtils.isEmpty(keyPassword))
+        {
+            keyPassword = keystore_password;
+        }
+        else if (!StringUtils.isEmpty(keystore_password))
+        {
+            // Only reached when BOTH passwords are configured; a lone 'private_key_password' must not trigger the
+            // "both are configured" warning.
+            if (!keyPassword.equals(keystore_password))
+            {
+                throw new IllegalArgumentException("'keystore_password' and 'private_key_password' both configurations are given and the " +
+                                   "values don't match");
+            }
+            logger.warn("'keystore_password' and 'private_key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.");
+        }
+
+        if (!StringUtils.isEmpty(truststore_password))
+        {
+            logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
+                        "'truststore_password' configuration.");
+        }
+
+        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+
+        // Without inline PEM data the key material may still come from the configured keystore/truststore files.
+        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
+        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
+    }
+
+    /**
+     * Tells whether key material for the keystore is available, either inline in the configuration or in the
+     * configured keystore file.
+     *
+     * @return {@code true} if an inline private key is configured or, when no inline key is given, the keystore
+     * path is set and the file exists; {@code false} otherwise
+     */
+    @Override
+    public boolean hasKeystore()
+    {
+        if (!maybeFileBasedPrivateKey)
+        {
+            return !StringUtils.isEmpty(pemEncodedKey);
+        }
+        return keystore != null && new File(keystore).exists();
+    }
+
+    /**
+     * Tells whether trusted certificates are available, either inline in the configuration or in the configured
+     * truststore file.
+     *
+     * @return {@code true} if inline trusted certificates are configured or, when none are given inline, the
+     * truststore path is set and the file exists; {@code false} otherwise
+     */
+    private boolean hasTruststore()
+    {
+        if (!maybeFileBasedTrustedCertificates)
+        {
+            return !StringUtils.isEmpty(pemEncodedCertificates);
+        }
+        return truststore != null && new File(truststore).exists();
+    }
+
+    /**
+     * Registers the keystore and/or truststore files for 'hot' reloading when the corresponding key material is
+     * file based. The list of hot reloadable files is left untouched when neither of them qualifies.
+     */
+    @Override
+    public synchronized void initHotReloading()
+    {
+        List<HotReloadableFile> reloadableFiles = new ArrayList<>();
+
+        if (maybeFileBasedPrivateKey && hasKeystore())
+        {
+            reloadableFiles.add(new HotReloadableFile(keystore));
+        }
+
+        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        {
+            reloadableFiles.add(new HotReloadableFile(truststore));
+        }
+
+        if (reloadableFiles.isEmpty())
+        {
+            return;
+        }
+        hotReloadableFiles = reloadableFiles;
+    }
+
+    /**
+     * Builds the KeyManagerFactory from the PEM based keystore. When the private key is file based, the PEM data is
+     * (re)read from the keystore file first. The certificate expiry check is run through {@code checkExpiredCerts}
+     * only while {@code checkedExpiry} is still unset.
+     *
+     * @return KeyManagerFactory initialised with the PEM based keystore
+     * @throws SSLException if no private key is available or anything fails while building the factory
+     */
+    @Override
+    protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        try
+        {
+            // Thrown inside the try block on purpose: it gets wrapped by the catch below like any other failure.
+            if (!hasKeystore())
+            {
+                throw new SSLException("Can't build KeyManagerFactory in absence of the Private Key");
+            }
+
+            if (maybeFileBasedPrivateKey)
+            {
+                pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+            }
+
+            String kmfAlgorithm = algorithm;
+            if (kmfAlgorithm == null)
+            {
+                kmfAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
+            }
+            KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(kmfAlgorithm);
+
+            KeyStore keyStore = buildKeyStore();
+            if (!checkedExpiry)
+            {
+                checkExpiredCerts(keyStore);
+                checkedExpiry = true;
+            }
+
+            char[] password = keyPassword == null ? null : keyPassword.toCharArray();
+            keyManagerFactory.init(keyStore, password);
+            return keyManagerFactory;
+        }
+        catch (Exception e)
+        {
+            throw new SSLException("failed to build key manager store for secure connections", e);
+        }
+    }
+
+    /**
+     * Builds the TrustManagerFactory from the PEM based truststore. When the trusted certificates are file based,
+     * the PEM data is (re)read from the truststore file first.
+     *
+     * @return TrustManagerFactory initialised with the PEM based truststore
+     * @throws SSLException if no trusted certificates are available or anything fails while building the factory
+     */
+    @Override
+    protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
+    {
+        try
+        {
+            // Thrown inside the try block on purpose: it gets wrapped by the catch below like any other failure.
+            if (!hasTruststore())
+            {
+                throw new SSLException("Can't build TrustManagerFactory in absence of the trusted certificates");
+            }
+
+            if (maybeFileBasedTrustedCertificates)
+            {
+                pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+            }
+
+            TrustManagerFactory tmf = TrustManagerFactory.getInstance(
+            algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : algorithm);
+            KeyStore ts = buildTrustStore();
+            tmf.init(ts);
+            return tmf;
+        }
+        catch (Exception e)
+        {
+            throw new SSLException("failed to build trust manager store for secure connections", e);
+        }
+    }
+
+    private String readPEMFile(String file) throws IOException
+    {
+        StringBuilder fileData = new StringBuilder();
+        try(BufferedReader br = new BufferedReader(new FileReader(file)))
+        {
+            String line = null;
+            while( (line = br.readLine()) != null)

Review comment:
       `while ((line = br.readLine()) != null)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org