You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/11/03 12:09:15 UTC

cassandra git commit: Node to Node encryption transitional mode

Repository: cassandra
Updated Branches:
  refs/heads/trunk 87962dcf3 -> 260846685


Node to Node encryption transitional mode

patch by jasobrown; reviewed by Stefan Podkowinski for CASSANDRA-10404


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/26084668
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/26084668
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/26084668

Branch: refs/heads/trunk
Commit: 260846685b6129a324a7cb7396da135fee85ec04
Parents: 87962dc
Author: Jason Brown <ja...@gmail.com>
Authored: Wed Feb 15 05:41:30 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Nov 3 05:06:38 2017 -0700

----------------------------------------------------------------------
 NEWS.txt                                        |   5 +-
 conf/cassandra.yaml                             |  23 +-
 .../org/apache/cassandra/config/Config.java     |   4 +-
 .../cassandra/config/DatabaseDescriptor.java    |  33 ++-
 .../cassandra/config/EncryptionOptions.java     |  43 +++-
 .../locator/ReconnectableSnitchHelper.java      |   2 +-
 .../apache/cassandra/net/MessagingService.java  | 124 +++++++---
 .../cassandra/net/async/NettyFactory.java       | 117 ++++++----
 .../cassandra/net/async/OptionalSslHandler.java |  67 ++++++
 .../net/async/OutboundConnectionIdentifier.java |   6 +
 .../net/async/OutboundMessagingConnection.java  |  27 +++
 .../cassandra/streaming/StreamSession.java      |   2 +-
 .../org/apache/cassandra/tools/BulkLoader.java  |   2 +-
 .../apache/cassandra/tools/LoaderOptions.java   |   6 +-
 .../org/apache/cassandra/transport/Client.java  |   7 +-
 .../org/apache/cassandra/transport/Server.java  |   2 +-
 .../cassandra/transport/SimpleClient.java       |  15 +-
 .../org/apache/cassandra/utils/FBUtilities.java |   2 +-
 .../cassandra/net/MessagingServiceTest.java     | 228 ++++++++++++++++---
 .../cassandra/net/async/NettyFactoryTest.java   |  51 ++++-
 .../async/OutboundMessagingConnectionTest.java  |  45 ++++
 .../service/ProtocolBetaVersionTest.java        |   4 +-
 .../cassandra/transport/MessagePayloadTest.java |   4 +-
 .../stress/settings/SettingsTransport.java      |   5 +-
 .../stress/settings/StressSettings.java         |   2 +-
 .../cassandra/stress/util/JavaDriverClient.java |   6 +-
 26 files changed, 657 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 7a133b8..09a9a7b 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -39,7 +39,7 @@ Upgrading
       4.0 and the legacy tables must have been removed. See the 'Upgrading' section
       for version 2.2 for migration instructions.
     - Cassandra 4.0 removed support for the deprecated Thrift interface. Amongst
-      Tother things, this imply the removal of all yaml option related to thrift
+      other things, this implies the removal of all yaml options related to thrift
       ('start_rpc', rpc_port, ...).
     - Cassandra 4.0 removed support for any pre-3.0 format. This means you
       cannot upgrade from a 2.x version to 4.0 directly, you have to upgrade to
@@ -67,6 +67,9 @@ Upgrading
 	- the miniumum value for internode message timeouts is 10ms. Previously, any
 	  positive value was allowed. See cassandra.yaml entries like
 	  read_request_timeout_in_ms for more details.
+	- Cassandra 4.0 allows a single port to be used for both secure and insecure
+	  connections between cassandra nodes (CASSANDRA-10404). See the yaml for
+	  specific property changes, and see the security doc for full details.
 
 Materialized Views
 -------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ef94613..e41af17 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -570,9 +570,10 @@ trickle_fsync_interval_in_kb: 10240
 # For security reasons, you should not expose this port to the internet.  Firewall it if needed.
 storage_port: 7000
 
-# SSL port, for encrypted communication.  Unused unless enabled in
-# encryption_options
-# For security reasons, you should not expose this port to the internet.  Firewall it if needed.
+# SSL port, for legacy encrypted communication. This property is unused unless enabled in
+# server_encryption_options (see below). As of cassandra 4.0, this property is deprecated
+# as a single port can be used for either/both secure and insecure connections.
+# For security reasons, you should not expose this port to the internet. Firewall it if needed.
 ssl_storage_port: 7001
 
 # Address or interface to bind to and tell other Cassandra nodes to connect to.
@@ -920,7 +921,7 @@ dynamic_snitch_reset_interval_in_ms: 600000
 dynamic_snitch_badness_threshold: 0.1
 
 # Enable or disable inter-node encryption
-# JVM defaults for supported SSL socket protocols and cipher suites can
+# JVM and netty defaults for supported SSL socket protocols and cipher suites can
 # be replaced using custom encryption options. This is not recommended
 # unless you have policies in place that dictate certain settings, or
 # need to disable vulnerable ciphers or protocols in case the JVM cannot
@@ -928,17 +929,25 @@ dynamic_snitch_badness_threshold: 0.1
 # FIPS compliant settings can be configured at JVM level and should not
 # involve changing encryption settings here:
 # https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/FIPS.html
+#
 # *NOTE* No custom encryption options are enabled at the moment
 # The available internode options are : all, none, dc, rack
-#
 # If set to dc cassandra will encrypt the traffic between the DCs
 # If set to rack cassandra will encrypt the traffic between the racks
 #
 # The passwords used in these options must match the passwords used when generating
 # the keystore and truststore.  For instructions on generating these files, see:
-# http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
+# http://download.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
 #
 server_encryption_options:
+    # set to true for allowing secure incoming connections
+    enabled: false
+    # If enabled and optional are both set to true, encrypted and unencrypted connections are handled on the storage_port
+    optional: false
+    # if enabled, will open up an encrypted listening socket on ssl_storage_port. Should be used
+    # during upgrade to 4.0; otherwise, set to false.
+    enable_legacy_ssl_storage_port: false
+    # on outbound connections, determine which type of peers to securely connect to. 'enabled' must be set to true.
     internode_encryption: none
     keystore: conf/.keystore
     keystore_password: cassandra
@@ -952,7 +961,7 @@ server_encryption_options:
     # require_client_auth: false
     # require_endpoint_verification: false
 
-# enable or disable client/server encryption.
+# enable or disable client-to-server encryption.
 client_encryption_options:
     enabled: false
     # If enabled and optional is set to true encrypted and unencrypted connections are handled.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a28d492..de193b0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -209,9 +209,7 @@ public class Config
     public double dynamic_snitch_badness_threshold = 0.1;
 
     public EncryptionOptions.ServerEncryptionOptions server_encryption_options = new EncryptionOptions.ServerEncryptionOptions();
-    public EncryptionOptions.ClientEncryptionOptions client_encryption_options = new EncryptionOptions.ClientEncryptionOptions();
-    // this encOptions is for backward compatibility (a warning is logged by DatabaseDescriptor)
-    public EncryptionOptions.ServerEncryptionOptions encryption_options;
+    public EncryptionOptions client_encryption_options = new EncryptionOptions();
 
     public InternodeCompression internode_compression = InternodeCompression.none;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d948abf..af1cbde 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.auth.IAuthorizer;
 import org.apache.cassandra.auth.IInternodeAuthenticator;
 import org.apache.cassandra.auth.IRoleManager;
 import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -648,13 +649,6 @@ public class DatabaseDescriptor
             throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
                                              + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
 
-        if(conf.encryption_options != null)
-        {
-            logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
-            //operate under the assumption that server_encryption_options is not set in yaml rather than both
-            conf.server_encryption_options = conf.encryption_options;
-        }
-
         if (conf.user_defined_function_fail_timeout < 0)
             throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
         if (conf.user_defined_function_warn_timeout < 0)
