You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yc...@apache.org on 2022/05/24 20:13:42 UTC

[cassandra] branch trunk updated: Adding support for TLS client authentication for internode communication

This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ed3901823a Adding support for TLS client authentication for internode communication
ed3901823a is described below

commit ed3901823a5fe9f8838d8b592a1b7703b12e810b
Author: Jyothsna Konisa <jk...@apple.com>
AuthorDate: Tue May 24 10:21:16 2022 -0700

    Adding support for TLS client authentication for internode communication
    
    patch by Jyothsna Konisa; reviewed by Bernardo Botella, Francisco Guerrero, Jon Meredith, Maulin Vasavada, Yifan Cai for CASSANDRA-17513
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |   6 +
 .../apache/cassandra/config/EncryptionOptions.java | 112 +++++++++++----
 .../cassandra/net/OutboundConnectionSettings.java  |   8 +-
 .../security/AbstractSslContextFactory.java        |  11 +-
 .../security/DisableSslContextFactory.java         |  12 ++
 .../security/FileBasedSslContextFactory.java       | 125 ++++++++++------
 .../cassandra/security/ISslContextFactory.java     |  10 ++
 .../security/PEMBasedSslContextFactory.java        | 160 ++++++++++++++-------
 test/conf/cassandra_ssl_test_outbound.keystore     | Bin 0 -> 2286 bytes
 .../cassandra/config/EncryptionOptionsTest.java    |   2 +-
 .../security/DefaultSslContextFactoryTest.java     |  86 +++++++++--
 .../security/PEMBasedSslContextFactoryTest.java    |  45 ++++--
 .../apache/cassandra/security/SSLFactoryTest.java  |  98 ++++++++++++-
 14 files changed, 513 insertions(+), 163 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f0041514bb..0f64ce2612 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Adding support for TLS client authentication for internode communication (CASSANDRA-17513)
  * Add new CQL function maxWritetime (CASSANDRA-17425)
  * Add guardrail for ALTER TABLE ADD / DROP / REMOVE column operations (CASSANDRA-17495)
  * Rename DisableFlag class to EnableFlag on guardrails (CASSANDRA-17544)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index d11e5e64bd..409110a024 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1316,6 +1316,12 @@ server_encryption_options:
   # Set to a valid keystore if internode_encryption is dc, rack or all
   keystore: conf/.keystore
   keystore_password: cassandra
+  # During internode mTLS authentication, inbound connections (acting as servers) use keystore and keystore_password,
+  # which contain the server certificate, to create the SSLContext, while
+  # outbound connections (acting as clients) use outbound_keystore and outbound_keystore_password, which contain the
+  # client certificates, to create the SSLContext. By default, outbound_keystore is the same as keystore, indicating mTLS is not enabled.
+#  outbound_keystore: conf/.keystore
+#  outbound_keystore_password: cassandra
   # Verify peer server certificates
   require_client_auth: false
   # Set to a valid trustore if require_client_auth is true
diff --git a/src/java/org/apache/cassandra/config/EncryptionOptions.java b/src/java/org/apache/cassandra/config/EncryptionOptions.java
index eb6724f96d..0ab653f088 100644
--- a/src/java/org/apache/cassandra/config/EncryptionOptions.java
+++ b/src/java/org/apache/cassandra/config/EncryptionOptions.java
@@ -25,20 +25,14 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.TrustManagerFactory;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.security.AbstractSslContextFactory;
 import org.apache.cassandra.security.DisableSslContextFactory;
 import org.apache.cassandra.security.ISslContextFactory;
 import org.apache.cassandra.utils.FBUtilities;
@@ -111,6 +105,8 @@ public class EncryptionOptions
     {
         KEYSTORE("keystore"),
         KEYSTORE_PASSWORD("keystore_password"),
+        OUTBOUND_KEYSTORE("outbound_keystore"),
+        OUTBOUND_KEYSTORE_PASSWORD("outbound_keystore_password"),
         TRUSTSTORE("truststore"),
         TRUSTSTORE_PASSWORD("truststore_password"),
         CIPHER_SUITES("cipher_suites"),
@@ -263,11 +259,8 @@ public class EncryptionOptions
         }
     }
 
-    private void initializeSslContextFactory()
+    protected void fillSslContextParams(Map<String, Object> sslContextFactoryParameters)
     {
-        Map<String,Object> sslContextFactoryParameters = new HashMap<>();
-        prepareSslContextFactoryParameterizedKeys(sslContextFactoryParameters);
-
         /*
          * Copy all configs to the Map to pass it on to the ISslContextFactory's implementation
          */
@@ -284,6 +277,13 @@ public class EncryptionOptions
         putSslContextFactoryParameter(sslContextFactoryParameters, ConfigKey.REQUIRE_ENDPOINT_VERIFICATION, this.require_endpoint_verification);
         putSslContextFactoryParameter(sslContextFactoryParameters, ConfigKey.ENABLED, this.enabled);
         putSslContextFactoryParameter(sslContextFactoryParameters, ConfigKey.OPTIONAL, this.optional);
+    }
+
+    private void initializeSslContextFactory()
+    {
+        Map<String, Object> sslContextFactoryParameters = new HashMap<>();
+        prepareSslContextFactoryParameterizedKeys(sslContextFactoryParameters);
+        fillSslContextParams(sslContextFactoryParameters);
 
         if (CassandraRelevantProperties.TEST_JVM_DTEST_DISABLE_SSL.getBoolean())
         {
@@ -296,8 +296,7 @@ public class EncryptionOptions
         }
     }
 
-    private void putSslContextFactoryParameter(Map<String,Object> existingParameters, ConfigKey configKey,
-                                               Object value)
+    protected static void putSslContextFactoryParameter(Map<String, Object> existingParameters, ConfigKey configKey, Object value)
     {
         if (value != null) {
             existingParameters.put(configKey.getKeyName(), value);
@@ -608,15 +607,20 @@ public class EncryptionOptions
         public final InternodeEncryption internode_encryption;
         @Replaces(oldName = "enable_legacy_ssl_storage_port", deprecated = true)
         public final boolean legacy_ssl_storage_port_enabled;
+        public final String outbound_keystore;
+        public final String outbound_keystore_password;
 
         public ServerEncryptionOptions()
         {
             this.internode_encryption = InternodeEncryption.none;
             this.legacy_ssl_storage_port_enabled = false;
+            this.outbound_keystore = null;
+            this.outbound_keystore_password = null;
         }
 
         public ServerEncryptionOptions(ParameterizedClass sslContextFactoryClass, String keystore,
-                                       String keystore_password, String truststore, String truststore_password,
+                                       String keystore_password,String outbound_keystore,
+                                       String outbound_keystore_password, String truststore, String truststore_password,
                                        List<String> cipher_suites, String protocol, List<String> accepted_protocols,
                                        String algorithm, String store_type, boolean require_client_auth,
                                        boolean require_endpoint_verification, Boolean optional,
@@ -627,6 +631,8 @@ public class EncryptionOptions
             null, optional);
             this.internode_encryption = internode_encryption;
             this.legacy_ssl_storage_port_enabled = legacy_ssl_storage_port_enabled;
+            this.outbound_keystore = outbound_keystore;
+            this.outbound_keystore_password = outbound_keystore_password;
         }
 
         public ServerEncryptionOptions(ServerEncryptionOptions options)
@@ -634,6 +640,16 @@ public class EncryptionOptions
             super(options);
             this.internode_encryption = options.internode_encryption;
             this.legacy_ssl_storage_port_enabled = options.legacy_ssl_storage_port_enabled;
+            this.outbound_keystore = options.outbound_keystore;
+            this.outbound_keystore_password = options.outbound_keystore_password;
+        }
+
+        @Override
+        protected void fillSslContextParams(Map<String, Object> sslContextFactoryParameters)
+        {
+            super.fillSslContextParams(sslContextFactoryParameters);
+            putSslContextFactoryParameter(sslContextFactoryParameters, ConfigKey.OUTBOUND_KEYSTORE, this.outbound_keystore);
+            putSslContextFactoryParameter(sslContextFactoryParameters, ConfigKey.OUTBOUND_KEYSTORE_PASSWORD, this.outbound_keystore_password);
         }
 
         @Override
