You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2021/01/29 10:57:31 UTC

[cassandra] branch trunk updated (0a9e129 -> cd20f88)

This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 0a9e129  Upgrade Jacoco to 0.8.6 (for Java 11 support) Patch by Jacek Lewandowski; reviewed by Michael Semb Wever for CASSANDRA-16365
     add c2f24d2  Fix inbound TLS enforcement
     add aa317ae  Merge branch 'cassandra-3.0' into cassandra-3.11
     new cd20f88  Merge branch 'cassandra-3.11' into trunk

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/cassandra/config/EncryptionOptions.java |  18 +++
 .../cassandra/net/InboundConnectionInitiator.java  |  18 +++
 .../cassandra/net/InboundMessageHandlers.java      |   8 ++
 .../test/InternodeEncryptionEnforcementTest.java   | 134 +++++++++++++++++++++
 4 files changed, 178 insertions(+)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit cd20f882820230980107ce5737c47a78a6fc4901
Merge: 0a9e129 aa317ae
Author: Aleksey Yeshchenko <al...@apache.org>
AuthorDate: Fri Jan 29 10:55:06 2021 +0000

    Merge branch 'cassandra-3.11' into trunk

 .../apache/cassandra/config/EncryptionOptions.java |  18 +++
 .../cassandra/net/InboundConnectionInitiator.java  |  18 +++
 .../cassandra/net/InboundMessageHandlers.java      |   8 ++
 .../test/InternodeEncryptionEnforcementTest.java   | 134 +++++++++++++++++++++
 4 files changed, 178 insertions(+)

diff --cc src/java/org/apache/cassandra/config/EncryptionOptions.java
index a534e36,1b1d8ce..93668d9
--- a/src/java/org/apache/cassandra/config/EncryptionOptions.java
+++ b/src/java/org/apache/cassandra/config/EncryptionOptions.java
@@@ -466,63 -54,13 +466,72 @@@ public class EncryptionOption
              all, none, dc, rack
          }
  
 -        public InternodeEncryption internode_encryption = InternodeEncryption.none;
 +        public final InternodeEncryption internode_encryption;
 +        public final boolean enable_legacy_ssl_storage_port;
  
 -        public boolean shouldEncrypt(InetAddress endpoint)
 +        public ServerEncryptionOptions()
          {
 -            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 -            InetAddress local = FBUtilities.getBroadcastAddress();
 +            this.internode_encryption = InternodeEncryption.none;
 +            this.enable_legacy_ssl_storage_port = false;
 +        }
 +
 +        public ServerEncryptionOptions(String keystore, String keystore_password, String truststore,
 +                                       String truststore_password, List<String> cipher_suites, String protocol,
 +                                       List<String> accepted_protocols, String algorithm, String store_type,
 +                                       boolean require_client_auth, boolean require_endpoint_verification,
 +                                       Boolean optional, InternodeEncryption internode_encryption,
 +                                       boolean enable_legacy_ssl_storage_port)
 +        {
 +            super(keystore, keystore_password, truststore, truststore_password, cipher_suites, protocol,
 +                  accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                  null, optional);
 +            this.internode_encryption = internode_encryption;
 +            this.enable_legacy_ssl_storage_port = enable_legacy_ssl_storage_port;
 +        }
 +
 +        public ServerEncryptionOptions(ServerEncryptionOptions options)
 +        {
 +            super(options);
 +            this.internode_encryption = options.internode_encryption;
 +            this.enable_legacy_ssl_storage_port = options.enable_legacy_ssl_storage_port;
 +        }
 +
 +        @Override
 +        public EncryptionOptions applyConfig()
 +        {
 +            return applyConfigInternal();
 +        }
  
 +        private ServerEncryptionOptions applyConfigInternal()
 +        {
 +            super.applyConfig();
 +
 +            isEnabled = this.internode_encryption != InternodeEncryption.none;
 +
 +            if (this.enabled != null && this.enabled && !isEnabled)
 +            {
 +                logger.warn("Setting server_encryption_options.enabled has no effect, use internode_encryption");
 +            }
 +
++            if (require_client_auth && (internode_encryption == InternodeEncryption.rack || internode_encryption == InternodeEncryption.dc))
++            {
++                logger.warn("Setting require_client_auth is incompatible with 'rack' and 'dc' internode_encryption values."
++                          + " It is possible for an internode connection to pretend to be in the same rack/dc by spoofing"
++                          + " its broadcast address in the handshake and bypass authentication. To ensure that mutual TLS"
++                          + " authentication is not bypassed, please set internode_encryption to 'all'. Continuing with"
++                          + " insecure configuration.");
++            }
++
 +            // regardless of the optional flag, if the internode encryption is set to rack or dc
 +            // it must be optional so that unencrypted connections within the rack or dc can be established.
 +            isOptional = super.isOptional || internode_encryption == InternodeEncryption.rack || internode_encryption == InternodeEncryption.dc;
 +
 +            return this;
 +        }
 +
 +        public boolean shouldEncrypt(InetAddressAndPort endpoint)
 +        {
 +            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
              switch (internode_encryption)
              {
                  case none:
@@@ -543,112 -81,16 +552,121 @@@
              return true;
          }
  
 -        public void validate()
++        /**
++         * {@link #isOptional} will be set to {@code true} implicitly for {@code internode_encryption}
++         * values of "rack" and "dc". This method returns the explicit, raw value of {@link #optional}
++         * as set by the user (if set at all).
++         */
++        public boolean isExplicitlyOptional()
+         {
 -            if (require_client_auth && (internode_encryption == InternodeEncryption.rack || internode_encryption == InternodeEncryption.dc))
 -            {
 -                logger.warn("Setting require_client_auth is incompatible with 'rack' and 'dc' internode_encryption values."
 -                          + " It is possible for an internode connection to pretend to be in the same rack/dc by spoofing"
 -                          + " its broadcast address in the handshake and bypass authentication. To ensure that mutual TLS"
 -                          + " authentication is not bypassed, please set internode_encryption to 'all'. Continuing with"
 -                          + " insecure configuration.");
 -            }
++            return optional != null && optional;
++        }
 +
 +        public ServerEncryptionOptions withKeyStore(String keystore)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withKeyStorePassword(String keystore_password)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withTrustStore(String truststore)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withTrustStorePassword(String truststore_password)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withCipherSuites(List<String> cipher_suites)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withCipherSuites(String ... cipher_suites)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, ImmutableList.copyOf(cipher_suites),
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withProtocol(String protocol)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withAcceptedProtocols(List<String> accepted_protocols)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols == null ? null : ImmutableList.copyOf(accepted_protocols),
 +                                               algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withAlgorithm(String algorithm)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withStoreType(String store_type)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withRequireClientAuth(boolean require_client_auth)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
          }
 +
 +        public ServerEncryptionOptions withRequireEndpointVerification(boolean require_endpoint_verification)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withOptional(boolean optional)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withInternodeEncryption(InternodeEncryption internode_encryption)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
 +        public ServerEncryptionOptions withLegacySslStoragePort(boolean enable_legacy_ssl_storage_port)
 +        {
 +            return new ServerEncryptionOptions(keystore, keystore_password, truststore, truststore_password, cipher_suites,
 +                                               protocol, accepted_protocols, algorithm, store_type, require_client_auth, require_endpoint_verification,
 +                                               optional, internode_encryption, enable_legacy_ssl_storage_port).applyConfigInternal();
 +        }
 +
      }
  }