@@ -683,6 +677,14 @@ public class DatabaseDescriptor
             throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
         }
 
+        // internode messaging encryption options
+        if (conf.server_encryption_options.internode_encryption != InternodeEncryption.none
+            && !conf.server_encryption_options.enabled)
+        {
+            throw new ConfigurationException("Encryption must be enabled in server_encryption_options when using peer-to-peer security. " +
+                                            "server_encryption_options.internode_encryption = " + conf.server_encryption_options.internode_encryption, false);
+        }
+
         if (conf.max_value_size_in_mb <= 0)
             throw new ConfigurationException("max_value_size_in_mb must be positive", false);
         else if (conf.max_value_size_in_mb >= 2048)
@@ -1638,6 +1640,11 @@ public class DatabaseDescriptor
         return listenAddress;
     }
 
+    public static void setListenAddress(InetAddress newlistenAddress)
+    {
+        listenAddress = newlistenAddress;
+    }
+
     public static InetAddress getBroadcastAddress()
     {
         return broadcastAddress;
@@ -1648,6 +1655,11 @@ public class DatabaseDescriptor
         return conf.listen_on_broadcast_address;
     }
 
+    public static void setShouldListenOnBroadcastAddress(boolean shouldListenOnBroadcastAddress)
+    {
+        conf.listen_on_broadcast_address = shouldListenOnBroadcastAddress;
+    }
+
     public static void setListenOnBroadcastAddress(boolean listen_on_broadcast_address)
     {
         conf.listen_on_broadcast_address = listen_on_broadcast_address;
@@ -1939,7 +1951,12 @@ public class DatabaseDescriptor
         return conf.server_encryption_options;
     }
 
-    public static EncryptionOptions.ClientEncryptionOptions getClientEncryptionOptions()
+    public static void setServerEncryptionOptions(EncryptionOptions.ServerEncryptionOptions encryptionOptions)
+    {
+        conf.server_encryption_options = encryptionOptions;
+    }
+
+    public static EncryptionOptions getClientEncryptionOptions()
     {
         return conf.client_encryption_options;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/config/EncryptionOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/EncryptionOptions.java b/src/java/org/apache/cassandra/config/EncryptionOptions.java
index 6010746..aecbfca 100644
--- a/src/java/org/apache/cassandra/config/EncryptionOptions.java
+++ b/src/java/org/apache/cassandra/config/EncryptionOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.config;
 
-public abstract class EncryptionOptions
+public class EncryptionOptions
 {
     public String keystore = "conf/.keystore";
     public String keystore_password = "cassandra";
@@ -29,19 +29,52 @@ public abstract class EncryptionOptions
     public String store_type = "JKS";
     public boolean require_client_auth = false;
     public boolean require_endpoint_verification = false;
+    public boolean enabled = false;
+    public boolean optional = false;
 
-    public static class ClientEncryptionOptions extends EncryptionOptions
+    public EncryptionOptions()
+    {   }
+
+    /**
+     * Copy constructor
+     */
+    public EncryptionOptions(EncryptionOptions options)
     {
-        public boolean enabled = false;
-        public boolean optional = false;
+        keystore = options.keystore;
+        keystore_password = options.keystore_password;
+        truststore = options.truststore;
+        truststore_password = options.truststore_password;
+        cipher_suites = options.cipher_suites;
+        protocol = options.protocol;
+        algorithm = options.algorithm;
+        store_type = options.store_type;
+        require_client_auth = options.require_client_auth;
+        require_endpoint_verification = options.require_endpoint_verification;
+        enabled = options.enabled;
+        optional = options.optional;
     }
 
     public static class ServerEncryptionOptions extends EncryptionOptions
     {
-        public static enum InternodeEncryption
+        public enum InternodeEncryption
         {
             all, none, dc, rack
         }
+
         public InternodeEncryption internode_encryption = InternodeEncryption.none;
+        public boolean enable_legacy_ssl_storage_port = false;
+
+        public ServerEncryptionOptions()
+        {   }
+
+        /**
+         * Copy constructor
+         */
+        public ServerEncryptionOptions(ServerEncryptionOptions options)
+        {
+            super(options);
+            internode_encryption = options.internode_encryption;
+            enable_legacy_ssl_storage_port = options.enable_legacy_ssl_storage_port;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
index 2235c76..0b344c9 100644
--- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -64,7 +64,7 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
     @VisibleForTesting
     static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch snitch, String localDc)
     {
-        if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress, MessagingService.portFor(publicAddress)))
+        if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress, MessagingService.instance().portFor(publicAddress)))
         {
             logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress, localAddress);
             return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 2a44e68..4e6fe1c 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1,4 +1,4 @@
-/*
+    /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -729,10 +729,15 @@ public final class MessagingService implements MessagingServiceMBean
 
     public void listen()
     {
+        listen(DatabaseDescriptor.getServerEncryptionOptions());
+    }
+
+    public void listen(ServerEncryptionOptions serverEncryptionOptions)
+    {
         callbacks.reset(); // hack to allow tests to stop/restart MS
-        listen(FBUtilities.getLocalAddress());
+        listen(FBUtilities.getLocalAddress(), serverEncryptionOptions);
         if (shouldListenOnBroadcastAddress())
-            listen(FBUtilities.getBroadcastAddress());
+            listen(FBUtilities.getBroadcastAddress(), serverEncryptionOptions);
         listenGate.signalAll();
     }
 
@@ -747,40 +752,54 @@ public final class MessagingService implements MessagingServiceMBean
      *
      * @param localEp InetAddress whose port to listen on.
      */
-    private void listen(InetAddress localEp) throws ConfigurationException
+    private void listen(InetAddress localEp, ServerEncryptionOptions serverEncryptionOptions) throws ConfigurationException
     {
         IInternodeAuthenticator authenticator = DatabaseDescriptor.getInternodeAuthenticator();
         int receiveBufferSize = DatabaseDescriptor.getInternodeRecvBufferSize();
 
-        if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.none)
+        // this is the legacy socket, for letting peer nodes that haven't upgraded yet connect to this node.
+        // should only occur during cluster upgrade. we can remove this block at 5.0!
+        if (serverEncryptionOptions.enabled && serverEncryptionOptions.enable_legacy_ssl_storage_port)
         {
+            // clone the encryption options, and explicitly set the optional field to false
+            // (do not allow non-TLS connections on the legacy ssl port)
+            ServerEncryptionOptions legacyEncOptions = new ServerEncryptionOptions(serverEncryptionOptions);
+            legacyEncOptions.optional = false;
+
             InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getSSLStoragePort());
-            ChannelGroup channelGroup = new DefaultChannelGroup("EncryptedInternodeMessagingGroup", NettyFactory.executorForChannelGroups());
-            InboundInitializer initializer = new InboundInitializer(authenticator, DatabaseDescriptor.getServerEncryptionOptions(), channelGroup);
+            ChannelGroup channelGroup = new DefaultChannelGroup("LegacyEncryptedInternodeMessagingGroup", NettyFactory.executorForChannelGroups());
+            InboundInitializer initializer = new InboundInitializer(authenticator, legacyEncOptions, channelGroup);
             Channel encryptedChannel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize);
-            serverChannels.add(new ServerChannel(encryptedChannel, channelGroup));
+            serverChannels.add(new ServerChannel(encryptedChannel, channelGroup, localAddr, ServerChannel.SecurityLevel.REQUIRED));
         }
 