@@ -697,7 +713,6 @@ public class EncryptionOptions
          * values of "dc" and "all". This method returns the explicit, raw value of {@link #optional}
          * as set by the user (if set at all).
          */
-        @JsonIgnore
         public boolean isExplicitlyOptional()
         {
             return optional != null && optional;
@@ -705,7 +720,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withSslContextFactory(ParameterizedClass sslContextFactoryClass)
         {
-            return new ServerEncryptionOptions(sslContextFactoryClass, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(sslContextFactoryClass, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -714,7 +730,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withKeyStore(String keystore)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -723,7 +740,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withKeyStorePassword(String keystore_password)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -732,7 +750,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withTrustStore(String truststore)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -741,7 +760,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withTrustStorePassword(String truststore_password)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -750,16 +770,18 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withCipherSuites(List<String> cipher_suites)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
                                                legacy_ssl_storage_port_enabled).applyConfigInternal();
         }
 
-        public ServerEncryptionOptions withCipherSuites(String ... cipher_suites)
+        public ServerEncryptionOptions withCipherSuites(String... cipher_suites)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, Arrays.asList(cipher_suites), protocol,
                                                accepted_protocols, algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -768,7 +790,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withProtocol(String protocol)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -777,7 +800,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withAcceptedProtocols(List<String> accepted_protocols)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -786,7 +810,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withAlgorithm(String algorithm)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -795,7 +820,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withStoreType(String store_type)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -804,7 +830,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withRequireClientAuth(boolean require_client_auth)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -813,7 +840,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withRequireEndpointVerification(boolean require_endpoint_verification)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -822,7 +850,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withOptional(boolean optional)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -831,7 +860,8 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withInternodeEncryption(InternodeEncryption internode_encryption)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
@@ -840,12 +870,32 @@ public class EncryptionOptions
 
         public ServerEncryptionOptions withLegacySslStoragePort(boolean enable_legacy_ssl_storage_port)
         {
-            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password, truststore,
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outbound_keystore_password, truststore,
                                                truststore_password, cipher_suites, protocol, accepted_protocols,
                                                algorithm, store_type, require_client_auth,
                                                require_endpoint_verification, optional, internode_encryption,
                                                enable_legacy_ssl_storage_port).applyConfigInternal();
         }
 
+        public ServerEncryptionOptions withOutboundKeystore(String outboundKeystore)
+        {
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outboundKeystore, outbound_keystore_password, truststore,
+                                               truststore_password, cipher_suites, protocol, accepted_protocols,
+                                               algorithm, store_type, require_client_auth,
+                                               require_endpoint_verification, optional, internode_encryption,
+                                               legacy_ssl_storage_port_enabled).applyConfigInternal();
+        }
+
+        public ServerEncryptionOptions withOutboundKeystorePassword(String outboundKeystorePassword)
+        {
+            return new ServerEncryptionOptions(ssl_context_factory, keystore, keystore_password,
+                                               outbound_keystore, outboundKeystorePassword, truststore,
+                                               truststore_password, cipher_suites, protocol, accepted_protocols,
+                                               algorithm, store_type, require_client_auth,
+                                               require_endpoint_verification, optional, internode_encryption,
+                                               legacy_ssl_storage_port_enabled).applyConfigInternal();
+        }
     }
 }
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
index 599e7178b5..db2873d934 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
@@ -82,7 +82,7 @@ public class OutboundConnectionSettings
     public final IInternodeAuthenticator authenticator;
     public final InetAddressAndPort to;
     public final InetAddressAndPort connectTo; // may be represented by a different IP address on this node's local network
-    public final EncryptionOptions encryption;
+    public final ServerEncryptionOptions encryption;
     public final Framing framing;
     public final Integer socketSendBufferSizeInBytes;
     public final Integer applicationSendQueueCapacityInBytes;
@@ -112,7 +112,7 @@ public class OutboundConnectionSettings
     private OutboundConnectionSettings(IInternodeAuthenticator authenticator,
                                        InetAddressAndPort to,
                                        InetAddressAndPort connectTo,
-                                       EncryptionOptions encryption,
+                                       ServerEncryptionOptions encryption,
                                        Framing framing,
                                        Integer socketSendBufferSizeInBytes,
                                        Integer applicationSendQueueCapacityInBytes,
@@ -365,7 +365,7 @@ public class OutboundConnectionSettings
         return debug != null ? debug : OutboundDebugCallbacks.NONE;
     }
 
-    public EncryptionOptions encryption()
+    public ServerEncryptionOptions encryption()
     {
         return encryption != null ? encryption : defaultEncryptionOptions(to);
     }
@@ -499,7 +499,7 @@ public class OutboundConnectionSettings
     }
 
     @VisibleForTesting