diff --cc src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
index d21358a,0000000..4c31adf
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
@@@ -1,527 -1,0 +1,545 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.net.SocketAddress;
 +import java.util.List;
 +import java.util.concurrent.Future;
 +import java.util.function.Consumer;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import io.netty.bootstrap.ServerBootstrap;
 +import io.netty.buffer.ByteBuf;
 +import io.netty.channel.Channel;
 +import io.netty.channel.ChannelFuture;
 +import io.netty.channel.ChannelFutureListener;
 +import io.netty.channel.ChannelHandlerContext;
 +import io.netty.channel.ChannelInitializer;
 +import io.netty.channel.ChannelOption;
 +import io.netty.channel.ChannelPipeline;
 +import io.netty.channel.group.ChannelGroup;
 +import io.netty.channel.socket.SocketChannel;
 +import io.netty.handler.codec.ByteToMessageDecoder;
 +import io.netty.handler.logging.LogLevel;
 +import io.netty.handler.logging.LoggingHandler;
 +import io.netty.handler.ssl.SslContext;
 +import io.netty.handler.ssl.SslHandler;
 +import org.apache.cassandra.config.EncryptionOptions;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.OutboundConnectionSettings.Framing;
 +import org.apache.cassandra.security.SSLFactory;
 +import org.apache.cassandra.streaming.async.StreamingInboundHandler;
 +import org.apache.cassandra.utils.memory.BufferPools;
 +
 +import static java.lang.Math.*;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static org.apache.cassandra.net.MessagingService.*;
 +import static org.apache.cassandra.net.MessagingService.VERSION_40;
 +import static org.apache.cassandra.net.MessagingService.current_version;
 +import static org.apache.cassandra.net.MessagingService.minimum_version;
 +import static org.apache.cassandra.net.SocketFactory.WIRETRACE;
 +import static org.apache.cassandra.net.SocketFactory.newSslHandler;
 +
 +public class InboundConnectionInitiator
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(InboundConnectionInitiator.class);
 +
 +    private static class Initializer extends ChannelInitializer<SocketChannel>
 +    {
 +        private final InboundConnectionSettings settings;
 +        private final ChannelGroup channelGroup;
 +        private final Consumer<ChannelPipeline> pipelineInjector;
 +
 +        Initializer(InboundConnectionSettings settings, ChannelGroup channelGroup,
 +                    Consumer<ChannelPipeline> pipelineInjector)
 +        {
 +            this.settings = settings;
 +            this.channelGroup = channelGroup;
 +            this.pipelineInjector = pipelineInjector;
 +        }
 +
 +        @Override
 +        public void initChannel(SocketChannel channel) throws Exception
 +        {
 +            channelGroup.add(channel);
 +
 +            channel.config().setOption(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance);
 +            channel.config().setOption(ChannelOption.SO_KEEPALIVE, true);
 +            channel.config().setOption(ChannelOption.SO_REUSEADDR, true);
 +            channel.config().setOption(ChannelOption.TCP_NODELAY, true); // we only send handshake messages; no point ever delaying
 +
 +            ChannelPipeline pipeline = channel.pipeline();
 +
 +            pipelineInjector.accept(pipeline);
 +
 +            // order of handlers: ssl -> logger -> handshakeHandler
 +            // For either unencrypted or transitional modes, allow Ssl optionally.
 +            switch(settings.encryption.tlsEncryptionPolicy())
 +            {
 +                case UNENCRYPTED:
 +                    // Handler checks for SSL connection attempts and cleanly rejects them if encryption is disabled
 +                    pipeline.addFirst("rejectssl", new RejectSslHandler());
 +                    break;
 +                case OPTIONAL:
 +                    pipeline.addFirst("ssl", new OptionalSslHandler(settings.encryption));
 +                    break;
 +                case ENCRYPTED:
 +                    SslHandler sslHandler = getSslHandler("creating", channel, settings.encryption);
 +                    pipeline.addFirst("ssl", sslHandler);
 +                    break;
 +            }
 +
 +            if (WIRETRACE)
 +                pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
 +
 +            channel.pipeline().addLast("handshake", new Handler(settings));
 +
 +        }
 +    }
 +
 +    /**
 +     * Create a {@link Channel} that listens on the {@code localAddr}. This method will block while trying to bind to the address,
 +     * but it does not make a remote call.
 +     */
 +    private static ChannelFuture bind(Initializer initializer) throws ConfigurationException
 +    {
 +        logger.info("Listening on {}", initializer.settings);
 +
 +        ServerBootstrap bootstrap = initializer.settings.socketFactory
 +                                    .newServerBootstrap()
 +                                    .option(ChannelOption.SO_BACKLOG, 1 << 9)
 +                                    .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance)
 +                                    .option(ChannelOption.SO_REUSEADDR, true)
 +                                    .childHandler(initializer);
 +
 +        int socketReceiveBufferSizeInBytes = initializer.settings.socketReceiveBufferSizeInBytes;
 +        if (socketReceiveBufferSizeInBytes > 0)
 +            bootstrap.childOption(ChannelOption.SO_RCVBUF, socketReceiveBufferSizeInBytes);
 +
 +        InetAddressAndPort bind = initializer.settings.bindAddress;
 +        ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(bind.address, bind.port));
 +
 +        if (!channelFuture.awaitUninterruptibly().isSuccess())
 +        {
 +            if (channelFuture.channel().isOpen())
 +                channelFuture.channel().close();
 +
 +            Throwable failedChannelCause = channelFuture.cause();
 +
 +            String causeString = "";
 +            if (failedChannelCause != null && failedChannelCause.getMessage() != null)
 +                causeString = failedChannelCause.getMessage();
 +
 +            if (causeString.contains("in use"))
 +            {
 +                throw new ConfigurationException(bind + " is in use by another process.  Change listen_address:storage_port " +
 +                                                 "in cassandra.yaml to values that do not conflict with other services");
 +            }
 +            // looking at the jdk source, solaris/windows bind failure messages both use the phrase "cannot assign requested address".
 +            // windows message uses "Cannot" (with a capital 'C'), and solaris (a/k/a *nux) does not. hence we search for "annot" <sigh>
 +            else if (causeString.contains("annot assign requested address"))
 +            {
 +                throw new ConfigurationException("Unable to bind to address " + bind
 +                                                 + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
 +            }
 +            else
 +            {
 +                throw new ConfigurationException("failed to bind to: " + bind, failedChannelCause);
 +            }
 +        }
 +
 +        return channelFuture;
 +    }
 +
 +    public static ChannelFuture bind(InboundConnectionSettings settings, ChannelGroup channelGroup,
 +                                     Consumer<ChannelPipeline> pipelineInjector)
 +    {
 +        return bind(new Initializer(settings, channelGroup, pipelineInjector));
 +    }
 +
 +    /**
 +     * 'Server-side' component that negotiates the internode handshake when establishing a new connection.
 +     * This handler will be the first in the netty channel for each incoming connection (secure socket (TLS) notwithstanding),
 +     * and once the handshake is successful, it will configure the proper handlers ({@link InboundMessageHandler}
 +     * or {@link StreamingInboundHandler}) and remove itself from the working pipeline.
 +     */
 +    static class Handler extends ByteToMessageDecoder
 +    {
 +        private final InboundConnectionSettings settings;
 +
 +        private HandshakeProtocol.Initiate initiate;
 +        private HandshakeProtocol.ConfirmOutboundPre40 confirmOutboundPre40;
 +
 +        /**
 +         * A future that essentially places a timeout on how long we'll wait for the peer
 +         * to complete the next step of the handshake.
 +         */
 +        private Future<?> handshakeTimeout;
 +
 +        Handler(InboundConnectionSettings settings)
 +        {
 +            this.settings = settings;
 +        }
 +
 +        /**
 +         * On registration, immediately schedule a timeout to kill this connection if it does not handshake promptly,
 +         * and authenticate the remote address.
 +         */
 +        public void handlerAdded(ChannelHandlerContext ctx) throws Exception
 +        {
 +            handshakeTimeout = ctx.executor().schedule(() -> {
 +                logger.error("Timeout handshaking with {} (on {})", SocketFactory.addressId(initiate.from, (InetSocketAddress) ctx.channel().remoteAddress()), settings.bindAddress);
 +                failHandshake(ctx);
 +            }, HandshakeProtocol.TIMEOUT_MILLIS, MILLISECONDS);
 +
 +            authenticate(ctx.channel().remoteAddress());
 +        }
 +
 +        private void authenticate(SocketAddress socketAddress) throws IOException
 +        {
 +            if (socketAddress.getClass().getSimpleName().equals("EmbeddedSocketAddress"))
 +                return;
 +
 +            if (!(socketAddress instanceof InetSocketAddress))
 +                throw new IOException(String.format("Unexpected SocketAddress type: %s, %s", socketAddress.getClass(), socketAddress));
 +
 +            InetSocketAddress addr = (InetSocketAddress)socketAddress;
 +            if (!settings.authenticate(addr.getAddress(), addr.getPort()))
 +                throw new IOException("Authentication failure for inbound connection from peer " + addr);
 +        }
 +
 +        @Override
 +        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
 +        {
 +            if (initiate == null) initiate(ctx, in);
 +            else if (initiate.acceptVersions == null && confirmOutboundPre40 == null) confirmPre40(ctx, in);
 +            else throw new IllegalStateException("Should no longer be on pipeline");
 +        }
 +
 +        void initiate(ChannelHandlerContext ctx, ByteBuf in) throws IOException
 +        {
 +            initiate = HandshakeProtocol.Initiate.maybeDecode(in);
 +            if (initiate == null)
 +                return;
 +
 +            logger.trace("Received handshake initiation message from peer {}, message = {}", ctx.channel().remoteAddress(), initiate);
++
++            if (isEncryptionRequired(initiate.from) && !isChannelEncrypted(ctx))
++            {
++                logger.warn("peer {} attempted to establish an unencrypted connection (broadcast address {})",
++                            ctx.channel().remoteAddress(), initiate.from);
++                failHandshake(ctx);
++            }
++
 +            if (initiate.acceptVersions != null)
 +            {
 +                logger.trace("Connection version {} (min {}) from {}", initiate.acceptVersions.max, initiate.acceptVersions.min, initiate.from);
 +
 +                final AcceptVersions accept;
 +
 +                if (initiate.type.isStreaming())
 +                    accept = settings.acceptStreaming;
 +                else
 +                    accept = settings.acceptMessaging;
 +
 +                int useMessagingVersion = max(accept.min, min(accept.max, initiate.acceptVersions.max));
 +                ByteBuf flush = new HandshakeProtocol.Accept(useMessagingVersion, accept.max).encode(ctx.alloc());
 +
 +                AsyncChannelPromise.writeAndFlush(ctx, flush, (ChannelFutureListener) future -> {
 +                    if (!future.isSuccess())
 +                        exceptionCaught(future.channel(), future.cause());
 +                });
 +
 +                if (initiate.acceptVersions.min > accept.max)
 +                {
 +                    logger.info("peer {} only supports messaging versions higher ({}) than this node supports ({})", ctx.channel().remoteAddress(), initiate.acceptVersions.min, current_version);
 +                    failHandshake(ctx);
 +                }
 +                else if (initiate.acceptVersions.max < accept.min)
 +                {
 +                    logger.info("peer {} only supports messaging versions lower ({}) than this node supports ({})", ctx.channel().remoteAddress(), initiate.acceptVersions.max, minimum_version);
 +                    failHandshake(ctx);
 +                }
 +                else
 +                {
 +                    if (initiate.type.isStreaming())
 +                        setupStreamingPipeline(initiate.from, ctx);
 +                    else
 +                        setupMessagingPipeline(initiate.from, useMessagingVersion, initiate.acceptVersions.max, ctx.pipeline());
 +                }
 +            }
 +            else
 +            {
 +                int version = initiate.requestMessagingVersion;
 +                assert version < VERSION_40 && version >= settings.acceptMessaging.min;
 +                logger.trace("Connection version {} from {}", version, ctx.channel().remoteAddress());
 +
 +                if (initiate.type.isStreaming())
 +                {
 +                    // streaming connections are per-session and have a fixed version.  we can't do anything with a wrong-version stream connection, so drop it.
 +                    if (version != settings.acceptStreaming.max)
 +                    {
 +                        logger.warn("Received stream using protocol version {} (my version {}). Terminating connection", version, settings.acceptStreaming.max);
 +                        failHandshake(ctx);
 +                    }
 +                    setupStreamingPipeline(initiate.from, ctx);
 +                }
 +                else
 +                {
 +                    // if this version is < the MS version the other node is trying
 +                    // to connect with, the other node will disconnect
 +                    ByteBuf response = HandshakeProtocol.Accept.respondPre40(settings.acceptMessaging.max, ctx.alloc());
 +                    AsyncChannelPromise.writeAndFlush(ctx, response,
 +                          (ChannelFutureListener) future -> {
 +                               if (!future.isSuccess())
 +                                   exceptionCaught(future.channel(), future.cause());
 +                    });
 +
 +                    if (version < VERSION_30)
 +                        throw new IOException(String.format("Unable to read obsolete message version %s from %s; The earliest version supported is 3.0.0", version, ctx.channel().remoteAddress()));
 +
 +                    // we don't setup the messaging pipeline here, as the legacy messaging handshake requires one more message to finish
 +                }
 +            }
 +        }
 +
++        /**
++         * Decides whether a connection from {@code peer} has to be encrypted: the encryption settings must
++         * not be explicitly optional, and they must call for encryption towards that peer.
++         */
++        private boolean isEncryptionRequired(InetAddressAndPort peer)
++        {
++            if (settings.encryption.isExplicitlyOptional())
++                return false;
++
++            return settings.encryption.shouldEncrypt(peer);
++        }
++
++        /**
++         * Reports whether the pipeline of the given context currently contains an {@link SslHandler}.
++         */
++        private boolean isChannelEncrypted(ChannelHandlerContext ctx)
++        {
++            SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
++            return sslHandler != null;
++        }
++
 +        /**
 +         * Processes the third (and last) message of the internode messaging handshake used by pre-4.0 nodes,
 +         * which tells us the address and the maximum messaging version the peer wants to use.
 +         * Nothing further happens when {@code maybeDecode} returns null for the supplied buffer.
 +         */
 +        @VisibleForTesting
 +        void confirmPre40(ChannelHandlerContext ctx, ByteBuf in)
 +        {
 +            confirmOutboundPre40 = HandshakeProtocol.ConfirmOutboundPre40.maybeDecode(in);
 +            if (confirmOutboundPre40 != null)
 +            {
 +                logger.trace("Received third handshake message from peer {}, message = {}", ctx.channel().remoteAddress(), confirmOutboundPre40);
 +                setupMessagingPipeline(confirmOutboundPre40.from,
 +                                       initiate.requestMessagingVersion,
 +                                       confirmOutboundPre40.maxMessagingVersion,
 +                                       ctx.pipeline());
 +            }
 +        }
 +
 +        /**
 +         * Netty callback for an exception raised in the pipeline; delegates to the channel-based overload.
 +         */
 +        @Override
 +        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
 +        {
 +            Channel channel = ctx.channel();
 +            exceptionCaught(channel, cause);
 +        }
 +
 +        private void exceptionCaught(Channel channel, Throwable cause)
 +        {
 +            logger.error("Failed to properly handshake with peer {}. Closing the channel.", channel.remoteAddress(), cause);
 +            try
 +            {
 +                failHandshake(channel);
 +            }
 +            catch (Throwable t)
 +            {
 +                logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t);
 +            }
 +        }
 +
 +        /**
 +         * Fails the handshake on the channel that the given context belongs to.
 +         */
 +        private void failHandshake(ChannelHandlerContext ctx)
 +        {
 +            Channel channel = ctx.channel();
 +            failHandshake(channel);
 +        }
 +
 +        /**
 +         * Closes the channel and, if a handshake timeout has been set, cancels it.
 +         */
 +        private void failHandshake(Channel channel)
 +        {
 +            channel.close();
 +
 +            if (handshakeTimeout == null)
 +                return;
 +
 +            handshakeTimeout.cancel(true);
 +        }
 +
 +        /**
 +         * Completes the handshake for a streaming connection: cancels the handshake timeout, replaces this
 +         * handler in the pipeline with a {@link StreamingInboundHandler}, and logs the established connection.
 +         *
 +         * @param from the address the peer identified itself with; when null, the remote socket address of
 +         *             the channel is used instead
 +         * @param ctx  the context of this handler
 +         */
 +        private void setupStreamingPipeline(InetAddressAndPort from, ChannelHandlerContext ctx)
 +        {
 +            handshakeTimeout.cancel(true);
 +            assert initiate.framing == Framing.UNPROTECTED;
 +
 +            Channel channel = ctx.channel();
 +            ChannelPipeline pipeline = ctx.pipeline();
 +
 +            // no address supplied by the peer: fall back to the address of the socket it connected from
 +            if (from == null)
 +            {
 +                InetSocketAddress remote = (InetSocketAddress) channel.remoteAddress();
 +                from = InetAddressAndPort.getByAddressOverrideDefaults(remote.getAddress(), remote.getPort());
 +            }
 +
 +            BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
 +            pipeline.replace(this, "streamInbound", new StreamingInboundHandler(from, current_version, null));
 +
 +            InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
 +            InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress();
 +            logger.info("{} streaming connection established, version = {}, framing = {}, encryption = {}",
 +                        SocketFactory.channelId(from,
 +                                                remoteAddress,
 +                                                settings.bindAddress,
 +                                                localAddress,
 +                                                ConnectionType.STREAMING,
 +                                                channel.id().asShortText()),
 +                        current_version,
 +                        initiate.framing,
 +                        SocketFactory.encryptionConnectionSummary(pipeline.channel()));
 +        }
 +
 +        /**
 +         * Completes the handshake for a messaging connection: cancels the handshake timeout, records the
 +         * peer's maximum messaging version, appends a frame decoder chosen from the negotiated framing and
 +         * messaging version followed by the message handler, and finally removes this handler from the pipeline.
 +         *
 +         * @param from                the address the peer is identified with
 +         * @param useMessagingVersion the messaging version to use on this connection
 +         * @param maxMessagingVersion the maximum messaging version reported by the peer
 +         * @param pipeline            the pipeline of the channel being set up
 +         */
 +        @VisibleForTesting
 +        void setupMessagingPipeline(InetAddressAndPort from, int useMessagingVersion, int maxMessagingVersion, ChannelPipeline pipeline)
 +        {
 +            handshakeTimeout.cancel(true);
 +            // record the "true" endpoint, i.e. the one the peer is identified with, as opposed to the socket it connected over
 +            instance().versions.set(from, maxMessagingVersion);
 +
 +            BufferPools.forNetworking().setRecycleWhenFreeForCurrentThread(false);
 +            BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance;
 +            if (initiate.type == ConnectionType.LARGE_MESSAGES)
 +            {
 +                // for large messages, swap the global pool allocator for a local one, to optimise utilisation of chunks
 +                allocator = new LocalBufferPoolAllocator(pipeline.channel().eventLoop());
 +                pipeline.channel().config().setAllocator(allocator);
 +            }
 +
 +            boolean isPre40 = useMessagingVersion < VERSION_40;
 +            FrameDecoder frameDecoder;
 +            switch (initiate.framing)
 +            {
 +                case LZ4:
 +                    if (isPre40)
 +                        frameDecoder = new FrameDecoderLegacyLZ4(allocator, useMessagingVersion);
 +                    else
 +                        frameDecoder = FrameDecoderLZ4.fast(allocator);
 +                    break;
 +                case CRC:
 +                    // below 4.0 the same legacy decoder is used as for UNPROTECTED framing
 +                    if (isPre40)
 +                        frameDecoder = new FrameDecoderLegacy(allocator, useMessagingVersion);
 +                    else
 +                        frameDecoder = FrameDecoderCrc.create(allocator);
 +                    break;
 +                case UNPROTECTED:
 +                    if (isPre40)
 +                        frameDecoder = new FrameDecoderLegacy(allocator, useMessagingVersion);
 +                    else
 +                        frameDecoder = new FrameDecoderUnprotected(allocator);
 +                    break;
 +                default:
 +                    throw new AssertionError();
 +            }
 +
 +            frameDecoder.addLastTo(pipeline);
 +
 +            InboundMessageHandlers peerHandlers = settings.handlers.apply(from);
 +            InboundMessageHandler handler =
 +                peerHandlers.createHandler(frameDecoder, initiate.type, pipeline.channel(), useMessagingVersion);
 +
 +            logger.info("{} messaging connection established, version = {}, framing = {}, encryption = {}",
 +                        handler.id(true),
 +                        useMessagingVersion,
 +                        initiate.framing,
 +                        SocketFactory.encryptionConnectionSummary(pipeline.channel()));
 +
 +            pipeline.addLast("deserialize", handler);
 +
 +            pipeline.remove(this);
 +        }
 +    }
 +
 +    /**
 +     * Builds the server-side {@link SslHandler} for an inbound channel from the given encryption options.
 +     *
 +     * @param description       label used only in the trace log line
 +     * @param channel           the inbound channel the handler is created for
 +     * @param encryptionOptions the server encryption options to build the SSL context from
 +     * @return the newly created handler
 +     */
 +    private static SslHandler getSslHandler(String description, Channel channel, EncryptionOptions.ServerEncryptionOptions encryptionOptions) throws IOException
 +    {
 +        SslContext sslContext = SSLFactory.getOrCreateSslContext(encryptionOptions, /* buildTrustStore */ true, SSLFactory.SocketType.SERVER);
 +
 +        // the remote address is only passed on when endpoint verification is required
 +        InetSocketAddress peer = null;
 +        if (encryptionOptions.require_endpoint_verification)
 +            peer = (InetSocketAddress) channel.remoteAddress();
 +
 +        SslHandler sslHandler = newSslHandler(channel, sslContext, peer);
 +        logger.trace("{} inbound netty SslContext: context={}, engine={}",
 +                     description,
 +                     sslContext.getClass().getName(),
 +                     sslHandler.engine().getClass().getName());
 +        return sslHandler;
 +    }
 +
 +    /**
 +     * Inspects the first bytes received on a connection to find out whether the peer is using TLS, then
 +     * either replaces itself with an {@link SslHandler} or removes itself from the pipeline.
 +     */
 +    private static class OptionalSslHandler extends ByteToMessageDecoder
 +    {
 +        private final EncryptionOptions.ServerEncryptionOptions encryptionOptions;
 +
 +        OptionalSslHandler(EncryptionOptions.ServerEncryptionOptions encryptionOptions)
 +        {
 +            this.encryptionOptions = encryptionOptions;
 +        }
 +
 +        @Override
 +        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
 +        {
 +            // To detect if SSL must be used we need to have at least 5 bytes, so return here and try again
 +            // once more bytes are ready.
 +            if (in.readableBytes() < 5)
 +                return;
 +
 +            if (!SslHandler.isEncrypted(in))
 +            {
 +                // Connection uses no TLS/SSL encryption, just remove the detection handler and continue without
 +                // SslHandler in the pipeline.
 +                ctx.pipeline().remove(this);
 +                return;
 +            }
 +
 +            // Connection uses SSL/TLS, replace the detection handler with a SslHandler and so use encryption.
 +            SslHandler sslHandler = getSslHandler("replacing optional", ctx.channel(), encryptionOptions);
 +            ctx.pipeline().replace(this, "ssl", sslHandler);
 +        }
 +    }
 +
 +    /**
 +     * Inspects the first bytes received on a connection and closes it if the peer is attempting TLS;
 +     * otherwise removes itself from the pipeline so the connection continues without an {@link SslHandler}.
 +     */
 +    private static class RejectSslHandler extends ByteToMessageDecoder
 +    {
 +        @Override
 +        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
 +        {
 +            if (in.readableBytes() < 5)
 +            {
 +                // To detect if SSL must be used we need to have at least 5 bytes, so return here and try again
 +                // once more bytes are ready.
 +                return;
 +            }
 +
 +            if (SslHandler.isEncrypted(in))
 +            {
 +                logger.info("Rejected incoming TLS connection before negotiating from {} to {}. TLS is explicitly disabled by configuration.",
 +                            ctx.channel().remoteAddress(), ctx.channel().localAddress());
 +                // Discard the readable bytes so decode is not called again. skipBytes only advances the reader
 +                // index, whereas readBytes(int) would copy the data into a newly allocated buffer that nothing
 +                // here would ever release.
 +                in.skipBytes(in.readableBytes());
 +                ctx.close();
 +            }
 +            else
 +            {
 +                // Incoming connection did not attempt TLS/SSL encryption, just remove the detection handler and continue without
 +                // SslHandler in the pipeline.
 +                ctx.pipeline().remove(this);
 +            }
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/net/InboundMessageHandlers.java
index 4466466,0000000..a706557
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/InboundMessageHandlers.java
+++ b/src/java/org/apache/cassandra/net/InboundMessageHandlers.java
@@@ -1,447 -1,0 +1,455 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.util.Collection;
 +import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 +import java.util.function.Consumer;
 +import java.util.function.ToLongFunction;
 +
++import com.google.common.annotations.VisibleForTesting;
++
 +import io.netty.channel.Channel;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.metrics.InternodeInboundMetrics;
 +import org.apache.cassandra.net.Message.Header;
 +
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
 +import static org.apache.cassandra.utils.MonotonicClock.approxTime;
 +
 +/**
 + * An aggregation of {@link InboundMessageHandler}s for all connections from a peer.
 + *
 + * Manages metrics and shared resource limits. Can have multiple connections of a single
 + * type open simultaneously (legacy in particular).
 + */
 +public final class InboundMessageHandlers
 +{
 +    private final InetAddressAndPort self;
 +    private final InetAddressAndPort peer;
 +
 +    private final int queueCapacity;
 +    private final ResourceLimits.Limit endpointReserveCapacity;
 +    private final ResourceLimits.Limit globalReserveCapacity;
 +
 +    private final InboundMessageHandler.WaitQueue endpointWaitQueue;
 +    private final InboundMessageHandler.WaitQueue globalWaitQueue;
 +
 +    private final InboundCounters urgentCounters = new InboundCounters();
 +    private final InboundCounters smallCounters  = new InboundCounters();
 +    private final InboundCounters largeCounters  = new InboundCounters();
 +    private final InboundCounters legacyCounters = new InboundCounters();
 +
 +    private final InboundMessageCallbacks urgentCallbacks;
 +    private final InboundMessageCallbacks smallCallbacks;
 +    private final InboundMessageCallbacks largeCallbacks;
 +    private final InboundMessageCallbacks legacyCallbacks;
 +
 +    private final InternodeInboundMetrics metrics;
 +    private final MessageConsumer messageConsumer;
 +
 +    private final HandlerProvider handlerProvider;
 +    private final Collection<InboundMessageHandler> handlers = new CopyOnWriteArrayList<>();
 +
 +    static class GlobalResourceLimits
 +    {
 +        final ResourceLimits.Limit reserveCapacity;
 +        final InboundMessageHandler.WaitQueue waitQueue;
 +
 +        GlobalResourceLimits(ResourceLimits.Limit reserveCapacity)
 +        {
 +            this.reserveCapacity = reserveCapacity;
 +            this.waitQueue = InboundMessageHandler.WaitQueue.global(reserveCapacity);
 +        }
 +    }
 +
 +    public interface MessageConsumer extends Consumer<Message<?>>
 +    {
 +        void fail(Message.Header header, Throwable failure);
 +    }
 +
 +    public interface GlobalMetricCallbacks
 +    {
 +        LatencyConsumer internodeLatencyRecorder(InetAddressAndPort to);
 +        void recordInternalLatency(Verb verb, long timeElapsed, TimeUnit timeUnit);
 +        void recordInternodeDroppedMessage(Verb verb, long timeElapsed, TimeUnit timeUnit);
 +    }
 +
 +    public InboundMessageHandlers(InetAddressAndPort self,
 +                                  InetAddressAndPort peer,
 +                                  int queueCapacity,
 +                                  long endpointReserveCapacity,
 +                                  GlobalResourceLimits globalResourceLimits,
 +                                  GlobalMetricCallbacks globalMetricCallbacks,
 +                                  MessageConsumer messageConsumer)
 +    {
 +        this(self, peer, queueCapacity, endpointReserveCapacity, globalResourceLimits, globalMetricCallbacks, messageConsumer, InboundMessageHandler::new);
 +    }
 +
 +    public InboundMessageHandlers(InetAddressAndPort self,
 +                                  InetAddressAndPort peer,
 +                                  int queueCapacity,
 +                                  long endpointReserveCapacity,
 +                                  GlobalResourceLimits globalResourceLimits,
 +                                  GlobalMetricCallbacks globalMetricCallbacks,
 +                                  MessageConsumer messageConsumer,
 +                                  HandlerProvider handlerProvider)
 +    {
 +        this.self = self;
 +        this.peer = peer;
 +
 +        this.queueCapacity = queueCapacity;
 +        this.endpointReserveCapacity = new ResourceLimits.Concurrent(endpointReserveCapacity);
 +        this.globalReserveCapacity = globalResourceLimits.reserveCapacity;
 +        this.endpointWaitQueue = InboundMessageHandler.WaitQueue.endpoint(this.endpointReserveCapacity);
 +        this.globalWaitQueue = globalResourceLimits.waitQueue;
 +        this.messageConsumer = messageConsumer;
 +
 +        this.handlerProvider = handlerProvider;
 +
 +        urgentCallbacks = makeMessageCallbacks(peer, urgentCounters, globalMetricCallbacks, messageConsumer);
 +        smallCallbacks  = makeMessageCallbacks(peer, smallCounters,  globalMetricCallbacks, messageConsumer);
 +        largeCallbacks  = makeMessageCallbacks(peer, largeCounters,  globalMetricCallbacks, messageConsumer);
 +        legacyCallbacks = makeMessageCallbacks(peer, legacyCounters, globalMetricCallbacks, messageConsumer);
 +
 +        metrics = new InternodeInboundMetrics(peer, this);
 +    }
 +
 +    /**
 +     * Creates a handler for a new inbound connection from this peer through the configured
 +     * {@link HandlerProvider}, wiring in the shared limits, wait queues and callbacks, and registers it
 +     * in {@code handlers} before returning it.
 +     */
 +    InboundMessageHandler createHandler(FrameDecoder frameDecoder, ConnectionType type, Channel channel, int version)
 +    {
 +        InboundMessageHandler created =
 +            handlerProvider.provide(frameDecoder,
 +
 +                                    // connection identity
 +                                    type, channel, self, peer, version,
 +                                    OutboundConnections.LARGE_MESSAGE_THRESHOLD,
 +
 +                                    // shared resource limits
 +                                    queueCapacity,
 +                                    endpointReserveCapacity, globalReserveCapacity,
 +                                    endpointWaitQueue, globalWaitQueue,
 +
 +                                    // callbacks
 +                                    this::onHandlerClosed,
 +                                    callbacksFor(type),
 +                                    messageConsumer);
 +
 +        handlers.add(created);
 +        return created;
 +    }
 +
 +    void releaseMetrics()
 +    {
 +        metrics.release();
 +    }
 +
 +    /**
 +     * Invoked when a handler is closed: drops it from {@code handlers} and folds its counters into the
 +     * archived totals.
 +     */
 +    private void onHandlerClosed(AbstractMessageHandler handler)
 +    {
 +        assert handler instanceof InboundMessageHandler;
 +        InboundMessageHandler closed = (InboundMessageHandler) handler;
 +
 +        handlers.remove(closed);
 +        absorbCounters(closed);
 +    }
 +
++    @VisibleForTesting
++    public int count()
++    {
++        return handlers.size();
++    }
++
 +    /*
 +     * Message callbacks
 +     */
 +
 +    /**
 +     * Picks the callbacks instance for the given connection type.
 +     *
 +     * @throws IllegalArgumentException for any type other than the four messaging types
 +     */
 +    private InboundMessageCallbacks callbacksFor(ConnectionType type)
 +    {
 +        switch (type)
 +        {
 +            case URGENT_MESSAGES:
 +                return urgentCallbacks;
 +            case SMALL_MESSAGES:
 +                return smallCallbacks;
 +            case LARGE_MESSAGES:
 +                return largeCallbacks;
 +            case LEGACY_MESSAGES:
 +                return legacyCallbacks;
 +            default:
 +                throw new IllegalArgumentException();
 +        }
 +    }
 +
 +    private static InboundMessageCallbacks makeMessageCallbacks(InetAddressAndPort peer, InboundCounters counters, GlobalMetricCallbacks globalMetrics, MessageConsumer messageConsumer)
 +    {
 +        LatencyConsumer internodeLatency = globalMetrics.internodeLatencyRecorder(peer);
 +
 +        return new InboundMessageCallbacks()
 +        {
 +            @Override
 +            public void onHeaderArrived(int messageSize, Header header, long timeElapsed, TimeUnit unit)
 +            {
 +                // do not log latency if we are within error bars of zero
 +                if (timeElapsed > unit.convert(approxTime.error(), NANOSECONDS))
 +                    internodeLatency.accept(timeElapsed, unit);
 +            }
 +
 +            @Override
 +            public void onArrived(int messageSize, Header header, long timeElapsed, TimeUnit unit)
 +            {
 +            }
 +
 +            @Override
 +            public void onArrivedExpired(int messageSize, Header header, boolean wasCorrupt, long timeElapsed, TimeUnit unit)
 +            {
 +                counters.addExpired(messageSize);
 +
 +                globalMetrics.recordInternodeDroppedMessage(header.verb, timeElapsed, unit);
 +            }
 +
 +            @Override
 +            public void onArrivedCorrupt(int messageSize, Header header, long timeElapsed, TimeUnit unit)
 +            {
 +                counters.addError(messageSize);
 +
 +                messageConsumer.fail(header, new Crc.InvalidCrc(0, 0)); // could use one of the original exceptions?
 +            }
 +
 +            @Override
 +            public void onClosedBeforeArrival(int messageSize, Header header, int bytesReceived, boolean wasCorrupt, boolean wasExpired)
 +            {
 +                counters.addError(messageSize);
 +
 +                messageConsumer.fail(header, new InvalidSerializedSizeException(header.verb, messageSize, bytesReceived));
 +            }
 +
 +            @Override
 +            public void onExpired(int messageSize, Header header, long timeElapsed, TimeUnit unit)
 +            {
 +                counters.addExpired(messageSize);
 +
 +                globalMetrics.recordInternodeDroppedMessage(header.verb, timeElapsed, unit);
 +            }
 +
 +            @Override
 +            public void onFailedDeserialize(int messageSize, Header header, Throwable t)
 +            {
 +                counters.addError(messageSize);
 +
 +                /*
 +                 * If an exception is caught during deser, return a failure response immediately
 +                 * instead of waiting for the callback on the other end to expire.
 +                 */
 +                messageConsumer.fail(header, t);
 +            }
 +
 +            @Override
 +            public void onDispatched(int messageSize, Header header)
 +            {
 +                counters.addPending(messageSize);
 +            }
 +
 +            @Override
 +            public void onExecuting(int messageSize, Header header, long timeElapsed, TimeUnit unit)
 +            {
 +                globalMetrics.recordInternalLatency(header.verb, timeElapsed, unit);
 +            }
 +
 +            @Override
 +            public void onExecuted(int messageSize, Header header, long timeElapsed, TimeUnit unit)
 +            {
 +                counters.removePending(messageSize);
 +            }
 +
 +            @Override
 +            public void onProcessed(int messageSize, Header header)
 +            {
 +                counters.addProcessed(messageSize);
 +            }
 +        };
 +    }
 +
 +    /*
 +     * Aggregated counters
 +     */
 +
 +    /**
 +     * Picks the counters instance for the given connection type.
 +     *
 +     * @throws IllegalArgumentException for any type other than the four messaging types
 +     */
 +    InboundCounters countersFor(ConnectionType type)
 +    {
 +        switch (type)
 +        {
 +            case URGENT_MESSAGES:
 +                return urgentCounters;
 +            case SMALL_MESSAGES:
 +                return smallCounters;
 +            case LARGE_MESSAGES:
 +                return largeCounters;
 +            case LEGACY_MESSAGES:
 +                return legacyCounters;
 +            default:
 +                throw new IllegalArgumentException();
 +        }
 +    }
 +
 +    public long receivedCount()
 +    {
 +        return sumHandlers(h -> h.receivedCount) + closedReceivedCount;
 +    }
 +
 +    public long receivedBytes()
 +    {
 +        return sumHandlers(h -> h.receivedBytes) + closedReceivedBytes;
 +    }
 +
 +    public long throttledCount()
 +    {
 +        return sumHandlers(h -> h.throttledCount) + closedThrottledCount;
 +    }
 +
 +    public long throttledNanos()
 +    {
 +        return sumHandlers(h -> h.throttledNanos) + closedThrottledNanos;
 +    }
 +
 +    public long usingCapacity()
 +    {
 +        return sumHandlers(h -> h.queueSize);
 +    }
 +
 +    public long usingEndpointReserveCapacity()
 +    {
 +        return endpointReserveCapacity.using();
 +    }
 +
 +    public long corruptFramesRecovered()
 +    {
 +        return sumHandlers(h -> h.corruptFramesRecovered) + closedCorruptFramesRecovered;
 +    }
 +
 +    public long corruptFramesUnrecovered()
 +    {
 +        return sumHandlers(h -> h.corruptFramesUnrecovered) + closedCorruptFramesUnrecovered;
 +    }
 +
 +    public long errorCount()
 +    {
 +        return sumCounters(InboundCounters::errorCount);
 +    }
 +
 +    public long errorBytes()
 +    {
 +        return sumCounters(InboundCounters::errorBytes);
 +    }
 +
 +    public long expiredCount()
 +    {
 +        return sumCounters(InboundCounters::expiredCount);
 +    }
 +
 +    public long expiredBytes()
 +    {
 +        return sumCounters(InboundCounters::expiredBytes);
 +    }
 +
 +    public long processedCount()
 +    {
 +        return sumCounters(InboundCounters::processedCount);
 +    }
 +
 +    public long processedBytes()
 +    {
 +        return sumCounters(InboundCounters::processedBytes);
 +    }
 +
 +    public long scheduledCount()
 +    {
 +        return sumCounters(InboundCounters::scheduledCount);
 +    }
 +
 +    public long scheduledBytes()
 +    {
 +        return sumCounters(InboundCounters::scheduledBytes);
 +    }
 +
 +    /*
 +     * 'Archived' counter values, combined for all connections that have been closed.
 +     */
 +
 +    private volatile long closedReceivedCount, closedReceivedBytes;
 +
 +    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedReceivedCountUpdater =
 +        AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedReceivedCount");
 +    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedReceivedBytesUpdater =
 +        AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedReceivedBytes");
 +
 +    private volatile long closedThrottledCount, closedThrottledNanos;
 +
 +    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedThrottledCountUpdater =
 +        AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedThrottledCount");
 +    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedThrottledNanosUpdater =
 +        AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedThrottledNanos");
 +
 +    private volatile long closedCorruptFramesRecovered, closedCorruptFramesUnrecovered;
 +
 +    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedCorruptFramesRecoveredUpdater =
 +        AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedCorruptFramesRecovered");
 +    private static final AtomicLongFieldUpdater<InboundMessageHandlers> closedCorruptFramesUnrecoveredUpdater =
 +        AtomicLongFieldUpdater.newUpdater(InboundMessageHandlers.class, "closedCorruptFramesUnrecovered");
 +
 +    private void absorbCounters(InboundMessageHandler handler)
 +    {
 +        closedReceivedCountUpdater.addAndGet(this, handler.receivedCount);
 +        closedReceivedBytesUpdater.addAndGet(this, handler.receivedBytes);
 +
 +        closedThrottledCountUpdater.addAndGet(this, handler.throttledCount);
 +        closedThrottledNanosUpdater.addAndGet(this, handler.throttledNanos);
 +
 +        closedCorruptFramesRecoveredUpdater.addAndGet(this, handler.corruptFramesRecovered);
 +        closedCorruptFramesUnrecoveredUpdater.addAndGet(this, handler.corruptFramesUnrecovered);
 +    }
 +
 +    /**
 +     * Applies {@code counter} to every handler currently in {@code handlers} and returns the total.
 +     */
 +    private long sumHandlers(ToLongFunction<InboundMessageHandler> counter)
 +    {
 +        return handlers.stream()
 +                       .mapToLong(counter)
 +                       .sum();
 +    }
 +
 +    /**
 +     * Applies {@code mapping} to the urgent, small, large and legacy counters and returns the total.
 +     */
 +    private long sumCounters(ToLongFunction<InboundCounters> mapping)
 +    {
 +        long total = mapping.applyAsLong(urgentCounters);
 +        total += mapping.applyAsLong(smallCounters);
 +        total += mapping.applyAsLong(largeCounters);
 +        total += mapping.applyAsLong(legacyCounters);
 +        return total;
 +    }
 +
 +    interface HandlerProvider
 +    {
 +        InboundMessageHandler provide(FrameDecoder decoder,
 +
 +                                      ConnectionType type,
 +                                      Channel channel,
 +                                      InetAddressAndPort self,
 +                                      InetAddressAndPort peer,
 +                                      int version,
 +                                      int largeMessageThreshold,
 +
 +                                      int queueCapacity,
 +                                      ResourceLimits.Limit endpointReserveCapacity,
 +                                      ResourceLimits.Limit globalReserveCapacity,
 +                                      InboundMessageHandler.WaitQueue endpointWaitQueue,
 +                                      InboundMessageHandler.WaitQueue globalWaitQueue,
 +
 +                                      InboundMessageHandler.OnHandlerClosed onClosed,
 +                                      InboundMessageCallbacks callbacks,
 +                                      Consumer<Message<?>> consumer);
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java
index 0000000,5c0e3b3..86a7c99
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/InternodeEncryptionEnforcementTest.java
@@@ -1,0 -1,137 +1,134 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.util.HashMap;
 -import java.util.List;
+ 
+ import com.google.common.collect.ImmutableMap;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.Feature;
+ import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
+ import org.apache.cassandra.distributed.shared.NetworkTopology;
++import org.apache.cassandra.net.InboundMessageHandlers;
+ import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.OutboundConnections;
+ 
+ import static com.google.common.collect.Iterables.getOnlyElement;
+ import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ public final class InternodeEncryptionEnforcementTest extends TestBaseImpl
+ {
+     @Test
+     public void testConnectionsAreRejectedWithInvalidConfig() throws Throwable
+     {
+         Cluster.Builder builder = builder()
+             .withNodes(2)
+             .withConfig(c ->
+             {
+                 c.with(Feature.NETWORK);
+                 c.with(Feature.NATIVE_PROTOCOL);
+ 
+                 if (c.num() == 1)
+                 {
+                     HashMap<String, Object> encryption = new HashMap<>();
+                     encryption.put("keystore", "test/conf/cassandra_ssl_test.keystore");
+                     encryption.put("keystore_password", "cassandra");
+                     encryption.put("truststore", "test/conf/cassandra_ssl_test.truststore");
+                     encryption.put("truststore_password", "cassandra");
+                     encryption.put("internode_encryption", "dc");
+                     c.set("server_encryption_options", encryption);
+                 }
+             })
+             .withNodeIdTopology(ImmutableMap.of(1, NetworkTopology.dcAndRack("dc1", "r1a"),
+                                                 2, NetworkTopology.dcAndRack("dc2", "r2a")));
+ 
+         try (Cluster cluster = builder.start())
+         {
+             /*
+              * instance (1) won't connect to (2), since (2) won't have a TLS listener;
+              * instance (2) won't connect to (1), since inbound check will reject
+              * the unencrypted connection attempt;
+              *
+              * without the patch, instance (2) *CAN* connect to (1), without encryption,
+              * despite being in a different dc.
+              */
+ 
+             cluster.get(1).runOnInstance(() ->
+             {
 -                List<MessagingService.SocketThread> threads = MessagingService.instance().getSocketThreads();
 -                assertEquals(2, threads.size());
++                InboundMessageHandlers inbound = getOnlyElement(MessagingService.instance().messageHandlers.values());
++                assertEquals(0, inbound.count());
+ 
 -                for (MessagingService.SocketThread thread : threads)
 -                {
 -                    assertEquals(0, thread.connections.size());
 -                }
++                OutboundConnections outbound = getOnlyElement(MessagingService.instance().channelManagers.values());
++                assertFalse(outbound.small.isConnected() || outbound.large.isConnected() || outbound.urgent.isConnected());
+             });
+ 
+             cluster.get(2).runOnInstance(() ->
+             {
 -                List<MessagingService.SocketThread> threads = MessagingService.instance().getSocketThreads();
 -                assertEquals(1, threads.size());
 -                assertTrue(getOnlyElement(threads).connections.isEmpty());
++                assertTrue(MessagingService.instance().messageHandlers.isEmpty());
++
++                OutboundConnections outbound = getOnlyElement(MessagingService.instance().channelManagers.values());
++                assertFalse(outbound.small.isConnected() || outbound.large.isConnected() || outbound.urgent.isConnected());
+             });
+         }
+     }
+ 
+     @Test
+     public void testConnectionsAreAcceptedWithValidConfig() throws Throwable
+     {
+         Cluster.Builder builder = builder()
+             .withNodes(2)
+             .withConfig(c ->
+             {
+                 c.with(Feature.NETWORK);
+                 c.with(Feature.NATIVE_PROTOCOL);
+ 
 -                HashMap<String, Object> encryption = new HashMap<>();
 -                encryption.put("keystore", "test/conf/cassandra_ssl_test.keystore");
++                HashMap<String, Object> encryption = new HashMap<>(); encryption.put("keystore", "test/conf/cassandra_ssl_test.keystore");
+                 encryption.put("keystore_password", "cassandra");
+                 encryption.put("truststore", "test/conf/cassandra_ssl_test.truststore");
+                 encryption.put("truststore_password", "cassandra");
+                 encryption.put("internode_encryption", "dc");
+                 c.set("server_encryption_options", encryption);
+             })
+             .withNodeIdTopology(ImmutableMap.of(1, NetworkTopology.dcAndRack("dc1", "r1a"),
+                                                 2, NetworkTopology.dcAndRack("dc2", "r2a")));
+ 
+         try (Cluster cluster = builder.start())
+         {
+             /*
+              * instance (1) should connect to instance (2) without any issues;
+              * instance (2) should connect to instance (1) without any issues.
+              */
+ 
+             SerializableRunnable runnable = () ->
+             {
 -                List<MessagingService.SocketThread> threads = MessagingService.instance().getSocketThreads();
 -                assertEquals(2, threads.size());
 -
 -                MessagingService.SocketThread sslThread = threads.get(0);
 -                assertEquals(1, sslThread.connections.size());
++                InboundMessageHandlers inbound = getOnlyElement(MessagingService.instance().messageHandlers.values());
++                assertTrue(inbound.count() > 0);
+ 
 -                MessagingService.SocketThread plainThread = threads.get(1);
 -                assertEquals(0, plainThread.connections.size());
++                OutboundConnections outbound = getOnlyElement(MessagingService.instance().channelManagers.values());
++                assertTrue(outbound.small.isConnected() || outbound.large.isConnected() || outbound.urgent.isConnected());
+             };
+ 
+             cluster.get(1).runOnInstance(runnable);
+             cluster.get(2).runOnInstance(runnable);
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org