-        if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.all)
-        {
-            InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
-            ChannelGroup channelGroup = new DefaultChannelGroup("InternodeMessagingGroup", NettyFactory.executorForChannelGroups());
-            InboundInitializer initializer = new InboundInitializer(authenticator, null, channelGroup);
-            Channel channel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize);
-            serverChannels.add(new ServerChannel(channel, channelGroup));
-        }
-
-        if (serverChannels.isEmpty())
-            throw new IllegalStateException("no listening channels set up in MessagingService!");
+        // this is for the socket that can be plain, only ssl, or optional plain/ssl
+        InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
+        ChannelGroup channelGroup = new DefaultChannelGroup("InternodeMessagingGroup", NettyFactory.executorForChannelGroups());
+        InboundInitializer initializer = new InboundInitializer(authenticator, serverEncryptionOptions, channelGroup);
+        Channel channel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize);
+        ServerChannel.SecurityLevel securityLevel = !serverEncryptionOptions.enabled ? ServerChannel.SecurityLevel.NONE :
+                                                    serverEncryptionOptions.optional ? ServerChannel.SecurityLevel.OPTIONAL :
+                                                    ServerChannel.SecurityLevel.REQUIRED;
+        serverChannels.add(new ServerChannel(channel, channelGroup, localAddr, securityLevel));
     }
 
     /**
      * A simple struct to wrap up the the components needed for each listening socket.
+     * <p>
+     * The {@link #securityLevel} is captured independently of the {@link #channel} as there's no real way to inspect a
+     * server-side 'channel' to check if it is using TLS or not (the channel's configured pipeline will only apply to
+     * connections that get created, so it's not inspectable). {@link #securityLevel} is really only used for testing, anyway.
      */
     @VisibleForTesting
     static class ServerChannel
     {
         /**
+         * Declares the type of TLS used with the channel.
+         */
+        enum SecurityLevel { NONE, OPTIONAL, REQUIRED }
+
+        /**
          * The base {@link Channel} that is doing the spcket listen/accept.
          */
         private final Channel channel;
@@ -790,23 +809,46 @@ public final class MessagingService implements MessagingServiceMBean
          * the inbound connections/channels can be closed when the listening socket itself is being closed.
          */
         private final ChannelGroup connectedChannels;
+        private final InetSocketAddress address;
+        private final SecurityLevel securityLevel;
 
-        private ServerChannel(Channel channel, ChannelGroup channelGroup)
+        private ServerChannel(Channel channel, ChannelGroup channelGroup, InetSocketAddress address, SecurityLevel securityLevel)
         {
             this.channel = channel;
             this.connectedChannels = channelGroup;
+            this.address = address;
+            this.securityLevel = securityLevel;
         }
 
         void close()
         {
-            channel.close().syncUninterruptibly();
-            connectedChannels.close().syncUninterruptibly();
+            if (channel.isOpen())
+                channel.close().awaitUninterruptibly();
+            connectedChannels.close().awaitUninterruptibly();
         }
-        int size()
 
+        int size()
         {
             return connectedChannels.size();
         }
+
+        /**
+         * For testing only!
+         */
+        Channel getChannel()
+        {
+            return channel;
+        }
+
+        InetSocketAddress getAddress()
+        {
+            return address;
+        }
+
+        SecurityLevel getSecurityLevel()
+        {
+            return securityLevel;
+        }
     }
 
     public void waitUntilListening()
@@ -1038,6 +1080,11 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public void shutdown()
     {
+        shutdown(false);
+    }
+
+    public void shutdown(boolean isTest)
+    {
         logger.info("Waiting for messaging service to quiesce");
         // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
@@ -1057,7 +1104,8 @@ public final class MessagingService implements MessagingServiceMBean
             for (OutboundMessagingPool pool : channelManagers.values())
                 pool.close(false);
 
-            NettyFactory.instance.close();
+            if (!isTest)
+                NettyFactory.instance.close();
         }
         catch (Exception e)
         {
@@ -1065,6 +1113,14 @@ public final class MessagingService implements MessagingServiceMBean
         }
     }
 
+    /**
+     * For testing only!
+     */
+    void clearServerChannels()
+    {
+        serverChannels.clear();
+    }
+
     public void receive(MessageIn message, int id)
     {
         TraceState state = Tracing.instance.initializeFromMessage(message);
@@ -1443,7 +1499,7 @@ public final class MessagingService implements MessagingServiceMBean
         if (pool == null)
         {
             final boolean secure = isEncryptedConnection(to);
-            final int port = portFor(secure);
+            final int port = portFor(to, secure);
             if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, port))
                 return null;
 
@@ -1463,15 +1519,25 @@ public final class MessagingService implements MessagingServiceMBean
         return pool;
     }
 
-    public static int portFor(InetAddress addr)
+    public int portFor(InetAddress addr)
     {
         final boolean secure = isEncryptedConnection(addr);
-        return portFor(secure);
+        return portFor(addr, secure);
     }
 