-    static EncryptionOptions defaultEncryptionOptions(InetAddressAndPort endpoint)
+    static ServerEncryptionOptions defaultEncryptionOptions(InetAddressAndPort endpoint)
     {
         ServerEncryptionOptions options = DatabaseDescriptor.getInternodeMessagingEncyptionOptions();
         return options.shouldEncrypt(endpoint) ? options : null;
diff --git a/src/java/org/apache/cassandra/security/AbstractSslContextFactory.java b/src/java/org/apache/cassandra/security/AbstractSslContextFactory.java
index c2ef851bfc..e4f868f9b1 100644
--- a/src/java/org/apache/cassandra/security/AbstractSslContextFactory.java
+++ b/src/java/org/apache/cassandra/security/AbstractSslContextFactory.java
@@ -178,15 +178,16 @@ abstract public class AbstractSslContextFactory implements ISslContextFactory
             key file in PEM format (see {@link SslContextBuilder#forServer(File, File, String)}). However, we are
             not supporting that now to keep the config/yaml API simple.
          */
-        KeyManagerFactory kmf = buildKeyManagerFactory();
         SslContextBuilder builder;
         if (socketType == SocketType.SERVER)
         {
+            KeyManagerFactory kmf = buildKeyManagerFactory();
             builder = SslContextBuilder.forServer(kmf).clientAuth(this.require_client_auth ? ClientAuth.REQUIRE :
                                                                   ClientAuth.NONE);
         }
         else
         {
+            KeyManagerFactory kmf = buildOutboundKeyManagerFactory();
             builder = SslContextBuilder.forClient().keyManager(kmf);
         }
 
@@ -263,4 +264,12 @@ abstract public class AbstractSslContextFactory implements ISslContextFactory
     abstract protected KeyManagerFactory buildKeyManagerFactory() throws SSLException;
 
     abstract protected TrustManagerFactory buildTrustManagerFactory() throws SSLException;
+
+    /**
+     * Create a {@code KeyManagerFactory} for outbound connections.
+     * It provides a separate keystore for internode mTLS outbound connections.
+     * @return {@code KeyManagerFactory}
+     * @throws SSLException
+     */
+    abstract protected KeyManagerFactory buildOutboundKeyManagerFactory() throws SSLException;
 }
diff --git a/src/java/org/apache/cassandra/security/DisableSslContextFactory.java b/src/java/org/apache/cassandra/security/DisableSslContextFactory.java
index 9dab062f0b..8058d0aba6 100644
--- a/src/java/org/apache/cassandra/security/DisableSslContextFactory.java
+++ b/src/java/org/apache/cassandra/security/DisableSslContextFactory.java
@@ -36,12 +36,24 @@ public class DisableSslContextFactory extends AbstractSslContextFactory
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    protected KeyManagerFactory buildOutboundKeyManagerFactory() throws SSLException
+    {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public boolean hasKeystore()
     {
         return false;
     }
 
+    @Override
+    public boolean hasOutboundKeystore()
+    {
+        return false;
+    }
+
     @Override
     public void initHotReloading() throws SSLException
     {
diff --git a/src/java/org/apache/cassandra/security/FileBasedSslContextFactory.java b/src/java/org/apache/cassandra/security/FileBasedSslContextFactory.java
index 3d47509fd3..5b3ca124ff 100644
--- a/src/java/org/apache/cassandra/security/FileBasedSslContextFactory.java
+++ b/src/java/org/apache/cassandra/security/FileBasedSslContextFactory.java
@@ -33,7 +33,7 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLException;
 import javax.net.ssl.TrustManagerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,38 +47,32 @@ import org.apache.cassandra.utils.Clock;
  * {@code CAUTION:} While this is a useful abstraction, please be careful if you need to modify this class
  * given possible custom implementations out there!
  */
-abstract public class FileBasedSslContextFactory extends AbstractSslContextFactory
+public abstract class FileBasedSslContextFactory extends AbstractSslContextFactory
 {
     private static final Logger logger = LoggerFactory.getLogger(FileBasedSslContextFactory.class);
-
-    @VisibleForTesting
-    protected volatile boolean checkedExpiry = false;
+    protected FileBasedStoreContext keystoreContext;
+    protected FileBasedStoreContext outboundKeystoreContext;
+    protected FileBasedStoreContext trustStoreContext;
 
     /**
      * List of files that trigger hot reloading of SSL certificates
      */
     protected volatile List<HotReloadableFile> hotReloadableFiles = new ArrayList<>();
 
-    protected String keystore;
-    protected String keystore_password;
-    protected String truststore;
-    protected String truststore_password;
-
     public FileBasedSslContextFactory()
     {
-        keystore = "conf/.keystore";
-        keystore_password = "cassandra";
-        truststore = "conf/.truststore";
-        truststore_password = "cassandra";
+        keystoreContext = new FileBasedStoreContext("conf/.keystore", "cassandra");
+        outboundKeystoreContext = new FileBasedStoreContext("conf/.keystore", "cassandra");
+        trustStoreContext = new FileBasedStoreContext("conf/.truststore", "cassandra");
     }
 
     public FileBasedSslContextFactory(Map<String, Object> parameters)
     {
         super(parameters);
-        keystore = getString("keystore");
-        keystore_password = getString("keystore_password");
-        truststore = getString("truststore");
-        truststore_password = getString("truststore_password");
+        keystoreContext = new FileBasedStoreContext(getString("keystore"), getString("keystore_password"));
+        outboundKeystoreContext = new FileBasedStoreContext(StringUtils.defaultString(getString("outbound_keystore"), keystoreContext.filePath),
+                                                            StringUtils.defaultString(getString("outbound_keystore_password"), keystoreContext.password));
+        trustStoreContext = new FileBasedStoreContext(getString("truststore"), getString("truststore_password"));
     }
 
     @Override
@@ -90,30 +84,41 @@ abstract public class FileBasedSslContextFactory extends AbstractSslContextFacto
     @Override
     public boolean hasKeystore()
     {
-        return keystore != null && new File(keystore).exists();
+        return keystoreContext.hasKeystore();
+    }
+
+    @Override
+    public boolean hasOutboundKeystore()
+    {
+        return outboundKeystoreContext.hasKeystore();
     }
 
     private boolean hasTruststore()
     {
-        return truststore != null && new File(truststore).exists();
+        return trustStoreContext.filePath != null && new File(trustStoreContext.filePath).exists();
     }
 
     @Override
     public synchronized void initHotReloading()
     {
         boolean hasKeystore = hasKeystore();
+        boolean hasOutboundKeystore = hasOutboundKeystore();
         boolean hasTruststore = hasTruststore();
 
-        if (hasKeystore || hasTruststore)
+        if (hasKeystore || hasOutboundKeystore || hasTruststore)
         {
             List<HotReloadableFile> fileList = new ArrayList<>();
             if (hasKeystore)
             {
-                fileList.add(new HotReloadableFile(keystore));
+                fileList.add(new HotReloadableFile(keystoreContext.filePath));
+            }
+            if (hasOutboundKeystore)
+            {
+                fileList.add(new HotReloadableFile(outboundKeystoreContext.filePath));
             }
             if (hasTruststore)
             {
-                fileList.add(new HotReloadableFile(truststore));
+                fileList.add(new HotReloadableFile(trustStoreContext.filePath));
             }
             hotReloadableFiles = fileList;
         }
@@ -129,25 +134,13 @@ abstract public class FileBasedSslContextFactory extends AbstractSslContextFacto
     @Override
     protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
     {
+        return getKeyManagerFactory(keystoreContext);
+    }
 
-        try (InputStream ksf = Files.newInputStream(Paths.get(keystore)))
-        {
-            final String algorithm = this.algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : this.algorithm;
-            KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm);
-            KeyStore ks = KeyStore.getInstance(store_type);
-            ks.load(ksf, keystore_password.toCharArray());
-            if (!checkedExpiry)
-            {
-                checkExpiredCerts(ks);
-                checkedExpiry = true;
-            }
-            kmf.init(ks, keystore_password.toCharArray());
-            return kmf;
-        }
-        catch (Exception e)
-        {
-            throw new SSLException("failed to build key manager store for secure connections", e);
-        }
+    @Override
+    protected KeyManagerFactory buildOutboundKeyManagerFactory() throws SSLException
+    {
+        return getKeyManagerFactory(outboundKeystoreContext);
     }
 
     /**
@@ -159,12 +152,12 @@ abstract public class FileBasedSslContextFactory extends AbstractSslContextFacto
     @Override
     protected TrustManagerFactory buildTrustManagerFactory() throws SSLException
     {
-        try (InputStream tsf = Files.newInputStream(Paths.get(truststore)))
+        try (InputStream tsf = Files.newInputStream(Paths.get(trustStoreContext.filePath)))
         {
             final String algorithm = this.algorithm == null ? TrustManagerFactory.getDefaultAlgorithm() : this.algorithm;
             TrustManagerFactory tmf = TrustManagerFactory.getInstance(algorithm);
             KeyStore ts = KeyStore.getInstance(store_type);
-            ts.load(tsf, truststore_password.toCharArray());
+            ts.load(tsf, trustStoreContext.password.toCharArray());
             tmf.init(ts);
             return tmf;
         }
@@ -174,6 +167,29 @@ abstract public class FileBasedSslContextFactory extends AbstractSslContextFacto
         }
     }
 
+    private KeyManagerFactory getKeyManagerFactory(final FileBasedStoreContext context) throws SSLException
+    {
+        try (InputStream ksf = Files.newInputStream(Paths.get(context.filePath)))
+        {
+            final String algorithm = this.algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : this.algorithm;
+            KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm);
+            KeyStore ks = KeyStore.getInstance(store_type);
+            ks.load(ksf, context.password.toCharArray());
+
+            if (!context.checkedExpiry)
+            {
+                checkExpiredCerts(ks);
+                context.checkedExpiry = true;
+            }
+            kmf.init(ks, context.password.toCharArray());
+            return kmf;
+        }
+        catch (Exception e)
+        {
+            throw new SSLException("failed to build key manager store for secure connections", e);
+        }
+    }
+
     protected boolean checkExpiredCerts(KeyStore ks) throws KeyStoreException
     {
         boolean hasExpiredCerts = false;
@@ -225,4 +241,27 @@ abstract public class FileBasedSslContextFactory extends AbstractSslContextFacto
                    '}';
         }
     }
+
+    protected static class FileBasedStoreContext
+    {
+        public volatile boolean checkedExpiry = false;
+        public String filePath;
+        public String password;
+
+        public FileBasedStoreContext(String keystore, String keystorePassword)
+        {
+            this.filePath = keystore;
+            this.password = keystorePassword;
+        }
+
+        protected boolean hasKeystore()
+        {
+            return filePath != null && new File(filePath).exists();
+        }
+
+        protected boolean passwordMatchesIfPresent(String keyPassword)
+        {
+            return StringUtils.isEmpty(password) || keyPassword.equals(password);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/security/ISslContextFactory.java b/src/java/org/apache/cassandra/security/ISslContextFactory.java
index 579c95e43a..11c4717b3c 100644
--- a/src/java/org/apache/cassandra/security/ISslContextFactory.java
+++ b/src/java/org/apache/cassandra/security/ISslContextFactory.java
@@ -99,6 +99,16 @@ public interface ISslContextFactory
         return true;
     }
 
+    /**
+     * Returns whether this factory uses an outbound keystore.
+     *
+     * @return {@code false} by default unless the implementation overrides this
+     */
+    default boolean hasOutboundKeystore()
+    {
+        return false;
+    }
+
     /**
      * Returns the prepared list of accepted protocols.
      *
diff --git a/src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java b/src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
index 8ecbec59ef..3d3ecc21fd 100644
--- a/src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
+++ b/src/java/org/apache/cassandra/security/PEMBasedSslContextFactory.java
@@ -90,47 +90,55 @@ public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
 {
     public static final String DEFAULT_TARGET_STORETYPE = "PKCS12";
     private static final Logger logger = LoggerFactory.getLogger(PEMBasedSslContextFactory.class);
-    private String pemEncodedKey;
-    private String keyPassword;
-    private String pemEncodedCertificates;
-    private boolean maybeFileBasedPrivateKey;
-    private boolean maybeFileBasedTrustedCertificates;
+    private PEMBasedKeyStoreContext pemEncodedTrustCertificates;
+    private PEMBasedKeyStoreContext pemEncodedKeyContext;
+    private PEMBasedKeyStoreContext pemEncodedOutboundKeyContext;
 
     public PEMBasedSslContextFactory()
     {
     }
 
-    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    private void validatePasswords()
     {
-        super(parameters);
-        pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
-        keyPassword = getString(ConfigKey.KEY_PASSWORD.getKeyName());
-        if (StringUtils.isEmpty(keyPassword))
-        {
-            keyPassword = keystore_password;
-        }
-        else if (!StringUtils.isEmpty(keystore_password) && !keyPassword.equals(keystore_password))
+        boolean shouldThrow = !keystoreContext.passwordMatchesIfPresent(pemEncodedKeyContext.password)
+                              || !outboundKeystoreContext.passwordMatchesIfPresent(pemEncodedOutboundKeyContext.password);
+        boolean outboundPasswordMismatch = !outboundKeystoreContext.passwordMatchesIfPresent(pemEncodedOutboundKeyContext.password);
+        String keyName = outboundPasswordMismatch ? "outbound_" : "";
+
+        if (shouldThrow)
         {
-            throw new IllegalArgumentException("'keystore_password' and 'key_password' both configurations are given and the " +
-                                               "values do not match");
+            final String msg = String.format("'%skeystore_password' and '%skey_password' both configurations are given and the values do not match", keyName, keyName);
+            throw new IllegalArgumentException(msg);
         }
         else
         {
-            logger.warn("'keystore_password' and 'key_password' both are configured but since the values match it's " +
-                        "okay. Ideally you should only specify one of them.");
+            logger.warn("'{}keystore_password' and '{}key_password' both are configured but since the values match it's " +
+                        "okay. Ideally you should only specify one of them.", keyName, keyName);
         }
+    }
+
+    public PEMBasedSslContextFactory(Map<String, Object> parameters)
+    {
+        super(parameters);
+        final String pemEncodedKey = getString(ConfigKey.ENCODED_KEY.getKeyName());
+        final String pemEncodedKeyPassword = StringUtils.defaultString(getString(ConfigKey.KEY_PASSWORD.getKeyName()), keystoreContext.password);
+        pemEncodedKeyContext = new PEMBasedKeyStoreContext(pemEncodedKey, pemEncodedKeyPassword, StringUtils.isEmpty(pemEncodedKey), keystoreContext);
+
+        final String pemEncodedOutboundKey = StringUtils.defaultString(getString(ConfigKey.OUTBOUND_ENCODED_KEY.getKeyName()), pemEncodedKey);
+        final String outboundKeyPassword = StringUtils.defaultString(StringUtils.defaultString(getString(ConfigKey.OUTBOUND_ENCODED_KEY_PASSWORD.getKeyName()),
+                                                                                               outboundKeystoreContext.password), pemEncodedKeyPassword);
+        pemEncodedOutboundKeyContext = new PEMBasedKeyStoreContext(pemEncodedOutboundKey, outboundKeyPassword, StringUtils.isEmpty(pemEncodedOutboundKey), outboundKeystoreContext);
+
+        validatePasswords();
 
-        if (!StringUtils.isEmpty(truststore_password))
+        if (!StringUtils.isEmpty(trustStoreContext.password))
         {
             logger.warn("PEM based truststore should not be using password. Ignoring the given value in " +
                         "'truststore_password' configuration.");
         }
 
-        pemEncodedCertificates = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
-
-        maybeFileBasedPrivateKey = StringUtils.isEmpty(pemEncodedKey);
-        maybeFileBasedTrustedCertificates = StringUtils.isEmpty(pemEncodedCertificates);
-
+        final String pemEncodedCerts = getString(ConfigKey.ENCODED_CERTIFICATES.getKeyName());
+        pemEncodedTrustCertificates = new PEMBasedKeyStoreContext(pemEncodedCerts, null, StringUtils.isEmpty(pemEncodedCerts), trustStoreContext);
         enforceSinglePrivateKeySource();
         enforceSingleTurstedCertificatesSource();
     }
@@ -143,18 +151,22 @@ public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
     @Override
     public boolean hasKeystore()
     {
-        return maybeFileBasedPrivateKey ? keystoreFileExists() :
-               !StringUtils.isEmpty(pemEncodedKey);
+        return pemEncodedKeyContext.maybeFilebasedKey
+               ? keystoreContext.hasKeystore()
+               : !StringUtils.isEmpty(pemEncodedKeyContext.key);
     }
 
     /**
-     * Checks if the keystore file exists.
+     * Decides if this factory has an outbound keystore defined - key material specified in files or inline to the configuration.
      *
-     * @return {@code true} if keystore file exists; {@code false} otherwise
+     * @return {@code true} if there is an outbound keystore defined; {@code false} otherwise
      */
-    private boolean keystoreFileExists()
+    @Override
+    public boolean hasOutboundKeystore()
     {
-        return keystore != null && new File(keystore).exists();
+        return pemEncodedOutboundKeyContext.maybeFilebasedKey
+               ? outboundKeystoreContext.hasKeystore()
+               : !StringUtils.isEmpty(pemEncodedOutboundKeyContext.key);
     }
 
     /**
@@ -165,8 +177,8 @@ public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
      */
     private boolean hasTruststore()
     {
-        return maybeFileBasedTrustedCertificates ? truststoreFileExists() :
-               !StringUtils.isEmpty(pemEncodedCertificates);
+        return pemEncodedTrustCertificates.maybeFilebasedKey ? truststoreFileExists() :
+               !StringUtils.isEmpty(pemEncodedTrustCertificates.key);
     }
 
     /**
@@ -176,7 +188,7 @@ public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
      */
     private boolean truststoreFileExists()
     {
-        return truststore != null && new File(truststore).exists();
+        return trustStoreContext.filePath != null && new File(trustStoreContext.filePath).exists();
     }
 
     /**
@@ -186,13 +198,17 @@ public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
     public synchronized void initHotReloading()
     {
         List<HotReloadableFile> fileList = new ArrayList<>();
-        if (maybeFileBasedPrivateKey && hasKeystore())
+        if (pemEncodedKeyContext.maybeFilebasedKey && hasKeystore())
+        {
+            fileList.add(new HotReloadableFile(keystoreContext.filePath));
+        }
+        if (pemEncodedOutboundKeyContext.maybeFilebasedKey && hasOutboundKeystore())
         {
-            fileList.add(new HotReloadableFile(keystore));
+            fileList.add(new HotReloadableFile(outboundKeystoreContext.filePath));
         }
-        if (maybeFileBasedTrustedCertificates && hasTruststore())
+        if (pemEncodedTrustCertificates.maybeFilebasedKey && hasTruststore())
         {
-            fileList.add(new HotReloadableFile(truststore));
+            fileList.add(new HotReloadableFile(trustStoreContext.filePath));
         }
         if (!fileList.isEmpty())
         {
@@ -209,30 +225,41 @@ public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
      */
     @Override
     protected KeyManagerFactory buildKeyManagerFactory() throws SSLException
+    {
+        return buildKeyManagerFactory(pemEncodedKeyContext, keystoreContext);
+    }
+
+    @Override
+    protected KeyManagerFactory buildOutboundKeyManagerFactory() throws SSLException
+    {
+        return buildKeyManagerFactory(pemEncodedOutboundKeyContext, outboundKeystoreContext);
+    }
+
+    private KeyManagerFactory buildKeyManagerFactory(PEMBasedKeyStoreContext pemBasedKeyStoreContext, FileBasedStoreContext keyStoreContext) throws SSLException
     {
         try
         {
-            if (hasKeystore())
+            if (pemBasedKeyStoreContext.hasKey())
             {
-                if (maybeFileBasedPrivateKey)
+                if (pemBasedKeyStoreContext.maybeFilebasedKey)
                 {
-                    pemEncodedKey = readPEMFile(keystore); // read PEM from the file
+                    pemBasedKeyStoreContext.key = readPEMFile(keyStoreContext.filePath); // read PEM from the file
                 }
 
                 KeyManagerFactory kmf = KeyManagerFactory.getInstance(
                 algorithm == null ? KeyManagerFactory.getDefaultAlgorithm() : algorithm);
-                KeyStore ks = buildKeyStore();
-                if (!checkedExpiry)
+                KeyStore ks = buildKeyStore(pemBasedKeyStoreContext.key, pemBasedKeyStoreContext.password);
+                if (!keyStoreContext.checkedExpiry)
                 {
                     checkExpiredCerts(ks);
-                    checkedExpiry = true;
+                    keyStoreContext.checkedExpiry = true;
                 }
-                kmf.init(ks, keyPassword != null ? keyPassword.toCharArray() : null);
+                kmf.init(ks, pemBasedKeyStoreContext.password != null ? pemBasedKeyStoreContext.password.toCharArray() : null);
                 return kmf;
             }
             else
             {
-                throw new SSLException("Must provide keystore or private_key in configuration for PEMBasedSSlContextFactory");
+                throw new SSLException("Must provide outbound_keystore or outbound_private_key in configuration for PEMBasedSSlContextFactory");
             }
         }
         catch (Exception e)
@@ -254,9 +281,9 @@ public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
         {
             if (hasTruststore())
             {
-                if (maybeFileBasedTrustedCertificates)
+                if (pemEncodedTrustCertificates.maybeFilebasedKey)
                 {
-                    pemEncodedCertificates = readPEMFile(truststore); // read PEM from the file
+                    pemEncodedTrustCertificates.key = readPEMFile(trustStoreContext.filePath); // read PEM from the file
                 }
 
                 TrustManagerFactory tmf = TrustManagerFactory.getInstance(
@@ -286,7 +313,7 @@ public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
      * Builds KeyStore object given the {@link #DEFAULT_TARGET_STORETYPE} out of the PEM formatted private key material.
      * It uses {@code cassandra-ssl-keystore} as the alias for the created key-entry.
      */
-    private KeyStore buildKeyStore() throws GeneralSecurityException, IOException
+    private static KeyStore buildKeyStore(final String pemEncodedKey, final String keyPassword) throws GeneralSecurityException, IOException
     {
         char[] keyPasswordArray = keyPassword != null ? keyPassword.toCharArray() : null;
         PrivateKey privateKey = PEMReader.extractPrivateKey(pemEncodedKey, keyPassword);
@@ -310,7 +337,7 @@ public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
      */
     private KeyStore buildTrustStore() throws GeneralSecurityException, IOException
     {
-        Certificate[] certChainArray = PEMReader.extractCertificates(pemEncodedCertificates);
+        Certificate[] certChainArray = PEMReader.extractCertificates(pemEncodedTrustCertificates.key);
         if (certChainArray == null || certChainArray.length == 0)
         {
             throw new SSLException("Could not read any certificates from the given PEM");
@@ -331,11 +358,16 @@ public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
      */
     private void enforceSinglePrivateKeySource()
     {
-        if (keystoreFileExists() && !StringUtils.isEmpty(pemEncodedKey))
+        if (keystoreContext.hasKeystore() && !StringUtils.isEmpty(pemEncodedKeyContext.key))
         {
             throw new IllegalArgumentException("Configuration must specify value for either keystore or private_key, " +
                                                "not both for PEMBasedSSlContextFactory");
         }
+        if (outboundKeystoreContext.hasKeystore() && !StringUtils.isEmpty(pemEncodedOutboundKeyContext.key))
+        {
+            throw new IllegalArgumentException("Configuration must specify value for either outbound_keystore or outbound_private_key, " +
+                                               "not both for PEMBasedSSlContextFactory");
+        }
     }
 
     /**
@@ -344,17 +376,43 @@ public final class PEMBasedSslContextFactory extends FileBasedSslContextFactory
      */
     private void enforceSingleTurstedCertificatesSource()
     {
-        if (truststoreFileExists() && !StringUtils.isEmpty(pemEncodedCertificates))
+        if (truststoreFileExists() && !StringUtils.isEmpty(pemEncodedTrustCertificates.key))
         {
             throw new IllegalArgumentException("Configuration must specify value for either truststore or " +
                                                "trusted_certificates, not both for PEMBasedSSlContextFactory");
         }
     }
 
+    public static class PEMBasedKeyStoreContext
+    {
+        public String key;
+        public final String password;
+        public final boolean maybeFilebasedKey;
+        public final FileBasedStoreContext filebasedKeystoreContext;
+
+        public PEMBasedKeyStoreContext(final String encodedKey, final String getEncodedKeyPassword,
+                                       final boolean maybeFilebasedKey, final FileBasedStoreContext filebasedKeystoreContext)
+        {
+            this.key = encodedKey;
+            this.password = getEncodedKeyPassword;
+            this.maybeFilebasedKey = maybeFilebasedKey;
+            this.filebasedKeystoreContext = filebasedKeystoreContext;
+        }
+
+        public boolean hasKey()
+        {
+            return maybeFilebasedKey
+                   ? filebasedKeystoreContext.hasKeystore()
+                   : !StringUtils.isEmpty(key);
+        }
+    }
+
     public enum ConfigKey
     {
         ENCODED_KEY("private_key"),
         KEY_PASSWORD("private_key_password"),
+        OUTBOUND_ENCODED_KEY("outbound_private_key"),
+        OUTBOUND_ENCODED_KEY_PASSWORD("outbound_private_key_password"),
         ENCODED_CERTIFICATES("trusted_certificates");
 
         final String keyName;
diff --git a/test/conf/cassandra_ssl_test_outbound.keystore b/test/conf/cassandra_ssl_test_outbound.keystore
new file mode 100644
index 0000000000..7dbf466e5a
Binary files /dev/null and b/test/conf/cassandra_ssl_test_outbound.keystore differ
diff --git a/test/unit/org/apache/cassandra/config/EncryptionOptionsTest.java b/test/unit/org/apache/cassandra/config/EncryptionOptionsTest.java
index a76c24a458..0ff2124794 100644
--- a/test/unit/org/apache/cassandra/config/EncryptionOptionsTest.java
+++ b/test/unit/org/apache/cassandra/config/EncryptionOptionsTest.java
@@ -125,7 +125,7 @@ public class EncryptionOptionsTest
                                                          EncryptionOptions.TlsEncryptionPolicy expected)
         {
             return new ServerEncryptionOptionsTestCase(new EncryptionOptions.ServerEncryptionOptions(new ParameterizedClass("org.apache.cassandra.security.DefaultSslContextFactory",
-                                                                                                                            new HashMap<>()), keystorePath, "dummypass", "dummytruststore", "dummypass",
+                                                                                                                            new HashMap<>()), keystorePath, "dummypass", keystorePath, "dummypass", "dummytruststore", "dummypass",
                                                                                                Collections.emptyList(), null, null, null, "JKS", false, false, optional, internodeEncryption, false)
                                                        .applyConfig(),
                                                  expected,
diff --git a/test/unit/org/apache/cassandra/security/DefaultSslContextFactoryTest.java b/test/unit/org/apache/cassandra/security/DefaultSslContextFactoryTest.java
index 13d1faca04..3edf9c188e 100644
--- a/test/unit/org/apache/cassandra/security/DefaultSslContextFactoryTest.java
+++ b/test/unit/org/apache/cassandra/security/DefaultSslContextFactoryTest.java
@@ -53,15 +53,23 @@ public class DefaultSslContextFactoryTest
         config.put("keystore_password", "cassandra");
     }
 
+    private void addOutboundKeystoreOptions(Map<String, Object> config)
+    {
+        config.put("outbound_keystore", "test/conf/cassandra_ssl_test_outbound.keystore");
+        config.put("outbound_keystore_password", "cassandra");
+    }
+
     @Test
     public void getSslContextOpenSSL() throws IOException
     {
-        EncryptionOptions options = new EncryptionOptions().withTrustStore("test/conf/cassandra_ssl_test.truststore")
-                                                           .withTrustStorePassword("cassandra")
-                                                           .withKeyStore("test/conf/cassandra_ssl_test.keystore")
-                                                           .withKeyStorePassword("cassandra")
-                                                           .withRequireClientAuth(false)
-                                                           .withCipherSuites("TLS_RSA_WITH_AES_128_CBC_SHA");
+        EncryptionOptions.ServerEncryptionOptions options = new EncryptionOptions.ServerEncryptionOptions().withTrustStore("test/conf/cassandra_ssl_test.truststore")
+                                                                                                           .withTrustStorePassword("cassandra")
+                                                                                                           .withKeyStore("test/conf/cassandra_ssl_test.keystore")
+                                                                                                           .withKeyStorePassword("cassandra")
+                                                                                                           .withOutboundKeystore("test/conf/cassandra_ssl_test_outbound.keystore")
+                                                                                                           .withOutboundKeystorePassword("cassandra")
+                                                                                                           .withRequireClientAuth(false)
+                                                                                                           .withCipherSuites("TLS_RSA_WITH_AES_128_CBC_SHA");
         SslContext sslContext = SSLFactory.getOrCreateSslContext(options, true, ISslContextFactory.SocketType.CLIENT);
         Assert.assertNotNull(sslContext);
         if (OpenSsl.isAvailable())
@@ -78,7 +86,7 @@ public class DefaultSslContextFactoryTest
         config.put("truststore", "/this/is/probably/not/a/file/on/your/test/machine");
 
         DefaultSslContextFactory defaultSslContextFactoryImpl = new DefaultSslContextFactory(config);
-        defaultSslContextFactoryImpl.checkedExpiry = false;
+        defaultSslContextFactoryImpl.keystoreContext.checkedExpiry = false;
         defaultSslContextFactoryImpl.buildTrustManagerFactory();
     }
 
@@ -90,7 +98,7 @@ public class DefaultSslContextFactoryTest
         config.put("truststore_password", "HomeOfBadPasswords");
 
         DefaultSslContextFactory defaultSslContextFactoryImpl = new DefaultSslContextFactory(config);
-        defaultSslContextFactoryImpl.checkedExpiry = false;
+        defaultSslContextFactoryImpl.keystoreContext.checkedExpiry = false;
         defaultSslContextFactoryImpl.buildTrustManagerFactory();
     }
 
@@ -101,7 +109,7 @@ public class DefaultSslContextFactoryTest
         config.putAll(commonConfig);
 
         DefaultSslContextFactory defaultSslContextFactoryImpl = new DefaultSslContextFactory(config);
-        defaultSslContextFactoryImpl.checkedExpiry = false;
+        defaultSslContextFactoryImpl.keystoreContext.checkedExpiry = false;
         TrustManagerFactory trustManagerFactory = defaultSslContextFactoryImpl.buildTrustManagerFactory();
         Assert.assertNotNull(trustManagerFactory);
     }
@@ -114,7 +122,7 @@ public class DefaultSslContextFactoryTest
         config.put("keystore", "/this/is/probably/not/a/file/on/your/test/machine");
 
         DefaultSslContextFactory defaultSslContextFactoryImpl = new DefaultSslContextFactory(config);
-        defaultSslContextFactoryImpl.checkedExpiry = false;
+        defaultSslContextFactoryImpl.keystoreContext.checkedExpiry = false;
         defaultSslContextFactoryImpl.buildKeyManagerFactory();
     }
 
@@ -138,20 +146,70 @@ public class DefaultSslContextFactoryTest
 
         DefaultSslContextFactory defaultSslContextFactoryImpl = new DefaultSslContextFactory(config);
         // Make sure the exiry check didn't happen so far for the private key
-        Assert.assertFalse(defaultSslContextFactoryImpl.checkedExpiry);
+        Assert.assertFalse(defaultSslContextFactoryImpl.keystoreContext.checkedExpiry);
 
         addKeystoreOptions(config);
         DefaultSslContextFactory defaultSslContextFactoryImpl2 = new DefaultSslContextFactory(config);
         // Trigger the private key loading. That will also check for expired private key
         defaultSslContextFactoryImpl2.buildKeyManagerFactory();
         // Now we should have checked the private key's expiry
-        Assert.assertTrue(defaultSslContextFactoryImpl2.checkedExpiry);
+        Assert.assertTrue(defaultSslContextFactoryImpl2.keystoreContext.checkedExpiry);
 
         // Make sure that new factory object preforms the fresh private key expiry check
         DefaultSslContextFactory defaultSslContextFactoryImpl3 = new DefaultSslContextFactory(config);
-        Assert.assertFalse(defaultSslContextFactoryImpl3.checkedExpiry);
+        Assert.assertFalse(defaultSslContextFactoryImpl3.keystoreContext.checkedExpiry);
         defaultSslContextFactoryImpl3.buildKeyManagerFactory();
-        Assert.assertTrue(defaultSslContextFactoryImpl3.checkedExpiry);
+        Assert.assertTrue(defaultSslContextFactoryImpl3.keystoreContext.checkedExpiry);
+    }
+
+    @Test(expected = IOException.class)
+    public void buildOutboundKeyManagerFactoryWithInvalidKeystoreFile() throws IOException
+    {
+        Map<String, Object> config = new HashMap<>();
+        config.putAll(commonConfig);
+        config.put("outbound_keystore", "/this/is/probably/not/a/file/on/your/test/machine");
+
+        DefaultSslContextFactory defaultSslContextFactoryImpl = new DefaultSslContextFactory(config);
+        defaultSslContextFactoryImpl.outboundKeystoreContext.checkedExpiry = false;
+        defaultSslContextFactoryImpl.buildOutboundKeyManagerFactory();
+    }
+
+    @Test(expected = IOException.class)
+    public void buildOutboundKeyManagerFactoryWithBadPassword() throws IOException
+    {
+        Map<String, Object> config = new HashMap<>();
+        config.putAll(commonConfig);
+        addOutboundKeystoreOptions(config);
+        config.put("outbound_keystore_password", "HomeOfBadPasswords");
+
+        DefaultSslContextFactory defaultSslContextFactoryImpl = new DefaultSslContextFactory(config);
+        defaultSslContextFactoryImpl.buildOutboundKeyManagerFactory();
+    }
+
+    @Test
+    public void buildOutboundKeyManagerFactoryHappyPath() throws IOException
+    {
+        Map<String, Object> config = new HashMap<>();
+        config.putAll(commonConfig);
+
+        DefaultSslContextFactory defaultSslContextFactoryImpl = new DefaultSslContextFactory(config);
+        // Make sure the expiry check didn't happen so far for the outbound private key
+        Assert.assertFalse(defaultSslContextFactoryImpl.outboundKeystoreContext.checkedExpiry);
+
+        addOutboundKeystoreOptions(config);
+        DefaultSslContextFactory defaultSslContextFactoryImpl2 = new DefaultSslContextFactory(config);
+        // Trigger the outbound private key loading. That will also check for an expired private key
+        defaultSslContextFactoryImpl2.buildOutboundKeyManagerFactory();
+        // Now we should have checked the outbound key's expiry, but not the inbound key's
+        Assert.assertTrue(defaultSslContextFactoryImpl2.outboundKeystoreContext.checkedExpiry);
+        Assert.assertFalse(defaultSslContextFactoryImpl2.keystoreContext.checkedExpiry);
+
+        // Make sure that a new factory object performs a fresh private key expiry check
+        DefaultSslContextFactory defaultSslContextFactoryImpl3 = new DefaultSslContextFactory(config);
+        Assert.assertFalse(defaultSslContextFactoryImpl3.outboundKeystoreContext.checkedExpiry);
+        defaultSslContextFactoryImpl3.buildOutboundKeyManagerFactory();
+        Assert.assertTrue(defaultSslContextFactoryImpl3.outboundKeystoreContext.checkedExpiry);
+        Assert.assertFalse(defaultSslContextFactoryImpl3.keystoreContext.checkedExpiry);
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/security/PEMBasedSslContextFactoryTest.java b/test/unit/org/apache/cassandra/security/PEMBasedSslContextFactoryTest.java
index 243d300539..f919a1994e 100644
--- a/test/unit/org/apache/cassandra/security/PEMBasedSslContextFactoryTest.java
+++ b/test/unit/org/apache/cassandra/security/PEMBasedSslContextFactoryTest.java
@@ -216,6 +216,27 @@ public class PEMBasedSslContextFactoryTest
                                                            .withRequireClientAuth(false)
                                                            .withCipherSuites("TLS_RSA_WITH_AES_128_CBC_SHA")
                                                            .withSslContextFactory(sslContextFactory);
+        SslContext sslContext = SSLFactory.getOrCreateSslContext(options, true, ISslContextFactory.SocketType.SERVER);
+        Assert.assertNotNull(sslContext);
+        if (OpenSsl.isAvailable())
+            Assert.assertTrue(sslContext instanceof OpenSslContext);
+        else
+            Assert.assertTrue(sslContext instanceof SslContext);
+    }
+
+    @Test
+    public void getSslContextOpenSSLOutboundKeystore() throws IOException
+    {
+        ParameterizedClass sslContextFactory = new ParameterizedClass(PEMBasedSslContextFactory.class.getSimpleName()
+        , new HashMap<>());
+        EncryptionOptions.ServerEncryptionOptions options = new EncryptionOptions.ServerEncryptionOptions().withTrustStore("test/conf/cassandra_ssl_test.truststore.pem")
+                                                                                                           .withKeyStore("test/conf/cassandra_ssl_test.keystore.pem")
+                                                                                                           .withKeyStorePassword("cassandra")
+                                                                                                           .withOutboundKeystore("test/conf/cassandra_ssl_test.keystore.pem")
+                                                                                                           .withOutboundKeystorePassword("cassandra")
+                                                                                                           .withRequireClientAuth(false)
+                                                                                                           .withCipherSuites("TLS_RSA_WITH_AES_128_CBC_SHA")
+                                                                                                           .withSslContextFactory(sslContextFactory);
         SslContext sslContext = SSLFactory.getOrCreateSslContext(options, true, ISslContextFactory.SocketType.CLIENT);
         Assert.assertNotNull(sslContext);
         if (OpenSsl.isAvailable())
@@ -233,7 +254,7 @@ public class PEMBasedSslContextFactoryTest
         config.put("truststore", "/this/is/probably/not/a/file/on/your/test/machine");
 
         DefaultSslContextFactory defaultSslContextFactoryImpl = new DefaultSslContextFactory(config);
-        defaultSslContextFactoryImpl.checkedExpiry = false;
+        defaultSslContextFactoryImpl.keystoreContext.checkedExpiry = false;
         defaultSslContextFactoryImpl.buildTrustManagerFactory();
     }
 
@@ -244,7 +265,7 @@ public class PEMBasedSslContextFactoryTest
         config.putAll(commonConfig);
 
         PEMBasedSslContextFactory sslContextFactory = new PEMBasedSslContextFactory(config);
-        sslContextFactory.checkedExpiry = false;
+        sslContextFactory.keystoreContext.checkedExpiry = false;
         TrustManagerFactory trustManagerFactory = sslContextFactory.buildTrustManagerFactory();
         Assert.assertNotNull(trustManagerFactory);
     }
@@ -258,7 +279,7 @@ public class PEMBasedSslContextFactoryTest
         addFileBaseTrustStoreOptions(config);
 
         PEMBasedSslContextFactory sslContextFactory = new PEMBasedSslContextFactory(config);
-        sslContextFactory.checkedExpiry = false;
+        sslContextFactory.keystoreContext.checkedExpiry = false;
         TrustManagerFactory trustManagerFactory = sslContextFactory.buildTrustManagerFactory();
         Assert.assertNotNull(trustManagerFactory);
     }
@@ -271,7 +292,7 @@ public class PEMBasedSslContextFactoryTest
         config.put("keystore", "/this/is/probably/not/a/file/on/your/test/machine");
 
         PEMBasedSslContextFactory sslContextFactory = new PEMBasedSslContextFactory(config);
-        sslContextFactory.checkedExpiry = false;
+        sslContextFactory.keystoreContext.checkedExpiry = false;
         sslContextFactory.buildKeyManagerFactory();
     }
 
@@ -295,20 +316,20 @@ public class PEMBasedSslContextFactoryTest
 
         PEMBasedSslContextFactory sslContextFactory1 = new PEMBasedSslContextFactory(config);
         // Make sure the exiry check didn't happen so far for the private key
-        Assert.assertFalse(sslContextFactory1.checkedExpiry);
+        Assert.assertFalse(sslContextFactory1.keystoreContext.checkedExpiry);
 
         addKeyStoreOptions(config);
         PEMBasedSslContextFactory sslContextFactory2 = new PEMBasedSslContextFactory(config);
         // Trigger the private key loading. That will also check for expired private key
         sslContextFactory2.buildKeyManagerFactory();
         // Now we should have checked the private key's expiry
-        Assert.assertTrue(sslContextFactory2.checkedExpiry);
+        Assert.assertTrue(sslContextFactory2.keystoreContext.checkedExpiry);
 
         // Make sure that new factory object preforms the fresh private key expiry check
         PEMBasedSslContextFactory sslContextFactory3 = new PEMBasedSslContextFactory(config);
-        Assert.assertFalse(sslContextFactory3.checkedExpiry);
+        Assert.assertFalse(sslContextFactory3.keystoreContext.checkedExpiry);
         sslContextFactory3.buildKeyManagerFactory();
-        Assert.assertTrue(sslContextFactory3.checkedExpiry);
+        Assert.assertTrue(sslContextFactory3.keystoreContext.checkedExpiry);
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -343,20 +364,20 @@ public class PEMBasedSslContextFactoryTest
 
         PEMBasedSslContextFactory sslContextFactory1 = new PEMBasedSslContextFactory(config);
         // Make sure the expiry check didn't happen so far for the private key
-        Assert.assertFalse(sslContextFactory1.checkedExpiry);
+        Assert.assertFalse(sslContextFactory1.keystoreContext.checkedExpiry);
 
         addFileBaseKeyStoreOptions(config);
         PEMBasedSslContextFactory sslContextFactory2 = new PEMBasedSslContextFactory(config);
         // Trigger the private key loading. That will also check for expired private key
         sslContextFactory2.buildKeyManagerFactory();
         // Now we should have checked the private key's expiry
-        Assert.assertTrue(sslContextFactory2.checkedExpiry);
+        Assert.assertTrue(sslContextFactory2.keystoreContext.checkedExpiry);
 
         // Make sure that new factory object preforms the fresh private key expiry check
         PEMBasedSslContextFactory sslContextFactory3 = new PEMBasedSslContextFactory(config);
-        Assert.assertFalse(sslContextFactory3.checkedExpiry);
+        Assert.assertFalse(sslContextFactory3.keystoreContext.checkedExpiry);
         sslContextFactory3.buildKeyManagerFactory();
-        Assert.assertTrue(sslContextFactory3.checkedExpiry);
+        Assert.assertTrue(sslContextFactory3.keystoreContext.checkedExpiry);
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/security/SSLFactoryTest.java b/test/unit/org/apache/cassandra/security/SSLFactoryTest.java
index e5aa4b1057..ff3bab9d26 100644
--- a/test/unit/org/apache/cassandra/security/SSLFactoryTest.java
+++ b/test/unit/org/apache/cassandra/security/SSLFactoryTest.java
@@ -18,11 +18,19 @@
 */
 package org.apache.cassandra.security;
 
-import org.apache.cassandra.io.util.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.Certificate;
 import java.security.cert.CertificateException;
 import java.util.HashMap;
 import java.util.Map;
+import javax.net.ssl.X509KeyManager;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
@@ -31,12 +39,19 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.handler.ssl.OpenSslClientContext;
+import io.netty.handler.ssl.OpenSslServerContext;
+import io.netty.handler.ssl.OpenSslSessionContext;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.util.SelfSignedCertificate;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.io.util.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class SSLFactoryTest
 {
@@ -65,13 +80,17 @@ public class SSLFactoryTest
                             .withTrustStore("test/conf/cassandra_ssl_test.truststore")
                             .withTrustStorePassword("cassandra")
                             .withRequireClientAuth(false)
-                            .withCipherSuites("TLS_RSA_WITH_AES_128_CBC_SHA");
+                            .withCipherSuites("TLS_RSA_WITH_AES_128_CBC_SHA")
+                            .withSslContextFactory(new ParameterizedClass(TestFileBasedSSLContextFactory.class.getName(),
+                                                                          new HashMap<>()));
     }
 
     private ServerEncryptionOptions addKeystoreOptions(ServerEncryptionOptions options)
     {
         return options.withKeyStore("test/conf/cassandra_ssl_test.keystore")
-                      .withKeyStorePassword("cassandra");
+                      .withKeyStorePassword("cassandra")
+                      .withOutboundKeystore("test/conf/cassandra_ssl_test_outbound.keystore")
+                      .withOutboundKeystorePassword("cassandra");
     }
 
     private ServerEncryptionOptions addPEMKeystoreOptions(ServerEncryptionOptions options)
@@ -81,6 +100,8 @@ public class SSLFactoryTest
         return options.withSslContextFactory(sslContextFactoryClass)
                       .withKeyStore("test/conf/cassandra_ssl_test.keystore.pem")
                       .withKeyStorePassword("cassandra")
+                      .withOutboundKeystore("test/conf/cassandra_ssl_test.keystore.pem")
+                      .withOutboundKeystorePassword("cassandra")
                       .withTrustStore("test/conf/cassandra_ssl_test.truststore.pem");
     }
 
@@ -117,7 +138,41 @@ public class SSLFactoryTest
     }
 
     @Test
-    public void testPEMSslContextReload_HappyPath() throws IOException, InterruptedException
+    public void testServerSocketShouldUseKeystore() throws IOException, CertificateException, KeyStoreException, NoSuchAlgorithmException, NoSuchFieldException, IllegalAccessException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException
+    {
+        // The outbound keystore settings are deliberately bogus: a SERVER socket must not read them
+        ServerEncryptionOptions options = addKeystoreOptions(encryptionOptions)
+        .withOutboundKeystore("dummyKeystore")
+        .withOutboundKeystorePassword("dummyPassword");
+
+        // Server socket type should create a keystore with keystore & keystore password
+        final OpenSslServerContext context = (OpenSslServerContext) SSLFactory.createNettySslContext(options, true, ISslContextFactory.SocketType.SERVER);
+        assertNotNull(context);
+
+        // Verify that the right certificate is loaded into the SslContext
+        final Certificate loadedCertificate = getCertificateLoadedInSslContext(context.sessionContext());
+        final Certificate expectedCertificate = getCertificates("test/conf/cassandra_ssl_test.keystore", "cassandra");
+        // JUnit's contract is assertEquals(expected, actual); keep that order so failure messages read correctly
+        assertEquals(expectedCertificate, loadedCertificate);
+    }
+
+    /**
+     * Verifies that a CLIENT socket type builds its SSL context from the outbound keystore: the
+     * regular keystore settings are replaced with dummy values, and the certificate found in the
+     * resulting context must equal the one stored in the outbound test keystore.
+     */
+    @Test
+    public void testClientSocketShouldUseOutboundKeystore() throws IOException, CertificateException, KeyStoreException, NoSuchAlgorithmException, NoSuchFieldException, ClassNotFoundException, InvocationTargetException, IllegalAccessException, NoSuchMethodException
+    {
+        // The regular keystore settings are deliberately bogus: a CLIENT socket must not read them
+        ServerEncryptionOptions options = addKeystoreOptions(encryptionOptions)
+        .withKeyStore("dummyKeystore")
+        .withKeyStorePassword("dummyPassword");
+
+        // Client socket type should create a keystore with outbound Keystore & outbound password
+        final OpenSslClientContext context = (OpenSslClientContext) SSLFactory.createNettySslContext(options, true, ISslContextFactory.SocketType.CLIENT);
+        assertNotNull(context);
+
+        // Verify that the right certificate is loaded into the SslContext
+        final Certificate loadedCertificate = getCertificateLoadedInSslContext(context.sessionContext());
+        final Certificate expectedCertificate = getCertificates("test/conf/cassandra_ssl_test_outbound.keystore", "cassandra");
+        // JUnit's contract is assertEquals(expected, actual); keep that order so failure messages read correctly
+        assertEquals(expectedCertificate, loadedCertificate);
+    }
+
+    @Test
+    public void testPEMSslContextReload_HappyPath() throws IOException
     {
         try
         {
@@ -223,8 +278,7 @@ public class SSLFactoryTest
     @Test
     public void getSslContext_ParamChanges() throws IOException
     {
-        EncryptionOptions options = addKeystoreOptions(encryptionOptions)
-                                    .withEnabled(true)
+        ServerEncryptionOptions options = addKeystoreOptions(encryptionOptions)
                                     .withCipherSuites("TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256");
 
         SslContext ctx1 = SSLFactory.getOrCreateSslContext(options, true,
@@ -301,4 +355,36 @@ public class SSLFactoryTest
 
         Assert.assertNotEquals(cacheKey1, cacheKey2);
     }
+
+    public static class TestFileBasedSSLContextFactory extends FileBasedSslContextFactory {
+        public TestFileBasedSSLContextFactory(Map<String, Object> parameters)
+        {
+            super(parameters);
+        }
+    }
+
+    /**
+     * Loads the keystore stored in the given file and returns the certificate registered under
+     * the {@code cassandra_ssl_test} alias.
+     *
+     * @param filename path of the keystore file to read
+     * @param password password used to load the keystore
+     * @return the certificate stored under the {@code cassandra_ssl_test} alias
+     */
+    private static Certificate getCertificates(final String filename, final String password) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException
+    {
+        // try-with-resources so the file handle is released even when loading the keystore fails
+        try (FileInputStream is = new FileInputStream(filename))
+        {
+            KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType());
+            keystore.load(is, password.toCharArray());
+            return keystore.getCertificate("cassandra_ssl_test");
+        }
+    }
+
+    /**
+     * Reflectively extracts the first certificate of the {@code cassandra_ssl_test} chain from
+     * the key manager reachable through the given session context's {@code provider} field.
+     *
+     * @param session session context of the SSL context under test
+     * @return the first certificate of the chain returned for the {@code cassandra_ssl_test} alias
+     */
+    private static Certificate getCertificateLoadedInSslContext(final OpenSslSessionContext session)
+    throws ClassNotFoundException, InvocationTargetException, IllegalAccessException, NoSuchMethodException, NoSuchFieldException
+    {
+        // Read the "provider" field of the session context via reflection
+        final Field field = OpenSslSessionContext.class.getDeclaredField("provider");
+        field.setAccessible(true);
+
+        // The provider type is looked up by name, and the field value is cast to it
+        final Class<?> providerType = Class.forName("io.netty.handler.ssl.OpenSslKeyMaterialProvider");
+        final Object keyMaterialProvider = providerType.cast(field.get(session));
+
+        // Invoke the provider's keyManager() accessor to obtain the underlying X509KeyManager
+        final Method accessor = keyMaterialProvider.getClass().getDeclaredMethod("keyManager");
+        accessor.setAccessible(true);
+        final X509KeyManager x509KeyManager = (X509KeyManager) accessor.invoke(keyMaterialProvider);
+
+        return x509KeyManager.getCertificateChain("cassandra_ssl_test")[0];
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org