You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/11/03 12:09:15 UTC
cassandra git commit: Node to Node encryption transitional mode
Repository: cassandra
Updated Branches:
refs/heads/trunk 87962dcf3 -> 260846685
Node to Node encryption transitional mode
patch by jasobrown; reviewed by Stefan Podkowinski for CASSANDRA-10404
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/26084668
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/26084668
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/26084668
Branch: refs/heads/trunk
Commit: 260846685b6129a324a7cb7396da135fee85ec04
Parents: 87962dc
Author: Jason Brown <ja...@gmail.com>
Authored: Wed Feb 15 05:41:30 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Nov 3 05:06:38 2017 -0700
----------------------------------------------------------------------
NEWS.txt | 5 +-
conf/cassandra.yaml | 23 +-
.../org/apache/cassandra/config/Config.java | 4 +-
.../cassandra/config/DatabaseDescriptor.java | 33 ++-
.../cassandra/config/EncryptionOptions.java | 43 +++-
.../locator/ReconnectableSnitchHelper.java | 2 +-
.../apache/cassandra/net/MessagingService.java | 124 +++++++---
.../cassandra/net/async/NettyFactory.java | 117 ++++++----
.../cassandra/net/async/OptionalSslHandler.java | 67 ++++++
.../net/async/OutboundConnectionIdentifier.java | 6 +
.../net/async/OutboundMessagingConnection.java | 27 +++
.../cassandra/streaming/StreamSession.java | 2 +-
.../org/apache/cassandra/tools/BulkLoader.java | 2 +-
.../apache/cassandra/tools/LoaderOptions.java | 6 +-
.../org/apache/cassandra/transport/Client.java | 7 +-
.../org/apache/cassandra/transport/Server.java | 2 +-
.../cassandra/transport/SimpleClient.java | 15 +-
.../org/apache/cassandra/utils/FBUtilities.java | 2 +-
.../cassandra/net/MessagingServiceTest.java | 228 ++++++++++++++++---
.../cassandra/net/async/NettyFactoryTest.java | 51 ++++-
.../async/OutboundMessagingConnectionTest.java | 45 ++++
.../service/ProtocolBetaVersionTest.java | 4 +-
.../cassandra/transport/MessagePayloadTest.java | 4 +-
.../stress/settings/SettingsTransport.java | 5 +-
.../stress/settings/StressSettings.java | 2 +-
.../cassandra/stress/util/JavaDriverClient.java | 6 +-
26 files changed, 657 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 7a133b8..09a9a7b 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -39,7 +39,7 @@ Upgrading
4.0 and the legacy tables must have been removed. See the 'Upgrading' section
for version 2.2 for migration instructions.
- Cassandra 4.0 removed support for the deprecated Thrift interface. Amongst
- Tother things, this imply the removal of all yaml option related to thrift
+ other things, this implies the removal of all yaml options related to thrift
('start_rpc', rpc_port, ...).
- Cassandra 4.0 removed support for any pre-3.0 format. This means you
cannot upgrade from a 2.x version to 4.0 directly, you have to upgrade to
@@ -67,6 +67,9 @@ Upgrading
- the miniumum value for internode message timeouts is 10ms. Previously, any
positive value was allowed. See cassandra.yaml entries like
read_request_timeout_in_ms for more details.
+ - Cassandra 4.0 allows a single port to be used for both secure and insecure
+ connections between cassandra nodes (CASSANDRA-10404). See the yaml for
+ specific property changes, and see the security doc for full details.
Materialized Views
-------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ef94613..e41af17 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -570,9 +570,10 @@ trickle_fsync_interval_in_kb: 10240
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
storage_port: 7000
-# SSL port, for encrypted communication. Unused unless enabled in
-# encryption_options
-# For security reasons, you should not expose this port to the internet. Firewall it if needed.
+# SSL port, for legacy encrypted communication. This property is unused unless enabled in
+# server_encryption_options (see below). As of cassandra 4.0, this property is deprecated
+# as a single port can be used for either/both secure and insecure connections.
+# For security reasons, you should not expose this port to the internet. Firewall it if needed.
ssl_storage_port: 7001
# Address or interface to bind to and tell other Cassandra nodes to connect to.
@@ -920,7 +921,7 @@ dynamic_snitch_reset_interval_in_ms: 600000
dynamic_snitch_badness_threshold: 0.1
# Enable or disable inter-node encryption
-# JVM defaults for supported SSL socket protocols and cipher suites can
+# JVM and netty defaults for supported SSL socket protocols and cipher suites can
# be replaced using custom encryption options. This is not recommended
# unless you have policies in place that dictate certain settings, or
# need to disable vulnerable ciphers or protocols in case the JVM cannot
@@ -928,17 +929,25 @@ dynamic_snitch_badness_threshold: 0.1
# FIPS compliant settings can be configured at JVM level and should not
# involve changing encryption settings here:
# https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/FIPS.html
+#
# *NOTE* No custom encryption options are enabled at the moment
# The available internode options are : all, none, dc, rack
-#
# If set to dc cassandra will encrypt the traffic between the DCs
# If set to rack cassandra will encrypt the traffic between the racks
#
# The passwords used in these options must match the passwords used when generating
# the keystore and truststore. For instructions on generating these files, see:
-# http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
+# http://download.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
#
server_encryption_options:
+ # set to true for allowing secure incoming connections
+ enabled: false
+ # If enabled and optional are both set to true, encrypted and unencrypted connections are handled on the storage_port
+ optional: false
+ # if enabled, will open up an encrypted listening socket on ssl_storage_port. Should be used
+ # during upgrade to 4.0; otherwise, set to false.
+ enable_legacy_ssl_storage_port: false
+ # on outbound connections, determine which type of peers to securely connect to. 'enabled' must be set to true.
internode_encryption: none
keystore: conf/.keystore
keystore_password: cassandra
@@ -952,7 +961,7 @@ server_encryption_options:
# require_client_auth: false
# require_endpoint_verification: false
-# enable or disable client/server encryption.
+# enable or disable client-to-server encryption.
client_encryption_options:
enabled: false
# If enabled and optional is set to true encrypted and unencrypted connections are handled.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a28d492..de193b0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -209,9 +209,7 @@ public class Config
public double dynamic_snitch_badness_threshold = 0.1;
public EncryptionOptions.ServerEncryptionOptions server_encryption_options = new EncryptionOptions.ServerEncryptionOptions();
- public EncryptionOptions.ClientEncryptionOptions client_encryption_options = new EncryptionOptions.ClientEncryptionOptions();
- // this encOptions is for backward compatibility (a warning is logged by DatabaseDescriptor)
- public EncryptionOptions.ServerEncryptionOptions encryption_options;
+ public EncryptionOptions client_encryption_options = new EncryptionOptions();
public InternodeCompression internode_compression = InternodeCompression.none;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index d948abf..af1cbde 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.auth.IAuthorizer;
import org.apache.cassandra.auth.IInternodeAuthenticator;
import org.apache.cassandra.auth.IRoleManager;
import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -648,13 +649,6 @@ public class DatabaseDescriptor
throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '"
+ conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false);
- if(conf.encryption_options != null)
- {
- logger.warn("Please rename encryption_options as server_encryption_options in the yaml");
- //operate under the assumption that server_encryption_options is not set in yaml rather than both
- conf.server_encryption_options = conf.encryption_options;
- }
-
if (conf.user_defined_function_fail_timeout < 0)
throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
if (conf.user_defined_function_warn_timeout < 0)
@@ -683,6 +677,14 @@ public class DatabaseDescriptor
throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
}
+ // internode messaging encryption options
+ if (conf.server_encryption_options.internode_encryption != InternodeEncryption.none
+ && !conf.server_encryption_options.enabled)
+ {
+ throw new ConfigurationException("Encryption must be enabled in server_encryption_options when using peer-to-peer security. " +
+ "server_encryption_options.internode_encryption = " + conf.server_encryption_options.internode_encryption, false);
+ }
+
if (conf.max_value_size_in_mb <= 0)
throw new ConfigurationException("max_value_size_in_mb must be positive", false);
else if (conf.max_value_size_in_mb >= 2048)
@@ -1638,6 +1640,11 @@ public class DatabaseDescriptor
return listenAddress;
}
+ public static void setListenAddress(InetAddress newlistenAddress)
+ {
+ listenAddress = newlistenAddress;
+ }
+
public static InetAddress getBroadcastAddress()
{
return broadcastAddress;
@@ -1648,6 +1655,11 @@ public class DatabaseDescriptor
return conf.listen_on_broadcast_address;
}
+ public static void setShouldListenOnBroadcastAddress(boolean shouldListenOnBroadcastAddress)
+ {
+ conf.listen_on_broadcast_address = shouldListenOnBroadcastAddress;
+ }
+
public static void setListenOnBroadcastAddress(boolean listen_on_broadcast_address)
{
conf.listen_on_broadcast_address = listen_on_broadcast_address;
@@ -1939,7 +1951,12 @@ public class DatabaseDescriptor
return conf.server_encryption_options;
}
- public static EncryptionOptions.ClientEncryptionOptions getClientEncryptionOptions()
+ public static void setServerEncryptionOptions(EncryptionOptions.ServerEncryptionOptions encryptionOptions)
+ {
+ conf.server_encryption_options = encryptionOptions;
+ }
+
+ public static EncryptionOptions getClientEncryptionOptions()
{
return conf.client_encryption_options;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/config/EncryptionOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/EncryptionOptions.java b/src/java/org/apache/cassandra/config/EncryptionOptions.java
index 6010746..aecbfca 100644
--- a/src/java/org/apache/cassandra/config/EncryptionOptions.java
+++ b/src/java/org/apache/cassandra/config/EncryptionOptions.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.config;
-public abstract class EncryptionOptions
+public class EncryptionOptions
{
public String keystore = "conf/.keystore";
public String keystore_password = "cassandra";
@@ -29,19 +29,52 @@ public abstract class EncryptionOptions
public String store_type = "JKS";
public boolean require_client_auth = false;
public boolean require_endpoint_verification = false;
+ public boolean enabled = false;
+ public boolean optional = false;
- public static class ClientEncryptionOptions extends EncryptionOptions
+ public EncryptionOptions()
+ { }
+
+ /**
+ * Copy constructor
+ */
+ public EncryptionOptions(EncryptionOptions options)
{
- public boolean enabled = false;
- public boolean optional = false;
+ keystore = options.keystore;
+ keystore_password = options.keystore_password;
+ truststore = options.truststore;
+ truststore_password = options.truststore_password;
+ cipher_suites = options.cipher_suites;
+ protocol = options.protocol;
+ algorithm = options.algorithm;
+ store_type = options.store_type;
+ require_client_auth = options.require_client_auth;
+ require_endpoint_verification = options.require_endpoint_verification;
+ enabled = options.enabled;
+ optional = options.optional;
}
public static class ServerEncryptionOptions extends EncryptionOptions
{
- public static enum InternodeEncryption
+ public enum InternodeEncryption
{
all, none, dc, rack
}
+
public InternodeEncryption internode_encryption = InternodeEncryption.none;
+ public boolean enable_legacy_ssl_storage_port = false;
+
+ public ServerEncryptionOptions()
+ { }
+
+ /**
+ * Copy constructor
+ */
+ public ServerEncryptionOptions(ServerEncryptionOptions options)
+ {
+ super(options);
+ internode_encryption = options.internode_encryption;
+ enable_legacy_ssl_storage_port = options.enable_legacy_ssl_storage_port;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
index 2235c76..0b344c9 100644
--- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -64,7 +64,7 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
@VisibleForTesting
static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch snitch, String localDc)
{
- if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress, MessagingService.portFor(publicAddress)))
+ if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(publicAddress, MessagingService.instance().portFor(publicAddress)))
{
logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress, localAddress);
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 2a44e68..4e6fe1c 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1,4 +1,4 @@
-/*
+ /*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -729,10 +729,15 @@ public final class MessagingService implements MessagingServiceMBean
public void listen()
{
+ listen(DatabaseDescriptor.getServerEncryptionOptions());
+ }
+
+ public void listen(ServerEncryptionOptions serverEncryptionOptions)
+ {
callbacks.reset(); // hack to allow tests to stop/restart MS
- listen(FBUtilities.getLocalAddress());
+ listen(FBUtilities.getLocalAddress(), serverEncryptionOptions);
if (shouldListenOnBroadcastAddress())
- listen(FBUtilities.getBroadcastAddress());
+ listen(FBUtilities.getBroadcastAddress(), serverEncryptionOptions);
listenGate.signalAll();
}
@@ -747,40 +752,54 @@ public final class MessagingService implements MessagingServiceMBean
*
* @param localEp InetAddress whose port to listen on.
*/
- private void listen(InetAddress localEp) throws ConfigurationException
+ private void listen(InetAddress localEp, ServerEncryptionOptions serverEncryptionOptions) throws ConfigurationException
{
IInternodeAuthenticator authenticator = DatabaseDescriptor.getInternodeAuthenticator();
int receiveBufferSize = DatabaseDescriptor.getInternodeRecvBufferSize();
- if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.none)
+ // this is the legacy socket, for letting peer nodes that haven't upgrade yet connect to this node.
+ // should only occur during cluster upgrade. we can remove this block at 5.0!
+ if (serverEncryptionOptions.enabled && serverEncryptionOptions.enable_legacy_ssl_storage_port)
{
+ // clone the encryption options, and explicitly set the optional field to false
+ // (do not allow non-TLS connections on the legacy ssl port)
+ ServerEncryptionOptions legacyEncOptions = new ServerEncryptionOptions(serverEncryptionOptions);
+ legacyEncOptions.optional = false;
+
InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getSSLStoragePort());
- ChannelGroup channelGroup = new DefaultChannelGroup("EncryptedInternodeMessagingGroup", NettyFactory.executorForChannelGroups());
- InboundInitializer initializer = new InboundInitializer(authenticator, DatabaseDescriptor.getServerEncryptionOptions(), channelGroup);
+ ChannelGroup channelGroup = new DefaultChannelGroup("LegacyEncryptedInternodeMessagingGroup", NettyFactory.executorForChannelGroups());
+ InboundInitializer initializer = new InboundInitializer(authenticator, legacyEncOptions, channelGroup);
Channel encryptedChannel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize);
- serverChannels.add(new ServerChannel(encryptedChannel, channelGroup));
+ serverChannels.add(new ServerChannel(encryptedChannel, channelGroup, localAddr, ServerChannel.SecurityLevel.REQUIRED));
}
- if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.all)
- {
- InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
- ChannelGroup channelGroup = new DefaultChannelGroup("InternodeMessagingGroup", NettyFactory.executorForChannelGroups());
- InboundInitializer initializer = new InboundInitializer(authenticator, null, channelGroup);
- Channel channel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize);
- serverChannels.add(new ServerChannel(channel, channelGroup));
- }
-
- if (serverChannels.isEmpty())
- throw new IllegalStateException("no listening channels set up in MessagingService!");
+ // this is for the socket that can be plain, only ssl, or optional plain/ssl
+ InetSocketAddress localAddr = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
+ ChannelGroup channelGroup = new DefaultChannelGroup("InternodeMessagingGroup", NettyFactory.executorForChannelGroups());
+ InboundInitializer initializer = new InboundInitializer(authenticator, serverEncryptionOptions, channelGroup);
+ Channel channel = NettyFactory.instance.createInboundChannel(localAddr, initializer, receiveBufferSize);
+ ServerChannel.SecurityLevel securityLevel = !serverEncryptionOptions.enabled ? ServerChannel.SecurityLevel.NONE :
+ serverEncryptionOptions.optional ? ServerChannel.SecurityLevel.OPTIONAL :
+ ServerChannel.SecurityLevel.REQUIRED;
+ serverChannels.add(new ServerChannel(channel, channelGroup, localAddr, securityLevel));
}
/**
* A simple struct to wrap up the the components needed for each listening socket.
+ * <p>
+ * The {@link #securityLevel} is captured independently of the {@link #channel} as there's no real way to inspect a s
+ * erver-side 'channel' to check if it using TLS or not (the channel's configured pipeline will only apply to
+ * connections that get created, so it's not inspectible). {@link #securityLevel} is really only used for testing, anyway.
*/
@VisibleForTesting
static class ServerChannel
{
/**
+ * Declares the type of TLS used with the channel.
+ */
+ enum SecurityLevel { NONE, OPTIONAL, REQUIRED }
+
+ /**
* The base {@link Channel} that is doing the spcket listen/accept.
*/
private final Channel channel;
@@ -790,23 +809,46 @@ public final class MessagingService implements MessagingServiceMBean
* the inbound connections/channels can be closed when the listening socket itself is being closed.
*/
private final ChannelGroup connectedChannels;
+ private final InetSocketAddress address;
+ private final SecurityLevel securityLevel;
- private ServerChannel(Channel channel, ChannelGroup channelGroup)
+ private ServerChannel(Channel channel, ChannelGroup channelGroup, InetSocketAddress address, SecurityLevel securityLevel)
{
this.channel = channel;
this.connectedChannels = channelGroup;
+ this.address = address;
+ this.securityLevel = securityLevel;
}
void close()
{
- channel.close().syncUninterruptibly();
- connectedChannels.close().syncUninterruptibly();
+ if (channel.isOpen())
+ channel.close().awaitUninterruptibly();
+ connectedChannels.close().awaitUninterruptibly();
}
- int size()
+ int size()
{
return connectedChannels.size();
}
+
+ /**
+ * For testing only!
+ */
+ Channel getChannel()
+ {
+ return channel;
+ }
+
+ InetSocketAddress getAddress()
+ {
+ return address;
+ }
+
+ SecurityLevel getSecurityLevel()
+ {
+ return securityLevel;
+ }
}
public void waitUntilListening()
@@ -1038,6 +1080,11 @@ public final class MessagingService implements MessagingServiceMBean
*/
public void shutdown()
{
+ shutdown(false);
+ }
+
+ public void shutdown(boolean isTest)
+ {
logger.info("Waiting for messaging service to quiesce");
// We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
assert !StageManager.getStage(Stage.MUTATION).isShutdown();
@@ -1057,7 +1104,8 @@ public final class MessagingService implements MessagingServiceMBean
for (OutboundMessagingPool pool : channelManagers.values())
pool.close(false);
- NettyFactory.instance.close();
+ if (!isTest)
+ NettyFactory.instance.close();
}
catch (Exception e)
{
@@ -1065,6 +1113,14 @@ public final class MessagingService implements MessagingServiceMBean
}
}
+ /**
+ * For testing only!
+ */
+ void clearServerChannels()
+ {
+ serverChannels.clear();
+ }
+
public void receive(MessageIn message, int id)
{
TraceState state = Tracing.instance.initializeFromMessage(message);
@@ -1443,7 +1499,7 @@ public final class MessagingService implements MessagingServiceMBean
if (pool == null)
{
final boolean secure = isEncryptedConnection(to);
- final int port = portFor(secure);
+ final int port = portFor(to, secure);
if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, port))
return null;
@@ -1463,15 +1519,25 @@ public final class MessagingService implements MessagingServiceMBean
return pool;
}
- public static int portFor(InetAddress addr)
+ public int portFor(InetAddress addr)
{
final boolean secure = isEncryptedConnection(addr);
- return portFor(secure);
+ return portFor(addr, secure);
}
- private static int portFor(boolean secure)
+ private int portFor(InetAddress address, boolean secure)
{
- return secure ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort();
+ if (!secure)
+ return DatabaseDescriptor.getStoragePort();
+
+ Integer v = versions.get(address);
+ // if we don't know the version of the peer, assume it is 4.0 (or higher) as the only time is would be lower
+ // (as in a 3.x version) is during a cluster upgrade (from 3.x to 4.0). In that case the outbound connection will
+ // unfortunately fail - however the peer should connect to this node (at some point), and once we learn it's version, it'll be
+ // in versions map. thus, when we attempt to reconnect to that node, we'll have the version and we can get the correct port.
+ // we will be able to remove this logic at 5.0.
+ int version = v != null ? v.intValue() : VERSION_40;
+ return version < VERSION_40 ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort();
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java
index d193e31..7fb81d3 100644
--- a/src/java/org/apache/cassandra/net/async/NettyFactory.java
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -1,8 +1,10 @@
package org.apache.cassandra.net.async;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.zip.Checksum;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
@@ -45,7 +47,6 @@ import net.jpountz.xxhash.XXHashFactory;
import org.apache.cassandra.auth.IInternodeAuthenticator;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
-import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.security.SSLFactory;
@@ -53,7 +54,6 @@ import org.apache.cassandra.service.NativeTransportService;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.CoalescingStrategies;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NativeLibrary;
/**
* A factory for building Netty {@link Channel}s. Channels here are setup with a pipeline to participate
@@ -70,17 +70,13 @@ public final class NettyFactory
private static final int LZ4_HASH_SEED = 0x9747b28c;
- /**
- * Default seed value for xxhash.
- */
- public static final int XXHASH_DEFAULT_SEED = 0x9747b28c;
-
public enum Mode { MESSAGING, STREAMING }
- private static final String SSL_CHANNEL_HANDLER_NAME = "ssl";
- public static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor";
- public static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = "outboundCompressor";
- public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+ static final String SSL_CHANNEL_HANDLER_NAME = "ssl";
+ private static final String OPTIONAL_SSL_CHANNEL_HANDLER_NAME = "optionalSsl";
+ static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor";
+ static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = "outboundCompressor";
+ private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
public static final String INBOUND_STREAM_HANDLER_NAME = "inboundStreamHandler";
/** a useful addition for debugging; simply set to true to get more data in your logs */
@@ -125,7 +121,7 @@ public final class NettyFactory
NettyFactory(boolean useEpoll)
{
this.useEpoll = useEpoll;
- acceptGroup = getEventLoopGroup(useEpoll, determineAcceptGroupSize(DatabaseDescriptor.getServerEncryptionOptions().internode_encryption),
+ acceptGroup = getEventLoopGroup(useEpoll, determineAcceptGroupSize(DatabaseDescriptor.getServerEncryptionOptions()),
"MessagingService-NettyAcceptor-Thread", false);
inboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyInbound-Thread", false);
outboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyOutbound-Thread", true);
@@ -134,19 +130,23 @@ public final class NettyFactory
/**
* Determine the number of accept threads we need, which is based upon the number of listening sockets we will have.
- * We'll have either 1 or 2 listen sockets, depending on if we use SSL or not in combination with non-SSL. If we have both,
- * we'll have two sockets, and thus need two threads; else one socket and one thread.
- *
- * If the operator has configured multiple IP addresses (both {@link org.apache.cassandra.config.Config#broadcast_address}
- * and {@link org.apache.cassandra.config.Config#listen_address} are configured), then we listen on another set of sockets
- * - basically doubling the count. See CASSANDRA-9748 for more details.
+ * The idea is one accept thread per listening socket.
*/
- static int determineAcceptGroupSize(InternodeEncryption internode_encryption)
+ public static int determineAcceptGroupSize(ServerEncryptionOptions serverEncryptionOptions)
{
- int listenSocketCount = internode_encryption == InternodeEncryption.dc || internode_encryption == InternodeEncryption.rack ? 2 : 1;
+ int listenSocketCount = 1;
+
+ boolean listenOnBroadcastAddr = MessagingService.shouldListenOnBroadcastAddress();
+ if (listenOnBroadcastAddr)
+ listenSocketCount++;
- if (MessagingService.shouldListenOnBroadcastAddress())
- listenSocketCount *= 2;
+ if (serverEncryptionOptions.enable_legacy_ssl_storage_port)
+ {
+ listenSocketCount++;
+
+ if (listenOnBroadcastAddr)
+ listenSocketCount++;
+ }
return listenSocketCount;
}
@@ -236,6 +236,28 @@ public final class NettyFactory
return channelFuture.channel();
}
+ /**
+ * Creates a new {@link SslHandler} from provided SslContext.
+ * @param peer enables endpoint verification for remote address when not null
+ */
+ static SslHandler newSslHandler(Channel channel, SslContext sslContext, @Nullable InetSocketAddress peer)
+ {
+ if (peer == null)
+ {
+ return sslContext.newHandler(channel.alloc());
+ }
+ else
+ {
+ logger.debug("Creating SSL handler for %s:%d", peer.getHostString(), peer.getPort());
+ SslHandler sslHandler = sslContext.newHandler(channel.alloc(), peer.getHostString(), peer.getPort());
+ SSLEngine engine = sslHandler.engine();
+ SSLParameters sslParameters = engine.getSSLParameters();
+ sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
+ engine.setSSLParameters(sslParameters);
+ return sslHandler;
+ }
+ }
+
public static class InboundInitializer extends ChannelInitializer<SocketChannel>
{
private final IInternodeAuthenticator authenticator;
@@ -256,12 +278,20 @@ public final class NettyFactory
ChannelPipeline pipeline = channel.pipeline();
// order of handlers: ssl -> logger -> handshakeHandler
- if (encryptionOptions != null)
+ if (encryptionOptions.enabled)
{
- SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true);
- SslHandler sslHandler = sslContext.newHandler(channel.alloc());
- logger.trace("creating inbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
- pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
+ if (encryptionOptions.optional)
+ {
+ pipeline.addFirst(OPTIONAL_SSL_CHANNEL_HANDLER_NAME, new OptionalSslHandler(encryptionOptions));
+ }
+ else
+ {
+ SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true);
+ InetSocketAddress peer = encryptionOptions.require_endpoint_verification ? channel.remoteAddress() : null;
+ SslHandler sslHandler = newSslHandler(channel, sslContext, peer);
+ logger.trace("creating inbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
+ pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
+ }
}
if (WIRETRACE)
@@ -271,7 +301,7 @@ public final class NettyFactory
}
}
- private String encryptionLogStatement(ServerEncryptionOptions options)
+ private static String encryptionLogStatement(ServerEncryptionOptions options)
{
if (options == null)
return "disabled";
@@ -287,9 +317,11 @@ public final class NettyFactory
@VisibleForTesting
public Bootstrap createOutboundBootstrap(OutboundConnectionParams params)
{
- logger.debug("creating outbound bootstrap to peer {}, compression: {}, encryption: {}, coalesce: {}", params.connectionId.connectionAddress(),
+ logger.debug("creating outbound bootstrap to peer {}, compression: {}, encryption: {}, coalesce: {}, protocolVersion: {}",
+ params.connectionId.connectionAddress(),
params.compress, encryptionLogStatement(params.encryptionOptions),
- params.coalescingStrategy.isPresent() ? params.coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED);
+ params.coalescingStrategy.isPresent() ? params.coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED,
+ params.protocolVersion);
Class<? extends Channel> transport = useEpoll ? EpollSocketChannel.class : NioSocketChannel.class;
Bootstrap bootstrap = new Bootstrap().group(params.mode == Mode.MESSAGING ? outboundGroup : streamingGroup)
.channel(transport)
@@ -315,6 +347,12 @@ public final class NettyFactory
this.params = params;
}
+ /**
+ * {@inheritDoc}
+ *
+ * To determine if we should enable TLS, we only need to check if {@link #params#encryptionOptions} is set.
+ * The logic for figuring that out is is located in {@link MessagingService#getMessagingConnection(InetAddress)};
+ */
public void initChannel(SocketChannel channel) throws Exception
{
ChannelPipeline pipeline = channel.pipeline();
@@ -323,22 +361,9 @@ public final class NettyFactory
if (params.encryptionOptions != null)
{
SslContext sslContext = SSLFactory.getSslContext(params.encryptionOptions, true, false);
-
- final SslHandler sslHandler;
- if (params.encryptionOptions.require_endpoint_verification)
- {
- InetSocketAddress peer = params.connectionId.remoteAddress();
- sslHandler = sslContext.newHandler(channel.alloc(), peer.getHostString(), peer.getPort());
- SSLEngine engine = sslHandler.engine();
- SSLParameters sslParameters = engine.getSSLParameters();
- sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
- engine.setSSLParameters(sslParameters);
- }
- else
- {
- sslHandler = sslContext.newHandler(channel.alloc());
- }
-
+ // for some reason channel.remoteAddress() will return null
+ InetSocketAddress peer = params.encryptionOptions.require_endpoint_verification ? params.connectionId.remoteAddress() : null;
+ SslHandler sslHandler = newSslHandler(channel, sslContext, peer);
logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java b/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java
new file mode 100644
index 0000000..b60ae13
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OptionalSslHandler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.security.SSLFactory;
+
+public class OptionalSslHandler extends ByteToMessageDecoder
+{
+ private final ServerEncryptionOptions encryptionOptions;
+
+ OptionalSslHandler(ServerEncryptionOptions encryptionOptions)
+ {
+ this.encryptionOptions = encryptionOptions;
+ }
+
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
+ {
+ if (in.readableBytes() < 5)
+ {
+ // To detect if SSL must be used we need to have at least 5 bytes, so return here and try again
+ // once more bytes a ready.
+ return;
+ }
+
+ if (SslHandler.isEncrypted(in))
+ {
+ // Connection uses SSL/TLS, replace the detection handler with a SslHandler and so use encryption.
+ SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true);
+ Channel channel = ctx.channel();
+ InetSocketAddress peer = encryptionOptions.require_endpoint_verification ? (InetSocketAddress) channel.remoteAddress() : null;
+ SslHandler sslHandler = NettyFactory.newSslHandler(channel, sslContext, peer);
+ ctx.pipeline().replace(this, NettyFactory.SSL_CHANNEL_HANDLER_NAME, sslHandler);
+ }
+ else
+ {
+ // Connection use no TLS/SSL encryption, just remove the detection handler and continue without
+ // SslHandler in the pipeline.
+ ctx.pipeline().remove(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
index c834bd4..6b2ff0d 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
@@ -120,6 +120,12 @@ public class OutboundConnectionIdentifier
return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteConnectionAddr, connectionType);
}
+ public OutboundConnectionIdentifier withNewConnectionPort(int port)
+ {
+ return new OutboundConnectionIdentifier(localAddr, new InetSocketAddress(remoteAddr.getAddress(), port),
+ new InetSocketAddress(remoteConnectionAddr.getAddress(), port), connectionType);
+ }
+
/**
* The local node address.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
index 6bda9cd..4522ba4 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
@@ -271,6 +271,7 @@ public class OutboundMessagingConnection
}
boolean compress = shouldCompressConnection(connectionId.local(), connectionId.remote());
+ maybeUpdateConnectionId();
Bootstrap bootstrap = buildBootstrap(compress);
ChannelFuture connectFuture = bootstrap.connect();
@@ -289,12 +290,38 @@ public class OutboundMessagingConnection
|| ((DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc) && !isLocalDC(localHost, remoteHost));
}
+ /**
+ * After a bounce we won't necessarily know the peer's version, so we assume the peer is at least 4.0
+ * and thus using a single port for secure and non-secure communication. However, during a rolling upgrade from
+ * 3.0.x/3.x to 4.0, the not-yet upgraded peer is still listening on separate ports, but we don't know the peer's
+ * version until we can successfully connect. Fortunately, the peer can connect to this node, at which point
+ * we'll grab it's version. We then use that knowledge to use the {@link Config#ssl_storage_port} to connect on,
+ * and to do that we need to update some member fields in this instance.
+ *
+ * Note: can be removed at 5.0
+ */
+ void maybeUpdateConnectionId()
+ {
+ if (encryptionOptions != null)
+ {
+ int version = MessagingService.instance().getVersion(connectionId.remote());
+ if (version < targetVersion)
+ {
+ targetVersion = version;
+ int port = MessagingService.instance().portFor(connectionId.remote());
+ connectionId = connectionId.withNewConnectionPort(port);
+ logger.debug("changing connectionId to {}, with a different port for secure communication, because peer version is {}", connectionId, version);
+ }
+ }
+ }
+
private Bootstrap buildBootstrap(boolean compress)
{
boolean tcpNoDelay = isLocalDC(connectionId.local(), connectionId.remote()) ? INTRADC_TCP_NODELAY : DatabaseDescriptor.getInterDCTcpNoDelay();
int sendBufferSize = DatabaseDescriptor.getInternodeSendBufferSize() > 0
? DatabaseDescriptor.getInternodeSendBufferSize()
: OutboundConnectionParams.DEFAULT_SEND_BUFFER_SIZE;
+
OutboundConnectionParams params = OutboundConnectionParams.builder()
.connectionId(connectionId)
.callback(this::finishHandshake)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 0381416..b6351f9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -198,7 +198,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
this.index = index;
OutboundConnectionIdentifier id = OutboundConnectionIdentifier.stream(new InetSocketAddress(FBUtilities.getBroadcastAddress(), 0),
- new InetSocketAddress(connecting, MessagingService.portFor(connecting)));
+ new InetSocketAddress(connecting, MessagingService.instance().portFor(connecting)));
this.messageSender = new NettyStreamingMessageSender(this, id, factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview());
this.metrics = StreamingMetrics.get(connecting);
this.keepSSTableLevel = keepSSTableLevel;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index e7b812f..0812e53 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -242,7 +242,7 @@ public class BulkLoader
}
}
- private static SSLOptions buildSSLOptions(EncryptionOptions.ClientEncryptionOptions clientEncryptionOptions)
+ private static SSLOptions buildSSLOptions(EncryptionOptions clientEncryptionOptions)
{
if (!clientEncryptionOptions.enabled)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/tools/LoaderOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java
index 38317b6..c821e6a 100644
--- a/src/java/org/apache/cassandra/tools/LoaderOptions.java
+++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java
@@ -77,7 +77,7 @@ public class LoaderOptions
public final int interDcThrottle;
public final int storagePort;
public final int sslStoragePort;
- public final EncryptionOptions.ClientEncryptionOptions clientEncOptions;
+ public final EncryptionOptions clientEncOptions;
public final int connectionsPerHost;
public final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
public final Set<InetAddress> hosts;
@@ -119,7 +119,7 @@ public class LoaderOptions
int interDcThrottle = 0;
int storagePort;
int sslStoragePort;
- EncryptionOptions.ClientEncryptionOptions clientEncOptions = new EncryptionOptions.ClientEncryptionOptions();
+ EncryptionOptions clientEncOptions = new EncryptionOptions();
int connectionsPerHost = 1;
EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
Set<InetAddress> hosts = new HashSet<>();
@@ -208,7 +208,7 @@ public class LoaderOptions
return this;
}
- public Builder encOptions(EncryptionOptions.ClientEncryptionOptions encOptions)
+ public Builder encOptions(EncryptionOptions encOptions)
{
this.clientEncOptions = encOptions;
return this;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 4793d17..3632175 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -28,6 +28,7 @@ import com.google.common.base.Splitter;
import org.apache.cassandra.auth.PasswordAuthenticator;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.Int32Type;
@@ -37,13 +38,11 @@ import org.apache.cassandra.utils.Hex;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MD5Digest;
-import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
-
public class Client extends SimpleClient
{
private final SimpleEventHandler eventHandler = new SimpleEventHandler();
- public Client(String host, int port, ProtocolVersion version, ClientEncryptionOptions encryptionOptions)
+ public Client(String host, int port, ProtocolVersion version, EncryptionOptions encryptionOptions)
{
super(host, port, version, encryptionOptions);
setEventHandler(eventHandler);
@@ -248,7 +247,7 @@ public class Client extends SimpleClient
int port = Integer.parseInt(args[1]);
ProtocolVersion version = args.length == 3 ? ProtocolVersion.decode(Integer.parseInt(args[2])) : ProtocolVersion.CURRENT;
- ClientEncryptionOptions encryptionOptions = new ClientEncryptionOptions();
+ EncryptionOptions encryptionOptions = new EncryptionOptions();
System.out.println("CQL binary protocol console " + host + "@" + port + " using native protocol version " + version);
new Client(host, port, version, encryptionOptions).run();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 9408a3a..d3f1c2c 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -137,7 +137,7 @@ public class Server implements CassandraDaemon.Server
if (this.useSSL)
{
- final EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
+ final EncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
if (clientEnc.optional)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index ddd3484..9c1fb07 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -41,6 +41,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
+import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.security.SSLFactory;
@@ -56,7 +57,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
-import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
+import io.netty.handler.ssl.SslHandler;
public class SimpleClient implements Closeable
{
@@ -68,7 +69,7 @@ public class SimpleClient implements Closeable
private static final Logger logger = LoggerFactory.getLogger(SimpleClient.class);
public final String host;
public final int port;
- private final ClientEncryptionOptions encryptionOptions;
+ private final EncryptionOptions encryptionOptions;
protected final ResponseHandler responseHandler = new ResponseHandler();
protected final Connection.Tracker tracker = new ConnectionTracker();
@@ -87,22 +88,22 @@ public class SimpleClient implements Closeable
}
};
- public SimpleClient(String host, int port, ProtocolVersion version, ClientEncryptionOptions encryptionOptions)
+ public SimpleClient(String host, int port, ProtocolVersion version, EncryptionOptions encryptionOptions)
{
this(host, port, version, false, encryptionOptions);
}
- public SimpleClient(String host, int port, ClientEncryptionOptions encryptionOptions)
+ public SimpleClient(String host, int port, EncryptionOptions encryptionOptions)
{
this(host, port, ProtocolVersion.CURRENT, encryptionOptions);
}
public SimpleClient(String host, int port, ProtocolVersion version)
{
- this(host, port, version, new ClientEncryptionOptions());
+ this(host, port, version, new EncryptionOptions());
}
- public SimpleClient(String host, int port, ProtocolVersion version, boolean useBeta, ClientEncryptionOptions encryptionOptions)
+ public SimpleClient(String host, int port, ProtocolVersion version, boolean useBeta, EncryptionOptions encryptionOptions)
{
this.host = host;
this.port = port;
@@ -115,7 +116,7 @@ public class SimpleClient implements Closeable
public SimpleClient(String host, int port)
{
- this(host, port, new ClientEncryptionOptions());
+ this(host, port, new EncryptionOptions());
}
public void connect(boolean useCompression) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 3faa034..1cb59d4 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -835,7 +835,7 @@ public class FBUtilities
}
@VisibleForTesting
- protected static void reset()
+ public static void reset()
{
localInetAddress = null;
broadcastInetAddress = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index a082d56..f0a959e 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -36,6 +36,7 @@ import java.util.regex.*;
import java.util.regex.Matcher;
import com.google.common.collect.Iterables;
+import com.google.common.net.InetAddresses;
import com.codahale.metrics.Timer;
@@ -43,6 +44,7 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import org.apache.cassandra.auth.IInternodeAuthenticator;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.monitoring.ApproximateTime;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -77,7 +79,9 @@ public class MessagingServiceTest
}
};
- static final IInternodeAuthenticator originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
+ private static IInternodeAuthenticator originalAuthenticator;
+ private static ServerEncryptionOptions originalServerEncryptionOptions;
+ private static InetAddress originalListenAddress;
private final MessagingService messagingService = MessagingService.test();
@@ -87,6 +91,9 @@ public class MessagingServiceTest
DatabaseDescriptor.daemonInitialization();
DatabaseDescriptor.setBackPressureStrategy(new MockBackPressureStrategy(Collections.emptyMap()));
DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
+ originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
+ originalServerEncryptionOptions = DatabaseDescriptor.getServerEncryptionOptions();
+ originalListenAddress = DatabaseDescriptor.getListenAddress();
}
private static int metricScopeId = 0;
@@ -101,9 +108,13 @@ public class MessagingServiceTest
}
@After
- public void replaceAuthenticator()
+ public void tearDown()
{
DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator);
+ DatabaseDescriptor.setServerEncryptionOptions(originalServerEncryptionOptions);
+ DatabaseDescriptor.setShouldListenOnBroadcastAddress(false);
+ DatabaseDescriptor.setListenAddress(originalListenAddress);
+ FBUtilities.reset();
}
@Test
@@ -465,39 +476,188 @@ public class MessagingServiceTest
@Test
public void testCloseInboundConnections() throws UnknownHostException, InterruptedException
{
- messagingService.listen();
- Assert.assertTrue(messagingService.isListening());
- Assert.assertTrue(messagingService.serverChannels.size() > 0);
- for (ServerChannel serverChannel : messagingService.serverChannels)
- Assert.assertEquals(0, serverChannel.size());
-
- // now, create a connection and make sure it's in a channel group
- InetSocketAddress server = new InetSocketAddress(FBUtilities.getBroadcastAddress(), DatabaseDescriptor.getStoragePort());
- OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 0), server);
-
- CountDownLatch latch = new CountDownLatch(1);
- OutboundConnectionParams params = OutboundConnectionParams.builder()
- .mode(NettyFactory.Mode.MESSAGING)
- .sendBufferSize(1 << 10)
- .connectionId(id)
- .callback(handshakeResult -> latch.countDown())
- .protocolVersion(MessagingService.current_version)
- .build();
- Bootstrap bootstrap = NettyFactory.instance.createOutboundBootstrap(params);
- Channel channel = bootstrap.connect().awaitUninterruptibly().channel();
- Assert.assertNotNull(channel);
- latch.await(1, TimeUnit.SECONDS); // allow the netty pipeline/c* handshake to get set up
-
- int connectCount = 0;
- for (ServerChannel serverChannel : messagingService.serverChannels)
- connectCount += serverChannel.size();
- Assert.assertTrue(connectCount > 0);
-
- // last, shutdown the MS and make sure connections are removed
- messagingService.shutdown();
- for (ServerChannel serverChannel : messagingService.serverChannels)
- Assert.assertEquals(0, serverChannel.size());
+ try
+ {
+ messagingService.listen();
+ Assert.assertTrue(messagingService.isListening());
+ Assert.assertTrue(messagingService.serverChannels.size() > 0);
+ for (ServerChannel serverChannel : messagingService.serverChannels)
+ Assert.assertEquals(0, serverChannel.size());
+
+ // now, create a connection and make sure it's in a channel group
+ InetSocketAddress server = new InetSocketAddress(FBUtilities.getBroadcastAddress(), DatabaseDescriptor.getStoragePort());
+ OutboundConnectionIdentifier id = OutboundConnectionIdentifier.small(new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 0), server);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ OutboundConnectionParams params = OutboundConnectionParams.builder()
+ .mode(NettyFactory.Mode.MESSAGING)
+ .sendBufferSize(1 << 10)
+ .connectionId(id)
+ .callback(handshakeResult -> latch.countDown())
+ .protocolVersion(MessagingService.current_version)
+ .build();
+ Bootstrap bootstrap = NettyFactory.instance.createOutboundBootstrap(params);
+ Channel channel = bootstrap.connect().awaitUninterruptibly().channel();
+ Assert.assertNotNull(channel);
+ latch.await(1, TimeUnit.SECONDS); // allow the netty pipeline/c* handshake to get set up
+
+ int connectCount = 0;
+ for (ServerChannel serverChannel : messagingService.serverChannels)
+ connectCount += serverChannel.size();
+ Assert.assertTrue(connectCount > 0);
+ }
+ finally
+ {
+ // last, shutdown the MS and make sure connections are removed
+ messagingService.shutdown(true);
+ for (ServerChannel serverChannel : messagingService.serverChannels)
+ Assert.assertEquals(0, serverChannel.size());
+ messagingService.clearServerChannels();
+ }
+ }
+
+ @Test
+ public void listenPlainConnection()
+ {
+ ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+ serverEncryptionOptions.enabled = false;
+ listen(serverEncryptionOptions, false);
+ }
+
+ @Test
+ public void listenPlainConnectionWithBroadcastAddr()
+ {
+ ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+ serverEncryptionOptions.enabled = false;
+ listen(serverEncryptionOptions, true);
+ }
+
+ @Test
+ public void listenRequiredSecureConnection()
+ {
+ ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+ serverEncryptionOptions.enabled = true;
+ serverEncryptionOptions.optional = false;
+ serverEncryptionOptions.enable_legacy_ssl_storage_port = false;
+ listen(serverEncryptionOptions, false);
+ }
+
+ @Test
+ public void listenRequiredSecureConnectionWithBroadcastAddr()
+ {
+ ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+ serverEncryptionOptions.enabled = true;
+ serverEncryptionOptions.optional = false;
+ serverEncryptionOptions.enable_legacy_ssl_storage_port = false;
+ listen(serverEncryptionOptions, true);
+ }
+
+ @Test
+ public void listenRequiredSecureConnectionWithLegacyPort()
+ {
+ ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+ serverEncryptionOptions.enabled = true;
+ serverEncryptionOptions.optional = false;
+ serverEncryptionOptions.enable_legacy_ssl_storage_port = true;
+ listen(serverEncryptionOptions, false);
+ }
+
+ @Test
+ public void listenRequiredSecureConnectionWithBroadcastAddrAndLegacyPort()
+ {
+ ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+ serverEncryptionOptions.enabled = true;
+ serverEncryptionOptions.optional = false;
+ serverEncryptionOptions.enable_legacy_ssl_storage_port = true;
+ listen(serverEncryptionOptions, true);
}
+ @Test
+ public void listenOptionalSecureConnection()
+ {
+ ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+ serverEncryptionOptions.enabled = true;
+ serverEncryptionOptions.optional = true;
+ listen(serverEncryptionOptions, false);
+ }
+
+ @Test
+ public void listenOptionalSecureConnectionWithBroadcastAddr()
+ {
+ ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+ serverEncryptionOptions.enabled = true;
+ serverEncryptionOptions.optional = true;
+ listen(serverEncryptionOptions, true);
+ }
+
+ private void listen(ServerEncryptionOptions serverEncryptionOptions, boolean listenOnBroadcastAddr)
+ {
+ InetAddress listenAddress = null;
+ if (listenOnBroadcastAddr)
+ {
+ DatabaseDescriptor.setShouldListenOnBroadcastAddress(true);
+ listenAddress = InetAddresses.increment(FBUtilities.getBroadcastAddress());
+ DatabaseDescriptor.setListenAddress(listenAddress);
+ FBUtilities.reset();
+ }
+
+ try
+ {
+ messagingService.listen(serverEncryptionOptions);
+ Assert.assertTrue(messagingService.isListening());
+ int expectedListeningCount = NettyFactory.determineAcceptGroupSize(serverEncryptionOptions);
+ Assert.assertEquals(expectedListeningCount, messagingService.serverChannels.size());
+
+ if (!serverEncryptionOptions.enabled)
+ {
+ // make sure no channel is using TLS
+ for (ServerChannel serverChannel : messagingService.serverChannels)
+ Assert.assertEquals(ServerChannel.SecurityLevel.NONE, serverChannel.getSecurityLevel());
+ }
+ else
+ {
+ final int legacySslPort = DatabaseDescriptor.getSSLStoragePort();
+ boolean foundLegacyListenSslAddress = false;
+ for (ServerChannel serverChannel : messagingService.serverChannels)
+ {
+ if (serverEncryptionOptions.optional)
+ Assert.assertEquals(ServerChannel.SecurityLevel.OPTIONAL, serverChannel.getSecurityLevel());
+ else
+ Assert.assertEquals(ServerChannel.SecurityLevel.REQUIRED, serverChannel.getSecurityLevel());
+
+ if (serverEncryptionOptions.enable_legacy_ssl_storage_port)
+ {
+ if (legacySslPort == serverChannel.getAddress().getPort())
+ {
+ foundLegacyListenSslAddress = true;
+ Assert.assertEquals(ServerChannel.SecurityLevel.REQUIRED, serverChannel.getSecurityLevel());
+ }
+ }
+ }
+
+ if (serverEncryptionOptions.enable_legacy_ssl_storage_port && !foundLegacyListenSslAddress)
+ Assert.fail("failed to find legacy ssl listen address");
+ }
+ // check the optional listen address
+ if (listenOnBroadcastAddr)
+ {
+ int expectedCount = (serverEncryptionOptions.enabled && serverEncryptionOptions.enable_legacy_ssl_storage_port) ? 2 : 1;
+ int found = 0;
+ for (ServerChannel serverChannel : messagingService.serverChannels)
+ {
+ if (serverChannel.getAddress().getAddress().equals(listenAddress))
+ found++;
+ }
+
+ Assert.assertEquals(expectedCount, found);
+ }
+ }
+ finally
+ {
+ messagingService.shutdown(true);
+ messagingService.clearServerChannels();
+ Assert.assertEquals(0, messagingService.serverChannels.size());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
index 67b221a..0550490 100644
--- a/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
+++ b/test/unit/org/apache/cassandra/net/async/NettyFactoryTest.java
@@ -31,7 +31,6 @@ import org.junit.Test;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
-import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -44,10 +43,9 @@ import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.cassandra.auth.AllowAllInternodeAuthenticator;
import org.apache.cassandra.auth.IInternodeAuthenticator;
-import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
-import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.async.NettyFactory.InboundInitializer;
@@ -154,10 +152,15 @@ public class NettyFactoryTest
@Test
public void deterineAcceptGroupSize()
{
- Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(InternodeEncryption.none));
- Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(InternodeEncryption.all));
- Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.rack));
- Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.dc));
+ ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions();
+ serverEncryptionOptions.enabled = false;
+ Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
+ serverEncryptionOptions.enabled = true;
+ Assert.assertEquals(1, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
+
+ serverEncryptionOptions.enable_legacy_ssl_storage_port = true;
+ Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
+ serverEncryptionOptions.enable_legacy_ssl_storage_port = false;
InetAddress originalBroadcastAddr = FBUtilities.getBroadcastAddress();
try
@@ -165,10 +168,13 @@ public class NettyFactoryTest
FBUtilities.setBroadcastInetAddress(InetAddresses.increment(FBUtilities.getLocalAddress()));
DatabaseDescriptor.setListenOnBroadcastAddress(true);
- Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.none));
- Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(InternodeEncryption.all));
- Assert.assertEquals(4, NettyFactory.determineAcceptGroupSize(InternodeEncryption.rack));
- Assert.assertEquals(4, NettyFactory.determineAcceptGroupSize(InternodeEncryption.dc));
+ serverEncryptionOptions.enabled = false;
+ Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
+ serverEncryptionOptions.enabled = true;
+ Assert.assertEquals(2, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
+
+ serverEncryptionOptions.enable_legacy_ssl_storage_port = true;
+ Assert.assertEquals(4, NettyFactory.determineAcceptGroupSize(serverEncryptionOptions));
}
finally
{
@@ -263,10 +269,13 @@ public class NettyFactoryTest
@Test
public void createInboundInitializer_WithoutSsl() throws Exception
{
- InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, null, channelGroup);
+ ServerEncryptionOptions encryptionOptions = new ServerEncryptionOptions();
+ encryptionOptions.enabled = false;
+ InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, encryptionOptions, channelGroup);
NioSocketChannel channel = new NioSocketChannel();
initializer.initChannel(channel);
Assert.assertNull(channel.pipeline().get(SslHandler.class));
+ Assert.assertNull(channel.pipeline().get(OptionalSslHandler.class));
}
private ServerEncryptionOptions encOptions()
@@ -281,15 +290,33 @@ public class NettyFactoryTest
encryptionOptions.cipher_suites = new String[] {"TLS_RSA_WITH_AES_128_CBC_SHA"};
return encryptionOptions;
}
+
@Test
public void createInboundInitializer_WithSsl() throws Exception
{
ServerEncryptionOptions encryptionOptions = encOptions();
+ encryptionOptions.enabled = true;
+ encryptionOptions.optional = false;
InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, encryptionOptions, channelGroup);
NioSocketChannel channel = new NioSocketChannel();
Assert.assertNull(channel.pipeline().get(SslHandler.class));
initializer.initChannel(channel);
Assert.assertNotNull(channel.pipeline().get(SslHandler.class));
+ Assert.assertNull(channel.pipeline().get(OptionalSslHandler.class));
+ }
+
+ @Test
+ public void createInboundInitializer_WithOptionalSsl() throws Exception
+ {
+ ServerEncryptionOptions encryptionOptions = encOptions();
+ encryptionOptions.enabled = true;
+ encryptionOptions.optional = true;
+ InboundInitializer initializer = new InboundInitializer(AUTHENTICATOR, encryptionOptions, channelGroup);
+ NioSocketChannel channel = new NioSocketChannel();
+ Assert.assertNull(channel.pipeline().get(SslHandler.class));
+ initializer.initChannel(channel);
+ Assert.assertNotNull(channel.pipeline().get(OptionalSslHandler.class));
+ Assert.assertNull(channel.pipeline().get(SslHandler.class));
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
index d6dd633..641c28c 100644
--- a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.auth.AllowAllInternodeAuthenticator;
import org.apache.cassandra.auth.IInternodeAuthenticator;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.AbstractEndpointSnitch;
import org.apache.cassandra.locator.IEndpointSnitch;
@@ -68,6 +69,7 @@ public class OutboundMessagingConnectionTest
private EmbeddedChannel channel;
private IEndpointSnitch snitch;
+ private ServerEncryptionOptions encryptionOptions;
@BeforeClass
public static void before()
@@ -84,12 +86,14 @@ public class OutboundMessagingConnectionTest
omc.setChannelWriter(ChannelWriter.create(channel, omc::handleMessageResult, Optional.empty()));
snitch = DatabaseDescriptor.getEndpointSnitch();
+ encryptionOptions = DatabaseDescriptor.getServerEncryptionOptions();
}
@After
public void tearDown()
{
DatabaseDescriptor.setEndpointSnitch(snitch);
+ DatabaseDescriptor.setServerEncryptionOptions(encryptionOptions);
channel.finishAndReleaseAll();
}
@@ -471,4 +475,45 @@ public class OutboundMessagingConnectionTest
Assert.assertNotSame(omc.getConnectionId(), originalId);
Assert.assertSame(NOT_READY, omc.getState());
}
+
+ @Test
+ public void maybeUpdateConnectionId_NoEncryption()
+ {
+ OutboundConnectionIdentifier connectionId = omc.getConnectionId();
+ int version = omc.getTargetVersion();
+ omc.maybeUpdateConnectionId();
+ Assert.assertEquals(connectionId, omc.getConnectionId());
+ Assert.assertEquals(version, omc.getTargetVersion());
+ }
+
+ @Test
+ public void maybeUpdateConnectionId_SameVersion()
+ {
+ ServerEncryptionOptions encryptionOptions = new ServerEncryptionOptions();
+ omc = new OutboundMessagingConnection(connectionId, encryptionOptions, Optional.empty(), new AllowAllInternodeAuthenticator());
+ OutboundConnectionIdentifier connectionId = omc.getConnectionId();
+ int version = omc.getTargetVersion();
+ omc.maybeUpdateConnectionId();
+ Assert.assertEquals(connectionId, omc.getConnectionId());
+ Assert.assertEquals(version, omc.getTargetVersion());
+ }
+
+ @Test
+ public void maybeUpdateConnectionId_3_X_Version()
+ {
+ ServerEncryptionOptions encryptionOptions = new ServerEncryptionOptions();
+ encryptionOptions.enabled = true;
+ encryptionOptions.internode_encryption = ServerEncryptionOptions.InternodeEncryption.all;
+ DatabaseDescriptor.setServerEncryptionOptions(encryptionOptions);
+ omc = new OutboundMessagingConnection(connectionId, encryptionOptions, Optional.empty(), new AllowAllInternodeAuthenticator());
+ int peerVersion = MessagingService.VERSION_30;
+ MessagingService.instance().setVersion(connectionId.remote(), MessagingService.VERSION_30);
+
+ OutboundConnectionIdentifier connectionId = omc.getConnectionId();
+ omc.maybeUpdateConnectionId();
+ Assert.assertNotEquals(connectionId, omc.getConnectionId());
+ Assert.assertEquals(new InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().remoteAddress());
+ Assert.assertEquals(new InetSocketAddress(REMOTE_ADDR.getAddress(), DatabaseDescriptor.getSSLStoragePort()), omc.getConnectionId().connectionAddress());
+ Assert.assertEquals(peerVersion, omc.getTargetVersion());
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
index 0c51eb7..4ade4ad 100644
--- a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
+++ b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java
@@ -68,7 +68,7 @@ public class ProtocolBetaVersionTest extends CQLTester
createTable("CREATE TABLE %s (pk int PRIMARY KEY, v int)");
assertTrue(betaVersion.isBeta()); // change to another beta version or remove test if no beta version
- try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, true, new EncryptionOptions.ClientEncryptionOptions()))
+ try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, true, new EncryptionOptions()))
{
client.connect(false);
for (int i = 0; i < 10; i++)
@@ -103,7 +103,7 @@ public class ProtocolBetaVersionTest extends CQLTester
}
assertTrue(betaVersion.isBeta()); // change to another beta version or remove test if no beta version
- try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, false, new EncryptionOptions.ClientEncryptionOptions()))
+ try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, false, new EncryptionOptions()))
{
client.connect(false);
fail("Exception should have been thrown");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index 817cb06..5b8067e 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -29,6 +29,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.cql3.BatchQueryOptions;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QueryHandler;
@@ -48,7 +49,6 @@ import org.apache.cassandra.transport.messages.QueryMessage;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.MD5Digest;
-import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
public class MessagePayloadTest extends CQLTester
@@ -127,7 +127,7 @@ public class MessagePayloadTest extends CQLTester
nativePort,
ProtocolVersion.V5,
true,
- new ClientEncryptionOptions());
+ new EncryptionOptions());
try
{
client.connect(false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java
index a6248bb..6acc500 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsTransport.java
@@ -23,7 +23,6 @@ package org.apache.cassandra.stress.settings;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,9 +38,9 @@ public class SettingsTransport implements Serializable
this.options = options;
}
- public EncryptionOptions.ClientEncryptionOptions getEncryptionOptions()
+ public EncryptionOptions getEncryptionOptions()
{
- EncryptionOptions.ClientEncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
+ EncryptionOptions encOptions = new EncryptionOptions();
if (options.trustStore.present())
{
encOptions.enabled = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index a27b986..af35490 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -134,7 +134,7 @@ public class StressSettings implements Serializable
if (client != null)
return client;
- EncryptionOptions.ClientEncryptionOptions encOptions = transport.getEncryptionOptions();
+ EncryptionOptions encOptions = transport.getEncryptionOptions();
JavaDriverClient c = new JavaDriverClient(this, currentNode, port.nativePort, encOptions);
c.connect(mode.compression());
if (keyspace != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26084668/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
index d404653..4928cd2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
@@ -50,7 +50,7 @@ public class JavaDriverClient
public final int connectionsPerHost;
private final ProtocolVersion protocolVersion;
- private final EncryptionOptions.ClientEncryptionOptions encryptionOptions;
+ private final EncryptionOptions encryptionOptions;
private Cluster cluster;
private Session session;
private final LoadBalancingPolicy loadBalancingPolicy;
@@ -59,10 +59,10 @@ public class JavaDriverClient
public JavaDriverClient(StressSettings settings, String host, int port)
{
- this(settings, host, port, new EncryptionOptions.ClientEncryptionOptions());
+ this(settings, host, port, new EncryptionOptions());
}
- public JavaDriverClient(StressSettings settings, String host, int port, EncryptionOptions.ClientEncryptionOptions encryptionOptions)
+ public JavaDriverClient(StressSettings settings, String host, int port, EncryptionOptions encryptionOptions)
{
this.protocolVersion = settings.mode.protocolVersion;
this.host = host;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org