-    private static int portFor(boolean secure)
+    private int portFor(InetAddress address, boolean secure)
     {
-        return secure ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort();
+        if (!secure)
+            return DatabaseDescriptor.getStoragePort();
+
+        Integer v = versions.get(address);
+        // if we don't know the version of the peer, assume it is 4.0 (or higher) as the only time it would be lower
+        // (as in a 3.x version) is during a cluster upgrade (from 3.x to 4.0). In that case the outbound connection will
+        // unfortunately fail - however the peer should connect to this node (at some point), and once we learn its version, it'll be
+        // in versions map. thus, when we attempt to reconnect to that node, we'll have the version and we can get the correct port.
+        // we will be able to remove this logic at 5.0.
+        int version = v != null ? v.intValue() : VERSION_40;
+        return version < VERSION_40 ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort();
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java
index d193e31..7fb81d3 100644
--- a/src/java/org/apache/cassandra/net/async/NettyFactory.java
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -1,8 +1,10 @@
 package org.apache.cassandra.net.async;
 
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.zip.Checksum;
 
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
 
@@ -45,7 +47,6 @@ import net.jpountz.xxhash.XXHashFactory;
 import org.apache.cassandra.auth.IInternodeAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
-import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.security.SSLFactory;
@@ -53,7 +54,6 @@ import org.apache.cassandra.service.NativeTransportService;
 import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.CoalescingStrategies;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NativeLibrary;
 
 /**
  * A factory for building Netty {@link Channel}s. Channels here are setup with a pipeline to participate
@@ -70,17 +70,13 @@ public final class NettyFactory
 
     private static final int LZ4_HASH_SEED = 0x9747b28c;
 
-    /**
-     * Default seed value for xxhash.
-     */
-    public static final int XXHASH_DEFAULT_SEED = 0x9747b28c;
-
     public enum Mode { MESSAGING, STREAMING }
 
-    private static final String SSL_CHANNEL_HANDLER_NAME = "ssl";
-    public static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor";
-    public static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = "outboundCompressor";
-    public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+    static final String SSL_CHANNEL_HANDLER_NAME = "ssl";
+    private static final String OPTIONAL_SSL_CHANNEL_HANDLER_NAME = "optionalSsl";
+    static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor";
+    static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = "outboundCompressor";
+    private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
     public static final String INBOUND_STREAM_HANDLER_NAME = "inboundStreamHandler";
 
     /** a useful addition for debugging; simply set to true to get more data in your logs */
@@ -125,7 +121,7 @@ public final class NettyFactory
     NettyFactory(boolean useEpoll)
     {
         this.useEpoll = useEpoll;
-        acceptGroup = getEventLoopGroup(useEpoll, determineAcceptGroupSize(DatabaseDescriptor.getServerEncryptionOptions().internode_encryption),
+        acceptGroup = getEventLoopGroup(useEpoll, determineAcceptGroupSize(DatabaseDescriptor.getServerEncryptionOptions()),
                                         "MessagingService-NettyAcceptor-Thread", false);
         inboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyInbound-Thread", false);
         outboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyOutbound-Thread", true);
@@ -134,19 +130,23 @@ public final class NettyFactory
 
     /**
      * Determine the number of accept threads we need, which is based upon the number of listening sockets we will have.
-     * We'll have either 1 or 2 listen sockets, depending on if we use SSL or not in combination with non-SSL. If we have both,
-     * we'll have two sockets, and thus need two threads; else one socket and one thread.
-     *
-     * If the operator has configured multiple IP addresses (both {@link org.apache.cassandra.config.Config#broadcast_address}
-     * and {@link org.apache.cassandra.config.Config#listen_address} are configured), then we listen on another set of sockets
-     * - basically doubling the count. See CASSANDRA-9748 for more details.
+     * The idea is one accept thread per listening socket.
      */
-    static int determineAcceptGroupSize(InternodeEncryption internode_encryption)
+    public static int determineAcceptGroupSize(ServerEncryptionOptions serverEncryptionOptions)
     {
-        int listenSocketCount = internode_encryption == InternodeEncryption.dc || internode_encryption == InternodeEncryption.rack ? 2 : 1;
+        int listenSocketCount = 1;
+
+        boolean listenOnBroadcastAddr = MessagingService.shouldListenOnBroadcastAddress();
+        if (listenOnBroadcastAddr)
+            listenSocketCount++;
 
-        if (MessagingService.shouldListenOnBroadcastAddress())
-            listenSocketCount *= 2;
+        if (serverEncryptionOptions.enable_legacy_ssl_storage_port)
+        {
+            listenSocketCount++;
+
+            if (listenOnBroadcastAddr)
+                listenSocketCount++;
+        }
 
         return listenSocketCount;
     }
@@ -236,6 +236,28 @@ public final class NettyFactory
         return channelFuture.channel();
     }
 
+    /**
+     * Creates a new {@link SslHandler} from the provided {@link SslContext}.
+     *
+     * @param peer remote address; when non-null, endpoint (host name) verification is enabled against it
+     * @return the new handler; endpoint verification is configured only when {@code peer} is non-null
+     */
+    static SslHandler newSslHandler(Channel channel, SslContext sslContext, @Nullable InetSocketAddress peer)
+    {
+        if (peer == null)
+            return sslContext.newHandler(channel.alloc());
+
+        // slf4j only substitutes "{}" placeholders; printf-style "%s:%d" would be logged literally
+        logger.debug("Creating SSL handler for {}:{}", peer.getHostString(), peer.getPort());
+        SslHandler sslHandler = sslContext.newHandler(channel.alloc(), peer.getHostString(), peer.getPort());
+        SSLEngine engine = sslHandler.engine();
+        SSLParameters sslParameters = engine.getSSLParameters();
+        // "HTTPS" asks the engine to verify the peer's certificate against the host name given above
+        sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
+        engine.setSSLParameters(sslParameters);
+        return sslHandler;
+    }
+
     public static class InboundInitializer extends ChannelInitializer<SocketChannel>
     {
         private final IInternodeAuthenticator authenticator;
@@ -256,12 +278,20 @@ public final class NettyFactory
             ChannelPipeline pipeline = channel.pipeline();
 
             // order of handlers: ssl -> logger -> handshakeHandler
-            if (encryptionOptions != null)
+            if (encryptionOptions.enabled)
             {
-                SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true);
-                SslHandler sslHandler = sslContext.newHandler(channel.alloc());
-                logger.trace("creating inbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
-                pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
+                if (encryptionOptions.optional)
+                {
+                    pipeline.addFirst(OPTIONAL_SSL_CHANNEL_HANDLER_NAME, new OptionalSslHandler(encryptionOptions));
+                }
+                else
+                {
+                    SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true);
+                    InetSocketAddress peer = encryptionOptions.require_endpoint_verification ? channel.remoteAddress() : null;
+                    SslHandler sslHandler = newSslHandler(channel, sslContext, peer);
+                    logger.trace("creating inbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
+                    pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
+                }
             }
 
             if (WIRETRACE)
@@ -271,7 +301,7 @@ public final class NettyFactory
         }
     }
 
-    private String encryptionLogStatement(ServerEncryptionOptions options)
+    private static String encryptionLogStatement(ServerEncryptionOptions options)
     {
         if (options == null)
             return "disabled";
@@ -287,9 +317,11 @@ public final class NettyFactory
     @VisibleForTesting
     public Bootstrap createOutboundBootstrap(OutboundConnectionParams params)
     {
-        logger.debug("creating outbound bootstrap to peer {}, compression: {}, encryption: {}, coalesce: {}", params.connectionId.connectionAddress(),
+        logger.debug("creating outbound bootstrap to peer {}, compression: {}, encryption: {}, coalesce: {}, protocolVersion: {}",
+                     params.connectionId.connectionAddress(),
                      params.compress, encryptionLogStatement(params.encryptionOptions),
-                     params.coalescingStrategy.isPresent() ? params.coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED);
+                     params.coalescingStrategy.isPresent() ? params.coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED,
+                     params.protocolVersion);
         Class<? extends Channel> transport = useEpoll ? EpollSocketChannel.class : NioSocketChannel.class;
         Bootstrap bootstrap = new Bootstrap().group(params.mode == Mode.MESSAGING ? outboundGroup : streamingGroup)
                               .channel(transport)
@@ -315,6 +347,12 @@ public final class NettyFactory
             this.params = params;
         }
 
+        /**
+         * {@inheritDoc}
+         *
+         * To determine if we should enable TLS, we only need to check if {@code params.encryptionOptions} is set.
+         * The logic for figuring that out is located in {@link MessagingService#getMessagingConnection(InetAddress)}.
+         */
         public void initChannel(SocketChannel channel) throws Exception
         {
             ChannelPipeline pipeline = channel.pipeline();
@@ -323,22 +361,9 @@ public final class NettyFactory
             if (params.encryptionOptions != null)
             {
                 SslContext sslContext = SSLFactory.getSslContext(params.encryptionOptions, true, false);
-
-                final SslHandler sslHandler;
-                if (params.encryptionOptions.require_endpoint_verification)
-                {
-                    InetSocketAddress peer = params.connectionId.remoteAddress();
-                    sslHandler = sslContext.newHandler(channel.alloc(), peer.getHostString(), peer.getPort());
-                    SSLEngine engine = sslHandler.engine();
-                    SSLParameters sslParameters = engine.getSSLParameters();
-                    sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
-                    engine.setSSLParameters(sslParameters);
-                }
-                else
-                {
-                    sslHandler = sslContext.newHandler(channel.alloc());
-                }
-
+                // channel.remoteAddress() returns null here (presumably the channel is not connected yet), so use the connectionId's address
+                InetSocketAddress peer = params.encryptionOptions.require_endpoint_verification ? params.connectionId.remoteAddress() : null;
+                SslHandler sslHandler = newSslHandler(channel, sslContext, peer);
                 logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
                 pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java b/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java
new file mode 100644
index 0000000..b60ae13
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.security.SSLFactory;
+
+public class OptionalSslHandler extends ByteToMessageDecoder
+{
+    private final ServerEncryptionOptions encryptionOptions;
+
+    OptionalSslHandler(ServerEncryptionOptions encryptionOptions)
+    {
+        this.encryptionOptions = encryptionOptions;
+    }
+
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
+    {
+        if (in.readableBytes() < 5)
+        {
+            // To detect whether SSL/TLS is in use we need at least 5 bytes, so return here and try again
+            // once more bytes are ready.
+            return;
+        }
+
+        if (SslHandler.isEncrypted(in))
+        {
+            // Connection uses SSL/TLS: replace this detection handler with an SslHandler so traffic is encrypted.
+            SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true);
+            Channel channel = ctx.channel();
+            InetSocketAddress peer = encryptionOptions.require_endpoint_verification ? (InetSocketAddress) channel.remoteAddress() : null;
+            SslHandler sslHandler = NettyFactory.newSslHandler(channel, sslContext, peer);
+            ctx.pipeline().replace(this, NettyFactory.SSL_CHANNEL_HANDLER_NAME, sslHandler);
+        }
+        else
+        {
+            // Connection uses no SSL/TLS encryption: just remove this detection handler and continue without
+            // an SslHandler in the pipeline.
+            ctx.pipeline().remove(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
index c834bd4..6b2ff0d 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
@@ -120,6 +120,12 @@ public class OutboundConnectionIdentifier
         return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteConnectionAddr, connectionType);
     }
 
+    public OutboundConnectionIdentifier withNewConnectionPort(int port)
+    {
+        return new OutboundConnectionIdentifier(localAddr, new InetSocketAddress(remoteAddr.getAddress(), port),
+                                                new InetSocketAddress(remoteConnectionAddr.getAddress(), port), connectionType);
+    }
+
     /**
      * The local node address.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
index 6bda9cd..4522ba4 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
@@ -271,6 +271,7 @@ public class OutboundMessagingConnection
         }
 
         boolean compress = shouldCompressConnection(connectionId.local(), connectionId.remote());
+        maybeUpdateConnectionId();
         Bootstrap bootstrap = buildBootstrap(compress);
 
         ChannelFuture connectFuture = bootstrap.connect();
@@ -289,12 +290,38 @@ public class OutboundMessagingConnection
                || ((DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc) && !isLocalDC(localHost, remoteHost));
     }
 
+    /**
+     * After a bounce we won't necessarily know the peer's version, so we assume the peer is at least 4.0
+     * and thus using a single port for secure and non-secure communication. However, during a rolling upgrade from
+     * 3.0.x/3.x to 4.0, the not-yet upgraded peer is still listening on separate ports, but we don't know the peer's
+     * version until we can successfully connect. Fortunately, the peer can connect to this node, at which point
+     * we'll grab its version. We then use that knowledge to use the {@link Config#ssl_storage_port} to connect on,
+     * and to do that we need to update some member fields in this instance.
+     *
+     * Note: can be removed at 5.0
+     */
+    void maybeUpdateConnectionId()
+    {
+        if (encryptionOptions != null)
+        {
+            int version = MessagingService.instance().getVersion(connectionId.remote());
+            if (version < targetVersion)
+            {
+                targetVersion = version;
+                int port = MessagingService.instance().portFor(connectionId.remote());
+                connectionId = connectionId.withNewConnectionPort(port);
+                logger.debug("changing connectionId to {}, with a different port for secure communication, because peer version is {}", connectionId, version);
+            }
+        }
+    }
+
     private Bootstrap buildBootstrap(boolean compress)
     {
         boolean tcpNoDelay = isLocalDC(connectionId.local(), connectionId.remote()) ? INTRADC_TCP_NODELAY : DatabaseDescriptor.getInterDCTcpNoDelay();
         int sendBufferSize = DatabaseDescriptor.getInternodeSendBufferSize() > 0
                              ? DatabaseDescriptor.getInternodeSendBufferSize()
                              : OutboundConnectionParams.DEFAULT_SEND_BUFFER_SIZE;
+
         OutboundConnectionParams params = OutboundConnectionParams.builder()
                                                                   .connectionId(connectionId)
                                                                   .callback(this::finishHandshake)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 0381416..b6351f9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -198,7 +198,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         this.index = index;
 
         OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(new InetSocketAddress(FBUtilities.getBroadcastAddress(), 0),
-                                                                              new InetSocketAddress(connecting, MessagingService.portFor(connecting)));
+                                                                              new InetSocketAddress(connecting, MessagingService.instance().portFor(connecting)));
         this.messageSender = new NettyStreamingMessageSender(this, id, factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview());
         this.metrics = StreamingMetrics.get(connecting);
         this.keepSSTableLevel = keepSSTableLevel;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index e7b812f..0812e53 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -242,7 +242,7 @@ public class BulkLoader
         }
     }
 
-    private static SSLOptions buildSSLOptions(EncryptionOptions.ClientEncryptionOptions clientEncryptionOptions)
+    private static SSLOptions buildSSLOptions(EncryptionOptions clientEncryptionOptions)
     {
 
         if (!clientEncryptionOptions.enabled)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/tools/LoaderOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java
index 38317b6..c821e6a 100644
--- a/src/java/org/apache/cassandra/tools/LoaderOptions.java
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -77,7 +77,7 @@ public class LoaderOptions
     public final int interDcThrottle;
     public final int storagePort;
     public final int sslStoragePort;
-    public final EncryptionOptions.ClientEncryptionOptions clientEncOptions;
+    public final EncryptionOptions clientEncOptions;
     public final int connectionsPerHost;
     public final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
     public final Set<InetAddress> hosts;
@@ -119,7 +119,7 @@ public class LoaderOptions
         int interDcThrottle = 0;
         int storagePort;
         int sslStoragePort;
-        EncryptionOptions.ClientEncryptionOptions clientEncOptions = new EncryptionOptions.ClientEncryptionOptions();
+        EncryptionOptions clientEncOptions = new EncryptionOptions();
         int connectionsPerHost = 1;
         EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
         Set<InetAddress> hosts = new HashSet<>();
@@ -208,7 +208,7 @@ public class LoaderOptions
             return this;
         }
 
-        public Builder encOptions(EncryptionOptions.ClientEncryptionOptions encOptions)
+        public Builder encOptions(EncryptionOptions encOptions)
         {
             this.clientEncOptions = encOptions;
             return this;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 4793d17..3632175 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -28,6 +28,7 @@ import com.google.common.base.Splitter;
 
 import org.apache.cassandra.auth.PasswordAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.Int32Type;
@@ -37,13 +38,11 @@ import org.apache.cassandra.utils.Hex;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.MD5Digest;
 
-import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
-
 public class Client extends SimpleClient
 {
     private final SimpleEventHandler eventHandler = new SimpleEventHandler();
 
-    public Client(String host, int port, ProtocolVersion version, ClientEncryptionOptions encryptionOptions)
+    public Client(String host, int port, ProtocolVersion version, EncryptionOptions encryptionOptions)
     {
         super(host, port, version, encryptionOptions);
         setEventHandler(eventHandler);
@@ -248,7 +247,7 @@ public class Client extends SimpleClient
         int port = Integer.parseInt(args[1]);
         ProtocolVersion version = args.length == 3 ? ProtocolVersion.decode(Integer.parseInt(args[2])) : ProtocolVersion.CURRENT;
 
-        ClientEncryptionOptions encryptionOptions = new ClientEncryptionOptions();
+        EncryptionOptions encryptionOptions = new EncryptionOptions();
         System.out.println("CQL binary protocol console " + host + "@" + port + " using native protocol version " + version);
 
         new Client(host, port, version, encryptionOptions).run();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 9408a3a..d3f1c2c 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -137,7 +137,7 @@ public class Server implements CassandraDaemon.Server
 
         if (this.useSSL)
         {
-            final EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
+            final EncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
 
             if (clientEnc.optional)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index ddd3484..9c1fb07 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -41,6 +41,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.handler.ssl.SslContext;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 import io.netty.util.internal.logging.Slf4JLoggerFactory;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.security.SSLFactory;
@@ -56,7 +57,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
-import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
+import io.netty.handler.ssl.SslHandler;
 
 public class SimpleClient implements Closeable
 {
@@ -68,7 +69,7 @@ public class SimpleClient implements Closeable
     private static final Logger logger = LoggerFactory.getLogger(SimpleClient.class);
     public final String host;
     public final int port;
-    private final ClientEncryptionOptions encryptionOptions;
+    private final EncryptionOptions encryptionOptions;
 
     protected final ResponseHandler responseHandler = new ResponseHandler();
     protected final Connection.Tracker tracker = new ConnectionTracker();
@@ -87,22 +88,22 @@ public class SimpleClient implements Closeable
         }
     };
 
-    public SimpleClient(String host, int port, ProtocolVersion version, ClientEncryptionOptions encryptionOptions)
+    public SimpleClient(String host, int port, ProtocolVersion version, EncryptionOptions encryptionOptions)
     {
         this(host, port, version, false, encryptionOptions);
     }
 
-    public SimpleClient(String host, int port, ClientEncryptionOptions encryptionOptions)
+    public SimpleClient(String host, int port, EncryptionOptions encryptionOptions)
     {
         this(host, port, ProtocolVersion.CURRENT, encryptionOptions);
     }
 
     public SimpleClient(String host, int port, ProtocolVersion version)
     {
-        this(host, port, version, new ClientEncryptionOptions());
+        this(host, port, version, new EncryptionOptions());
     }
 
-    public SimpleClient(String host, int port, ProtocolVersion version, boolean useBeta, ClientEncryptionOptions encryptionOptions)
+    public SimpleClient(String host, int port, ProtocolVersion version, boolean useBeta, EncryptionOptions encryptionOptions)
     {
         this.host = host;
         this.port = port;
@@ -115,7 +116,7 @@ public class SimpleClient implements Closeable
 
     public SimpleClient(String host, int port)
     {
-        this(host, port, new ClientEncryptionOptions());
+        this(host, port, new EncryptionOptions());
     }
 
     public void connect(boolean useCompression) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 3faa034..1cb59d4 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -835,7 +835,7 @@ public class FBUtilities
     }
 
     @VisibleForTesting
-    protected static void reset()
+    public static void reset()
     {
         localInetAddress = null;
         broadcastInetAddress = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index a082d56..f0a959e 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -36,6 +36,7 @@ import java.util.regex.*;
 import java.util.regex.Matcher;
 
 import com.google.common.collect.Iterables;
+import com.google.common.net.InetAddresses;
 
 import com.codahale.metrics.Timer;
 
@@ -43,6 +44,7 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import org.apache.cassandra.auth.IInternodeAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.monitoring.ApproximateTime;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -77,7 +79,9 @@ public class MessagingServiceTest
 
         }
     };
-    static final IInternodeAuthenticator originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
+    private static IInternodeAuthenticator originalAuthenticator;
+    private static ServerEncryptionOptions originalServerEncryptionOptions;
+    private static InetAddress originalListenAddress;
 
     private final MessagingService messagingService = MessagingService.test();
 
@@ -87,6 +91,9 @@ public class MessagingServiceTest
         DatabaseDescriptor.daemonInitialization();
         DatabaseDescriptor.setBackPressureStrategy(new MockBackPressureStrategy(Collections.emptyMap()));
         DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
+        originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
+        originalServerEncryptionOptions = DatabaseDescriptor.getServerEncryptionOptions();
+        originalListenAddress = DatabaseDescriptor.getListenAddress();
     }
 
     private static int metricScopeId = 0;
@@ -101,9 +108,13 @@ public class MessagingServiceTest
     }
 
     @After
-    public void replaceAuthenticator()
+    public void tearDown()
     {
         DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator);
+        DatabaseDescriptor.setServerEncryptionOptions(originalServerEncryptionOptions);
+        DatabaseDescriptor.setShouldListenOnBroadcastAddress(false);
+        DatabaseDescriptor.setListenAddress(originalListenAddress);
+        FBUtilities.reset();
     }
 
     @Test
@@ -465,39 +476,188 @@ public class MessagingServiceTest
     @Test
     public void testCloseInboundConnections() throws UnknownHostException, InterruptedException
     {
-        messagingService.listen();
-        Assert.assertTrue(messagingService.isListening());
-        Assert.assertTrue(messagingService.serverChannels.size() > 0);
-        for (ServerChannel serverChannel : messagingService.serverChannels)
-            Assert.assertEquals(0, serverChannel.size());
-
-        // now, create a connection and make sure it's in a channel group
-        InetSocketAddress server = new InetSocketAddress(FBUtilities.getBroadcastAddress(), DatabaseDescriptor.getStoragePort());
-        OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 0), server);
-
-        CountDownLatch latch = new CountDownLatch(1);
-        OutboundConnectionParams params = OutboundConnectionParams.builder()
-                                                                  .mode(NettyFactory.Mode.MESSAGING)
-                                                                  .sendBufferSize(1 << 10)
-                                                                  .connectionId(id)
-                                                                  .callback(handshakeResult -> latch.countDown())
-                                                                  .protocolVersion(MessagingService.current_version)
-                                                                  .build();
-        Bootstrap bootstrap = NettyFactory.instance.createOutboundBootstrap(params);
-        Channel channel = bootstrap.connect().awaitUninterruptibly().channel();
-        Assert.assertNotNull(channel);
-        latch.await(1, TimeUnit.SECONDS); // allow the netty pipeline/c* handshake to get set up
-
-        int connectCount = 0;
-        for (ServerChannel serverChannel : messagingService.serverChannels)
-            connectCount += serverChannel.size();
-        Assert.assertTrue(connectCount > 0);
-
-        // last, shutdown the MS and make sure connections are removed
-        messagingService.shutdown();
-        for (ServerChannel serverChannel : messagingService.serverChannels)
-            Assert.assertEquals(0, serverChannel.size());
+        try
+        {
+            messagingService.listen();
+            Assert.assertTrue(messagingService.isListening());
+            Assert.assertTrue(messagingService.serverChannels.size() > 0);
+            for (ServerChannel serverChannel : messagingService.serverChannels)
+                Assert.assertEquals(0, serverChannel.size());
+
+            // now, create a connection and make sure it's in a channel group
+            InetSocketAddress server = new InetSocketAddress(FBUtilities.getBroadcastAddress(), DatabaseDescriptor.getStoragePort());
+            OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 0), server);
+
+            CountDownLatch latch = new CountDownLatch(1);
+            OutboundConnectionParams params = OutboundConnectionParams.builder()
+                                                                      .mode(NettyFactory.Mode.MESSAGING)
+                                                                      .sendBufferSize(1 << 10)
+                                                                      .connectionId(id)
+                                                                      .callback(handshakeResult -> latch.countDown())
+                                                                      .protocolVersion(MessagingService.current_version)
+                                                                      .build();
+            Bootstrap bootstrap = NettyFactory.instance.createOutboundBootstrap(params);
+            Channel channel = bootstrap.connect().awaitUninterruptibly().channel();
+            Assert.assertNotNull(channel);
+            latch.await(1, TimeUnit.SECONDS); // allow the netty pipeline/c* handshake to get set up
+
+            int connectCount = 0;
+            for (ServerChannel serverChannel : messagingService.serverChannels)
+                connectCount += serverChannel.size();
+            Assert.assertTrue(connectCount > 0);
+        }
+        finally
+        {
+            // last, shutdown the MS and make sure connections are removed
+            messagingService.shutdown(true);
+            for (ServerChannel serverChannel : messagingService.serverChannels)
+                Assert.assertEquals(0, serverChannel.size());
+            messagingService.clearServerChannels();
+        }
+    }
+
+    @Test
+    public void listenPlainConnection()
+    {
+        ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+        serverEncryptionOptions.enabled = false;
+        listen(serverEncryptionOptions, false);
+    }
+
+    @Test
+    public void listenPlainConnectionWithBroadcastAddr()
+    {
+        ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+        serverEncryptionOptions.enabled = false;
+        listen(serverEncryptionOptions, true);
+    }
+
+    @Test
+    public void listenRequiredSecureConnection()
+    {
+        ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+        serverEncryptionOptions.enabled = true;
+        serverEncryptionOptions.optional = false;
+        serverEncryptionOptions.enable_legacy_ssl_storage_port = false;
+        listen(serverEncryptionOptions, false);
+    }
+
+    @Test
+    public void listenRequiredSecureConnectionWithBroadcastAddr()
+    {
+        ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+        serverEncryptionOptions.enabled = true;
+        serverEncryptionOptions.optional = false;
+        serverEncryptionOptions.enable_legacy_ssl_storage_port = false;
+        listen(serverEncryptionOptions, true);
+    }
+
+    @Test
+    public void listenRequiredSecureConnectionWithLegacyPort()
+    {
+        ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+        serverEncryptionOptions.enabled = true;
+        serverEncryptionOptions.optional = false;
+        serverEncryptionOptions.enable_legacy_ssl_storage_port = true;
+        listen(serverEncryptionOptions, false);
+    }
+
+    @Test
+    public void listenRequiredSecureConnectionWithBroadcastAddrAndLegacyPort()
+    {
+        ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+        serverEncryptionOptions.enabled = true;
+        serverEncryptionOptions.optional = false;
+        serverEncryptionOptions.enable_legacy_ssl_storage_port = true;
+        listen(serverEncryptionOptions, true);
     }
 
+    @Test
+    public void listenOptionalSecureConnection()
+    {
+        ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+        serverEncryptionOptions.enabled = true;
+        serverEncryptionOptions.optional = true;
+        listen(serverEncryptionOptions, false);
+    }
+
+    @Test
+    public void listenOptionalSecureConnectionWithBroadcastAddr()
+    {
+        ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+        serverEncryptionOptions.enabled = true;
+        serverEncryptionOptions.optional = true;
+        listen(serverEncryptionOptions, true);
+    }
+
+    private void listen(ServerEncryptionOptions serverEncryptionOptions, boolean listenOnBroadcastAddr)
+    {
+        InetAddress listenAddress = null;
+        if (listenOnBroadcastAddr)
+        {
+            DatabaseDescriptor.setShouldListenOnBroadcastAddress(true);
+            listenAddress = InetAddresses.increment(FBUtilities.getBroadcastAddress());
+            DatabaseDescriptor.setListenAddress(listenAddress);
+            FBUtilities.reset();
+        }
+
+        try
+        {
+            messagingService.listen(serverEncryptionOptions);
+            Assert.assertTrue(messagingService.isListening());
+            int expectedListeningCount = NettyFactory.determineAcceptGroupSize(serverEncryptionOptions);
+            Assert.assertEquals(expectedListeningCount, messagingService.serverChannels.size());
+
+            if (!serverEncryptionOptions.enabled)
+            {
+                // make sure no channel is using TLS
+                for (ServerChannel serverChannel : messagingService.serverChannels)
+                    Assert.assertEquals(ServerChannel.SecurityLevel.NONE, serverChannel.getSecurityLevel());
+            }
+            else
+            {
+                final int legacySslPort = DatabaseDescriptor.getSSLStoragePort();
+                boolean foundLegacyListenSslAddress = false;
+                for (ServerChannel serverChannel : messagingService.serverChannels)
+                {
+                    if (serverEncryptionOptions.optional)
+                        Assert.assertEquals(ServerChannel.SecurityLevel.OPTIONAL, serverChannel.getSecurityLevel());
+                    else
+                        Assert.assertEquals(ServerChannel.SecurityLevel.REQUIRED, serverChannel.getSecurityLevel());
+
+                    if (serverEncryptionOptions.enable_legacy_ssl_storage_port)
+                    {
+                        if (legacySslPort == serverChannel.getAddress().getPort())
+                        {
+                            foundLegacyListenSslAddress = true;
+                            Assert.assertEquals(ServerChannel.SecurityLevel.REQUIRED, serverChannel.getSecurityLevel());
+                        }
+                    }
+                }
+
+                if (serverEncryptionOptions.enable_legacy_ssl_storage_port && !foundLegacyListenSslAddress)
+                    Assert.fail("failed to find legacy ssl listen address");
+            }
 
+            // check the optional listen address
+            if (listenOnBroadcastAddr)
+            {
+                int expectedCount = (serverEncryptionOptions.enabled && serverEncryptionOptions.enable_legacy_ssl_storage_port) ? 2 : 1;
+                int found = 0;
+                for (ServerChannel serverChannel : messagingService.serverChannels)
+                {
+                    if (serverChannel.getAddress().getAddress().equals(listenAddress))
+                        found++;
+                }
+
+                Assert.assertEquals(expectedCount, found);
+            }
+        }
+        finally
+        {
+            messagingService.shutdown(true);
+            messagingService.clearServerChannels();
+            Assert.assertEquals(0, messagingService.serverChannels.size());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
index 67b221a..0550490 100644
--- a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
+++ b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
@@ -31,7 +31,6 @@ import org.junit.Test;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
-import io.netty.channel.DefaultEventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -44,10 +43,9 @@ import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import org.apache.cassandra.auth.AllowAllInternodeAuthenticator;
 import org.apache.cassandra.auth.IInternodeAuthenticator;
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
-import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.NettyFactory.InboundInitializer;
@@ -154,10 +152,15 @@ public class NettyFactoryTest
     @Test
     public void deterineAcceptGroupSize()
     {
-        Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(InternodeEncryption.none));
-        Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(InternodeEncryption.all));
-        Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.rack));
-        Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.dc));
+        ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+        serverEncryptionOptions.enabled = false;
+        Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
+        serverEncryptionOptions.enabled = true;
+        Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
+
+        serverEncryptionOptions.enable_legacy_ssl_storage_port = true;
+        Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
+        serverEncryptionOptions.enable_legacy_ssl_storage_port = false;
 
         InetAddress originalBroadcastAddr = FBUtilities.getBroadcastAddress();
         try
@@ -165,10 +168,13 @@ public class NettyFactoryTest
             FBUtilities.setBroadcastInetAddress(InetAddresses.increment(FBUtilities.getLocalAddress()));
             DatabaseDescriptor.setListenOnBroadcastAddress(true);
 
-            Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.none));
-            Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.all));
-            Assert.assertEquals(4, NettyFactory.determineAcceptGroupSize(InternodeEncryption.rack));
-            Assert.assertEquals(4, NettyFactory.determineAcceptGroupSize(InternodeEncryption.dc));
+            serverEncryptionOptions.enabled = false;
+            Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
+            serverEncryptionOptions.enabled = true;
+            Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
+
+            serverEncryptionOptions.enable_legacy_ssl_storage_port = true;
+            Assert.assertEquals(4, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
         }
         finally
         {
@@ -263,10 +269,13 @@ public class NettyFactoryTest
     @Test
     public void createInboundInitializer_WithoutSsl() throws Exception
     {
-        InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, null, channelGroup);
+        ServerEncryptionOptions encryptionOptions = new ServerEncryptionOptions();
+        encryptionOptions.enabled = false;
+        InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, encryptionOptions, channelGroup);
         NioSocketChannel channel = new NioSocketChannel();
         initializer.initChannel(channel);
         Assert.assertNull(channel.pipeline().get(SslHandler.class));
+        Assert.assertNull(channel.pipeline().get(OptionalSslHandler.class));
     }
 
     private ServerEncryptionOptions encOptions()
@@ -281,15 +290,33 @@ public class NettyFactoryTest
         encryptionOptions.cipher_suites = new String[] {"TLS_RSA_WITH_AES_128_CBC_SHA"};
         return encryptionOptions;
     }
+
     @Test
     public void createInboundInitializer_WithSsl() throws Exception
     {
         ServerEncryptionOptions encryptionOptions = encOptions();
+        encryptionOptions.enabled = true;
+        encryptionOptions.optional = false;
         InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, encryptionOptions, channelGroup);
         NioSocketChannel channel = new NioSocketChannel();
         Assert.assertNull(channel.pipeline().get(SslHandler.class));
         initializer.initChannel(channel);
         Assert.assertNotNull(channel.pipeline().get(SslHandler.class));
+        Assert.assertNull(channel.pipeline().get(OptionalSslHandler.class));
+    }
+
+    @Test
+    public void createInboundInitializer_WithOptionalSsl() throws Exception
+    {
+        ServerEncryptionOptions encryptionOptions = encOptions();
+        encryptionOptions.enabled = true;
+        encryptionOptions.optional = true;
+        InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, encryptionOptions, channelGroup);
+        NioSocketChannel channel = new NioSocketChannel();
+        Assert.assertNull(channel.pipeline().get(SslHandler.class));
+        initializer.initChannel(channel);
+        Assert.assertNotNull(channel.pipeline().get(OptionalSslHandler.class));
+        Assert.assertNull(channel.pipeline().get(SslHandler.class));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
index d6dd633..641c28c 100644
--- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.auth.AllowAllInternodeAuthenticator;
 import org.apache.cassandra.auth.IInternodeAuthenticator;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.AbstractEndpointSnitch;
 import org.apache.cassandra.locator.IEndpointSnitch;
@@ -68,6 +69,7 @@ public class OutboundMessagingConnectionTest
     private EmbeddedChannel channel;
 
     private IEndpointSnitch snitch;
+    private ServerEncryptionOptions encryptionOptions;
 
     @BeforeClass
     public static void before()
@@ -84,12 +86,14 @@ public class OutboundMessagingConnectionTest
         omc.setChannelWriter(ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty()));
 
         snitch = DatabaseDescriptor.getEndpointSnitch();
+        encryptionOptions = DatabaseDescriptor.getServerEncryptionOptions();
     }
 
     @After
     public void tearDown()
     {
         DatabaseDescriptor.setEndpointSnitch(snitch);
+        DatabaseDescriptor.setServerEncryptionOptions(encryptionOptions);
         channel.finishAndReleaseAll();
     }
 
@@ -471,4 +475,45 @@ public class OutboundMessagingConnectionTest
         Assert.assertNotSame(omc.getConnectionId(), originalId);
         Assert.assertSame(NOT_READY, omc.getState());
     }
+
+    @Test
+    public void maybeUpdateConnectionId_NoEncryption()
+    {
+        OutboundConnectionIdentifier connectionId = omc.getConnectionId();
+        int version = omc.getTargetVersion();
+        omc.maybeUpdateConnectionId();
+        Assert.assertEquals(connectionId, omc.getConnectionId());
+        Assert.assertEquals(version, omc.getTargetVersion());
+    }
+
+    @Test
+    public void maybeUpdateConnectionId_SameVersion()
+    {
+        ServerEncryptionOptions encryptionOptions = new ServerEncryptionOptions();
+        omc = new OutboundMessagingConnection(connectionId, encryptionOptions, Optional.empty(), new AllowAllInternodeAuthenticator());
+        OutboundConnectionIdentifier connectionId = omc.getConnectionId();
+        int version = omc.getTargetVersion();
+        omc.maybeUpdateConnectionId();
+        Assert.assertEquals(connectionId, omc.getConnectionId());
+        Assert.assertEquals(version, omc.getTargetVersion());
+    }
+
+    @Test
+    public void maybeUpdateConnectionId_3_X_Version()
+    {
+        ServerEncryptionOptions encryptionOptions = new ServerEncryptionOptions();
+        encryptionOptions.enabled = true;
+        encryptionOptions.internode_encryption = ServerEncryptionOptions.InternodeEncryption.all;
+        DatabaseDescriptor.setServerEncryptionOptions(encryptionOptions);
+        omc = new OutboundMessagingConnection(connectionId, encryptionOptions, Optional.empty(), new AllowAllInternodeAuthenticator());
+        int peerVersion = MessagingService.VERSION_30;
+        MessagingService.instance().setVersion(connectionId.remote(), MessagingService.VERSION_30);
+
+        OutboundConnectionIdentifier connectionId = omc.getConnectionId();
+        omc.maybeUpdateConnectionId();
+        Assert.assertNotEquals(connectionId, omc.getConnectionId());
+        Assert.assertEquals(new InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().remoteAddress());
+        Assert.assertEquals(new InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().connectionAddress());
+        Assert.assertEquals(peerVersion, omc.getTargetVersion());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
index 0c51eb7..4ade4ad 100644
--- a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
+++ b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
@@ -68,7 +68,7 @@ public class ProtocolBetaVersionTest extends CQLTester
         createTable("CREATE TABLE %s (pk int PRIMARY KEY, v int)");
         assertTrue(betaVersion.isBeta()); // change to another beta version or remove test if no beta version
 
-        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, true, new EncryptionOptions.ClientEncryptionOptions()))
+        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, true, new EncryptionOptions()))
         {
             client.connect(false);
             for (int i = 0; i < 10; i++)
@@ -103,7 +103,7 @@ public class ProtocolBetaVersionTest extends CQLTester
         }
 
         assertTrue(betaVersion.isBeta()); // change to another beta version or remove test if no beta version
-        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, false, new EncryptionOptions.ClientEncryptionOptions()))
+        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, false, new EncryptionOptions()))
         {
             client.connect(false);
             fail("Exception should have been thrown");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index 817cb06..5b8067e 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -29,6 +29,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.cql3.BatchQueryOptions;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryHandler;
@@ -48,7 +49,6 @@ import org.apache.cassandra.transport.messages.QueryMessage;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.MD5Digest;
 
-import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public class MessagePayloadTest extends CQLTester
@@ -127,7 +127,7 @@ public class MessagePayloadTest extends CQLTester
                                                    nativePort,
                                                    ProtocolVersion.V5,
                                                    true,
-                                                   new ClientEncryptionOptions());
+                                                   new EncryptionOptions());
             try
             {
                 client.connect(false);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java
index a6248bb..6acc500 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java
@@ -23,7 +23,6 @@ package org.apache.cassandra.stress.settings;
 
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -39,9 +38,9 @@ public class SettingsTransport implements Serializable
         this.options = options;
     }
 
-    public EncryptionOptions.ClientEncryptionOptions getEncryptionOptions()
+    public EncryptionOptions getEncryptionOptions()
     {
-        EncryptionOptions.ClientEncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
+        EncryptionOptions encOptions = new EncryptionOptions();
         if (options.trustStore.present())
         {
             encOptions.enabled = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index a27b986..af35490 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -134,7 +134,7 @@ public class StressSettings implements Serializable
                 if (client != null)
                     return client;
 
-                EncryptionOptions.ClientEncryptionOptions encOptions = transport.getEncryptionOptions();
+                EncryptionOptions encOptions = transport.getEncryptionOptions();
                 JavaDriverClient c = new JavaDriverClient(this, currentNode, port.nativePort, encOptions);
                 c.connect(mode.compression());
                 if (keyspace != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
index d404653..4928cd2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
@@ -50,7 +50,7 @@ public class JavaDriverClient
     public final int connectionsPerHost;
 
     private final ProtocolVersion protocolVersion;
-    private final EncryptionOptions.ClientEncryptionOptions encryptionOptions;
+    private final EncryptionOptions encryptionOptions;
     private Cluster cluster;
     private Session session;
     private final LoadBalancingPolicy loadBalancingPolicy;
@@ -59,10 +59,10 @@ public class JavaDriverClient
 
     public JavaDriverClient(StressSettings settings, String host, int port)
     {
-        this(settings, host, port, new EncryptionOptions.ClientEncryptionOptions());
+        this(settings, host, port, new EncryptionOptions());
     }
 
-    public JavaDriverClient(StressSettings settings, String host, int port, EncryptionOptions.ClientEncryptionOptions encryptionOptions)
+    public JavaDriverClient(StressSettings settings, String host, int port, EncryptionOptions encryptionOptions)
     {
         this.protocolVersion = settings.mode.protocolVersion;
         this.host = host;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org