You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/08/22 20:55:15 UTC

[08/11] cassandra git commit: switch internode messaging to netty

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java
new file mode 100644
index 0000000..13d8810
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -0,0 +1,375 @@
+package org.apache.cassandra.net.async;
+
+import java.net.InetSocketAddress;
+import java.util.zip.Checksum;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.compression.Lz4FrameDecoder;
+import io.netty.handler.codec.compression.Lz4FrameEncoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import io.netty.util.internal.logging.Slf4JLoggerFactory;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.service.NativeTransportService;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.CoalescingStrategies;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A factory for building Netty {@link Channel}s. Channels here are setup with a pipeline to participate
+ * in the internode protocol handshake, either the inbound or outbound side as per the method invoked.
+ */
+public final class NettyFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(NettyFactory.class);
+
+    /**
+     * The block size for use with netty's lz4 code.
+     */
+    private static final int COMPRESSION_BLOCK_SIZE = 1 << 16;
+
+    /** Seed handed to the streaming xxhash32 instance created in {@link #checksumForFrameEncoders(int)}. */
+    private static final int LZ4_HASH_SEED = 0x9747b28c;
+
+    /** The two connection modes this package distinguishes; carried in {@link OutboundConnectionParams#mode}. */
+    public enum Mode { MESSAGING, STREAMING }
+
+    // names under which handlers are registered in a channel's pipeline. the two compressor names are not used in
+    // this class; they are package-visible, presumably for the handlers that install compression (TODO confirm)
+    private static final String SSL_CHANNEL_HANDLER_NAME = "ssl";
+    static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor";
+    static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = "outboundCompressor";
+    private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+
+    /** a useful addition for debugging; simply set to true to get more data in your logs */
+    private static final boolean WIRETRACE = false;
+    static
+    {
+        // when wire tracing is switched on, make slf4j the default factory for netty's internal loggers
+        if (WIRETRACE)
+            InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
+    }
+
+    /** Whether the shared {@link #instance} uses epoll, as reported by {@link NativeTransportService#useEpoll()}. */
+    private static final boolean DEFAULT_USE_EPOLL = NativeTransportService.useEpoll();
+    static
+    {
+        // surface the reason once at class-load time; the nio transport classes are used when this flag is false
+        if (!DEFAULT_USE_EPOLL)
+            logger.warn("epoll not available {}", Epoll.unavailabilityCause());
+    }
+
+    /**
+     * The size of the receive queue for the outbound channels. As outbound channels do not receive data
+     * (outside of the internode messaging protocol's handshake), this value can be relatively small.
+     * Applied as {@code SO_RCVBUF} in {@link #createOutboundBootstrap(OutboundConnectionParams)}; {@code 1 << 10} is 1024 bytes.
+     */
+    private static final int OUTBOUND_CHANNEL_RECEIVE_BUFFER_SIZE = 1 << 10;
+
+    /**
+     * The size of the send queue for the inbound channels. As inbound channels do not send data
+     * (outside of the internode messaging protocol's handshake), this value can be relatively small.
+     * Applied as {@code SO_SNDBUF} on accepted channels in {@code createInboundChannel}; {@code 1 << 10} is 1024 bytes.
+     */
+    private static final int INBOUND_CHANNEL_SEND_BUFFER_SIZE = 1 << 10;
+
+    /**
+     * A factory instance that all normal, runtime code should use. Separate instances should only be used for testing.
+     * Built with {@link #DEFAULT_USE_EPOLL}.
+     */
+    public static final NettyFactory instance = new NettyFactory(DEFAULT_USE_EPOLL);
+
+    /** Whether this factory uses the epoll (true) or nio (false) flavour of event loops and channel classes. */
+    private final boolean useEpoll;
+
+    /** Event loop group handed to the server bootstrap as its parent (accepting) group. */
+    private final EventLoopGroup acceptGroup;
+
+    /** Event loop group handed to the server bootstrap as its child group, i.e. for accepted channels. */
+    private final EventLoopGroup inboundGroup;
+
+    /** Event loop group used by the outbound {@link Bootstrap}. */
+    private final EventLoopGroup outboundGroup;
+
+    /**
+     * Builds a factory with its own three event loop groups. Exists so that tests can choose the value of
+     * {@link NettyFactory#useEpoll}; everything else should go through the shared {@link #instance}.
+     */
+    @VisibleForTesting
+    NettyFactory(boolean useEpoll)
+    {
+        this.useEpoll = useEpoll;
+
+        // the acceptor pool is sized from the number of listen sockets implied by the encryption setting
+        InternodeEncryption encryption = DatabaseDescriptor.getServerEncryptionOptions().internode_encryption;
+        int acceptorThreads = determineAcceptGroupSize(encryption);
+        acceptGroup = getEventLoopGroup(useEpoll, acceptorThreads, "MessagingService-NettyAcceptor-Threads", false);
+
+        // both data pools get one thread per available processor; only the outbound pool asks for the boosted io ratio
+        inboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyInbound-Threads", false);
+        outboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyOutbound-Threads", true);
+    }
+
+    /**
+     * Determine the number of accept threads we need, which is based upon the number of listening sockets we will have.
+     * We'll have either 1 or 2 listen sockets, depending on if we use SSL or not in combination with non-SSL. If we have both,
+     * we'll have two sockets, and thus need two threads; else one socket and one thread.
+     *
+     * If the operator has configured multiple IP addresses (both {@link org.apache.cassandra.config.Config#broadcast_address}
+     * and {@link org.apache.cassandra.config.Config#listen_address} are configured), then we listen on another set of sockets
+     * - basically doubling the count. See CASSANDRA-9748 for more details.
+     */
+    static int determineAcceptGroupSize(InternodeEncryption internode_encryption)
+    {
+        // 'dc' and 'rack' are the settings that need two listen sockets; every other setting needs one
+        boolean twoSockets = internode_encryption == InternodeEncryption.dc
+                             || internode_encryption == InternodeEncryption.rack;
+        int socketsPerAddress = twoSockets ? 2 : 1;
+
+        // listening on the broadcast address as well doubles the number of sockets
+        return MessagingService.shouldListenOnBroadcastAddress() ? socketsPerAddress * 2 : socketsPerAddress;
+    }
+
+    /**
+     * Create an {@link EventLoopGroup}, for epoll or nio. The {@code boostIoRatio} flag passes a hint to the netty
+     * event loop threads to optimize comsuming all the tasks from the netty channel before checking for IO activity.
+     * By default, netty will process some maximum number of tasks off it's queue before it will check for activity on
+     * any of the open FDs, which basically amounts to checking for any incoming data. If you have a class of event loops
+     * that that do almost *no* inbound activity (like cassandra's outbound channels), then it behooves us to have the
+     * outbound netty channel consume as many tasks as it can before making the system calls to check up on the FDs,
+     * as we're not expecting any incoming data on those sockets, anyways. Thus, we pass the magic value {@code 100}
+     * to achieve the maximum consuption from the netty queue. (for implementation details, as of netty 4.1.8,
+     * see {@link io.netty.channel.epoll.EpollEventLoop#run()}.
+     */
+    static EventLoopGroup getEventLoopGroup(boolean useEpoll, int threadCount, String threadNamePrefix, boolean boostIoRatio)
+    {
+        // nio is handled first as the early exit; the epoll path below mirrors it with the epoll group type
+        if (!useEpoll)
+        {
+            logger.debug("using netty nio event loop for pool prefix {}", threadNamePrefix);
+            NioEventLoopGroup nioGroup = new NioEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix));
+            if (boostIoRatio)
+                nioGroup.setIoRatio(100);
+            return nioGroup;
+        }
+
+        logger.debug("using netty epoll event loop for pool prefix {}", threadNamePrefix);
+        EpollEventLoopGroup epollGroup = new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix));
+        // setIoRatio is invoked on the concrete group type rather than through the EventLoopGroup interface
+        if (boostIoRatio)
+            epollGroup.setIoRatio(100);
+        return epollGroup;
+    }
+
+    /**
+     * Create a {@link Channel} that listens on the {@code localAddr}. This method will block while trying to bind to the address,
+     * but it does not make a remote call.
+     */
+    public Channel createInboundChannel(InetSocketAddress localAddr, InboundInitializer initializer, int receiveBufferSize) throws ConfigurationException
+    {
+        String nic = FBUtilities.getNetworkInterface(localAddr.getAddress());
+        logger.info("Starting Messaging Service on {} {}, encryption: {}",
+                    localAddr, nic == null ? "" : String.format(" (%s)", nic), encryptionLogStatement(initializer.encryptionOptions));
+        Class<? extends ServerChannel> transport = useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
+        ServerBootstrap bootstrap = new ServerBootstrap().group(acceptGroup, inboundGroup)
+                                                         .channel(transport)
+                                                         .option(ChannelOption.SO_BACKLOG, 500)
+                                                         .childOption(ChannelOption.SO_KEEPALIVE, true)
+                                                         .childOption(ChannelOption.TCP_NODELAY, true)
+                                                         .childOption(ChannelOption.SO_REUSEADDR, true)
+                                                         .childOption(ChannelOption.SO_SNDBUF, INBOUND_CHANNEL_SEND_BUFFER_SIZE)
+                                                         .childHandler(initializer);
+
+        if (receiveBufferSize > 0)
+            bootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBufferSize);
+
+        ChannelFuture channelFuture = bootstrap.bind(localAddr);
+
+        if (!channelFuture.awaitUninterruptibly().isSuccess())
+        {
+            if (channelFuture.channel().isOpen())
+                channelFuture.channel().close();
+
+            Throwable failedChannelCause = channelFuture.cause();
+
+            String causeString = "";
+            if (failedChannelCause != null && failedChannelCause.getMessage() != null)
+                causeString = failedChannelCause.getMessage();
+
+            if (causeString.contains("in use"))
+            {
+                throw new ConfigurationException(localAddr + " is in use by another process.  Change listen_address:storage_port " +
+                                                 "in cassandra.yaml to values that do not conflict with other services");
+            }
+            // looking at the jdk source, solaris/windows bind failue messages both use the phrase "cannot assign requested address".
+            // windows message uses "Cannot" (with a capital 'C'), and solaris (a/k/a *nux) doe not. hence we search for "annot" <sigh>
+            else if (causeString.contains("annot assign requested address"))
+            {
+                throw new ConfigurationException("Unable to bind to address " + localAddr
+                                                 + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
+            }
+            else
+            {
+                throw new ConfigurationException("failed to bind to: " + localAddr, failedChannelCause);
+            }
+        }
+
+        return channelFuture.channel();
+    }
+
+    /**
+     * Pipeline initializer for accepted (inbound) channels: registers each channel in the supplied group, installs
+     * an ssl handler when encryption options were given, and finishes the pipeline with the inbound handshake handler.
+     */
+    public static class InboundInitializer extends ChannelInitializer<SocketChannel>
+    {
+        private final IInternodeAuthenticator authenticator;
+        private final ServerEncryptionOptions encryptionOptions;
+        private final ChannelGroup channelGroup;
+
+        public InboundInitializer(IInternodeAuthenticator authenticator, ServerEncryptionOptions encryptionOptions, ChannelGroup channelGroup)
+        {
+            this.authenticator = authenticator;
+            this.encryptionOptions = encryptionOptions;
+            this.channelGroup = channelGroup;
+        }
+
+        @Override
+        public void initChannel(SocketChannel channel) throws Exception
+        {
+            channelGroup.add(channel);
+            ChannelPipeline pipeline = channel.pipeline();
+
+            // order of handlers: ssl -> logger -> handshakeHandler
+            if (encryptionOptions != null)
+                pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, newSslHandler(channel));
+
+            if (WIRETRACE)
+                pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
+
+            pipeline.addLast(HANDSHAKE_HANDLER_NAME, new InboundHandshakeHandler(authenticator));
+        }
+
+        /** Creates the ssl handler for an accepted channel from the configured encryption options. */
+        private SslHandler newSslHandler(SocketChannel channel) throws Exception
+        {
+            SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true);
+            SslHandler sslHandler = sslContext.newHandler(channel.alloc());
+            logger.trace("creating inbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
+            return sslHandler;
+        }
+    }
+
+    /** Renders the encryption state for log lines: "disabled", or "enabled" plus the ssl provider in use. */
+    private String encryptionLogStatement(ServerEncryptionOptions options)
+    {
+        if (options != null)
+            return "enabled (" + (OpenSsl.isAvailable() ? "openssl" : "jdk") + ')';
+
+        // no options were supplied, so there is no provider to report
+        return "disabled";
+    }
+
+    /**
+     * Prepare, but do not use, the {@link Bootstrap} for a connection to a remote peer. Nothing here attempts to
+     * connect, so this method does <b>not</b> block.
+     *
+     * @param params the settings of the connection: addresses, socket options and handshake parameters
+     * @return a bootstrap with local and remote addresses set, ready for the caller to connect
+     */
+    public Bootstrap createOutboundBootstrap(OutboundConnectionParams params)
+    {
+        InetSocketAddress peer = params.connectionId.connectionAddress();
+
+        // an absent strategy is reported as DISABLED in the log line
+        Object coalescing;
+        if (params.coalescingStrategy.isPresent())
+            coalescing = params.coalescingStrategy.get();
+        else
+            coalescing = CoalescingStrategies.Strategy.DISABLED;
+
+        logger.debug("creating outbound bootstrap to peer {}, compression: {}, encryption: {}, coalesce: {}", peer,
+                     params.compress, encryptionLogStatement(params.encryptionOptions), coalescing);
+
+        Class<? extends Channel> transport = useEpoll ? EpollSocketChannel.class : NioSocketChannel.class;
+
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.group(outboundGroup);
+        bootstrap.channel(transport);
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000);
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+        bootstrap.option(ChannelOption.SO_SNDBUF, params.sendBufferSize);
+        bootstrap.option(ChannelOption.SO_RCVBUF, OUTBOUND_CHANNEL_RECEIVE_BUFFER_SIZE);
+        bootstrap.option(ChannelOption.TCP_NODELAY, params.tcpNoDelay);
+        bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, params.waterMark);
+        bootstrap.handler(new OutboundInitializer(params));
+
+        // local port 0 is passed together with the local address
+        bootstrap.localAddress(params.connectionId.local(), 0);
+        bootstrap.remoteAddress(peer);
+        return bootstrap;
+    }
+
+    /**
+     * Pipeline initializer for outbound channels: installs an ssl handler when encryption options are present
+     * (with endpoint verification if those options require it) and finishes the pipeline with the outbound
+     * handshake handler.
+     */
+    public static class OutboundInitializer extends ChannelInitializer<SocketChannel>
+    {
+        private final OutboundConnectionParams params;
+
+        OutboundInitializer(OutboundConnectionParams params)
+        {
+            this.params = params;
+        }
+
+        @Override
+        public void initChannel(SocketChannel channel) throws Exception
+        {
+            ChannelPipeline pipeline = channel.pipeline();
+
+            // order of handlers: ssl -> logger -> handshakeHandler
+            if (params.encryptionOptions != null)
+            {
+                SslContext sslContext = SSLFactory.getSslContext(params.encryptionOptions, true, false);
+
+                final SslHandler sslHandler;
+                if (params.encryptionOptions.require_endpoint_verification)
+                {
+                    // the handler is given the remote's identifying address (not the connection address) as peer
+                    // host and port, and the engine is told to apply the "HTTPS" endpoint identification algorithm
+                    InetSocketAddress peer = params.connectionId.remoteAddress();
+                    sslHandler = sslContext.newHandler(channel.alloc(), peer.getHostString(), peer.getPort());
+                    SSLEngine engine = sslHandler.engine();
+                    SSLParameters sslParameters = engine.getSSLParameters();
+                    sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
+                    engine.setSSLParameters(sslParameters);
+                }
+                else
+                {
+                    sslHandler = sslContext.newHandler(channel.alloc());
+                }
+
+                logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
+                pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
+            }
+
+            if (NettyFactory.WIRETRACE)
+                pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
+
+            pipeline.addLast(HANDSHAKE_HANDLER_NAME, new OutboundHandshakeHandler(params));
+        }
+    }
+
+    /**
+     * Requests a graceful shutdown of all three event loop groups. The futures returned by
+     * {@code shutdownGracefully()} are discarded, so this method does not wait on them.
+     */
+    public void close()
+    {
+        // acceptors first, then outbound, then inbound
+        for (EventLoopGroup group : new EventLoopGroup[]{ acceptGroup, outboundGroup, inboundGroup })
+            group.shutdownGracefully();
+    }
+
+    /** Builds an lz4 frame encoder whose checksum is chosen from the given protocol version. */
+    static Lz4FrameEncoder createLz4Encoder(int protocolVersion)
+    {
+        LZ4Factory lz4 = LZ4Factory.fastestInstance();
+        Checksum checksum = checksumForFrameEncoders(protocolVersion);
+        return new Lz4FrameEncoder(lz4, false, COMPRESSION_BLOCK_SIZE, checksum);
+    }
+
+    /**
+     * Picks the checksum for the lz4 frame encoder and decoder: CRC32 when the protocol version is at least
+     * {@link MessagingService#current_version}, otherwise a streaming xxhash32 seeded with {@link #LZ4_HASH_SEED}.
+     */
+    private static Checksum checksumForFrameEncoders(int protocolVersion)
+    {
+        boolean useCrc32 = protocolVersion >= MessagingService.current_version;
+        return useCrc32 ? ChecksumType.CRC32.newInstance()
+                        : XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum();
+    }
+
+    /** Builds an lz4 frame decoder whose checksum is chosen from the given protocol version. */
+    static Lz4FrameDecoder createLz4Decoder(int protocolVersion)
+    {
+        LZ4Factory lz4 = LZ4Factory.fastestInstance();
+        Checksum checksum = checksumForFrameEncoders(protocolVersion);
+        return new Lz4FrameDecoder(lz4, checksum);
+    }
+
+    /** Returns a new {@link DefaultEventExecutor}; every call creates a separate executor. */
+    public static EventExecutor executorForChannelGroups()
+    {
+        EventExecutor executor = new DefaultEventExecutor();
+        return executor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
new file mode 100644
index 0000000..24dc5ff
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Identifies an outbound messaging connection.
+ *
+ * This mainly hold the remote address and the type (small/large messages or gossip) of connection used, but with the
+ * additional detail that in some case (typically public EC2 address across regions) the address to which we connect
+ * to the remote is different from the address by which the node is known by the rest of the C*.
+ */
+public class OutboundConnectionIdentifier
+{
+    enum ConnectionType
+    {
+        GOSSIP, LARGE_MESSAGE, SMALL_MESSAGE
+    }
+
+    /**
+     * Memoization of the local node's broadcast address.
+     */
+    private final InetSocketAddress localAddr;
+
+    /**
+     * The address by which the remote is identified. This may be different from {@link #remoteConnectionAddr} for
+     * something like EC2 public IP address which need to be used for communication between EC2 regions.
+     */
+    private final InetSocketAddress remoteAddr;
+
+    /**
+     * The address to which we're connecting to the node (often the same as {@link #remoteAddr} but not always).
+     */
+    private final InetSocketAddress remoteConnectionAddr;
+
+    private final ConnectionType connectionType;
+
+    private OutboundConnectionIdentifier(InetSocketAddress localAddr,
+                                         InetSocketAddress remoteAddr,
+                                         InetSocketAddress remoteConnectionAddr,
+                                         ConnectionType connectionType)
+    {
+        this.localAddr = localAddr;
+        this.remoteAddr = remoteAddr;
+        this.remoteConnectionAddr = remoteConnectionAddr;
+        this.connectionType = connectionType;
+    }
+
+    private OutboundConnectionIdentifier(InetSocketAddress localAddr,
+                                         InetSocketAddress remoteAddr,
+                                         ConnectionType connectionType)
+    {
+        this(localAddr, remoteAddr, remoteAddr, connectionType);
+    }
+
+    /**
+     * Creates an identifier for a small message connection and using the remote "identifying" address as its connection
+     * address.
+     */
+    public static OutboundConnectionIdentifier small(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+    {
+        return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.SMALL_MESSAGE);
+    }
+
+    /**
+     * Creates an identifier for a large message connection and using the remote "identifying" address as its connection
+     * address.
+     */
+    public static OutboundConnectionIdentifier large(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+    {
+        return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.LARGE_MESSAGE);
+    }
+
+    /**
+     * Creates an identifier for a gossip connection and using the remote "identifying" address as its connection
+     * address.
+     */
+    public static OutboundConnectionIdentifier gossip(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+    {
+        return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.GOSSIP);
+    }
+
+    /**
+     * Returns a newly created connection identifier to the same remote that this identifier, but using the provided
+     * address as connection address.
+     *
+     * @param remoteConnectionAddr the address to use for connection to the remote in the new identifier.
+     * @return a newly created connection identifier that differs from this one only by using {@code remoteConnectionAddr}
+     * as connection address to the remote.
+     */
+    OutboundConnectionIdentifier withNewConnectionAddress(InetSocketAddress remoteConnectionAddr)
+    {
+        return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteConnectionAddr, connectionType);
+    }
+
+    /**
+     * The local node address.
+     */
+    InetAddress local()
+    {
+        return localAddr.getAddress();
+    }
+
+    /**
+     * The remote node identifying address (the one to use for anything else than connecting to the node).
+     */
+    InetSocketAddress remoteAddress()
+    {
+        return remoteAddr;
+    }
+
+    /**
+     * The remote node identifying address (the one to use for anything else than connecting to the node).
+     */
+    InetAddress remote()
+    {
+        return remoteAddr.getAddress();
+    }
+
+    /**
+     * The remote node connection address (the one to use to actually connect to the remote, and only that).
+     */
+    InetSocketAddress connectionAddress()
+    {
+        return remoteConnectionAddr;
+    }
+
+    /**
+     * The type of this connection.
+     */
+    ConnectionType type()
+    {
+        return connectionType;
+    }
+
+    @Override
+    public String toString()
+    {
+        return remoteAddr.equals(remoteConnectionAddr)
+               ? String.format("%s (%s)", remoteAddr, connectionType)
+               : String.format("%s on %s (%s)", remoteAddr, remoteConnectionAddr, connectionType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java
new file mode 100644
index 0000000..282480e
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.WriteBufferWaterMark;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
+
+/**
+ * A collection of data points to be passed around for outbound connections. Instances are immutable and are
+ * created through a {@link Builder}, either from scratch ({@link #builder()}) or starting from the values of an
+ * existing instance ({@link #builder(OutboundConnectionParams)}).
+ */
+public class OutboundConnectionParams
+{
+    /** Send buffer size a fresh {@link Builder} starts with ({@code 1 << 16} bytes). */
+    public static final int DEFAULT_SEND_BUFFER_SIZE = 1 << 16;
+
+    final OutboundConnectionIdentifier connectionId;
+    // consumer of the handshake outcome; presumably invoked by the outbound handshake handler (TODO confirm)
+    final Consumer<HandshakeResult> callback;
+    // may be null, in which case NettyFactory installs no ssl handler on the channel
+    final ServerEncryptionOptions encryptionOptions;
+    final NettyFactory.Mode mode;
+    final boolean compress;
+    final Optional<CoalescingStrategy> coalescingStrategy;
+    // applied as SO_SNDBUF by NettyFactory.createOutboundBootstrap
+    final int sendBufferSize;
+    // applied as TCP_NODELAY by NettyFactory.createOutboundBootstrap
+    final boolean tcpNoDelay;
+    final Supplier<QueuedMessage> backlogSupplier;
+    final Consumer<MessageResult> messageResultConsumer;
+    // applied as WRITE_BUFFER_WATER_MARK by NettyFactory.createOutboundBootstrap
+    final WriteBufferWaterMark waterMark;
+    final int protocolVersion;
+
+    private OutboundConnectionParams(OutboundConnectionIdentifier connectionId,
+                                     Consumer<HandshakeResult> callback,
+                                     ServerEncryptionOptions encryptionOptions,
+                                     NettyFactory.Mode mode,
+                                     boolean compress,
+                                     Optional<CoalescingStrategy> coalescingStrategy,
+                                     int sendBufferSize,
+                                     boolean tcpNoDelay,
+                                     Supplier<QueuedMessage> backlogSupplier,
+                                     Consumer<MessageResult> messageResultConsumer,
+                                     WriteBufferWaterMark waterMark,
+                                     int protocolVersion)
+    {
+        this.connectionId = connectionId;
+        this.callback = callback;
+        this.encryptionOptions = encryptionOptions;
+        this.mode = mode;
+        this.compress = compress;
+        this.coalescingStrategy = coalescingStrategy;
+        this.sendBufferSize = sendBufferSize;
+        this.tcpNoDelay = tcpNoDelay;
+        this.backlogSupplier = backlogSupplier;
+        this.messageResultConsumer = messageResultConsumer;
+        this.waterMark = waterMark;
+        this.protocolVersion = protocolVersion;
+    }
+
+    /** Returns a builder holding only the defaults (send buffer size, water mark, empty coalescing strategy). */
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /** Returns a builder pre-populated with every value of {@code params}. */
+    public static Builder builder(OutboundConnectionParams params)
+    {
+        return new Builder(params);
+    }
+
+    public static class Builder
+    {
+        private OutboundConnectionIdentifier connectionId;
+        private Consumer<HandshakeResult> callback;
+        private ServerEncryptionOptions encryptionOptions;
+        private NettyFactory.Mode mode;
+        private boolean compress;
+        private Optional<CoalescingStrategy> coalescingStrategy = Optional.empty();
+        private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+        private boolean tcpNoDelay;
+        private Supplier<QueuedMessage> backlogSupplier;
+        private Consumer<MessageResult> messageResultConsumer;
+        private WriteBufferWaterMark waterMark = WriteBufferWaterMark.DEFAULT;
+        // left package-visible, unlike the other fields, in case code in this package reads it directly
+        int protocolVersion;
+
+        private Builder()
+        {   }
+
+        private Builder(OutboundConnectionParams params)
+        {
+            this.connectionId = params.connectionId;
+            this.callback = params.callback;
+            this.encryptionOptions = params.encryptionOptions;
+            this.mode = params.mode;
+            this.compress = params.compress;
+            this.coalescingStrategy = params.coalescingStrategy;
+            this.sendBufferSize = params.sendBufferSize;
+            this.tcpNoDelay = params.tcpNoDelay;
+            this.backlogSupplier = params.backlogSupplier;
+            this.messageResultConsumer = params.messageResultConsumer;
+            // a copy must carry these two as well; otherwise the water mark silently falls back to the default
+            // and build() rejects the copy until the protocol version is set again
+            this.waterMark = params.waterMark;
+            this.protocolVersion = params.protocolVersion;
+        }
+
+        public Builder connectionId(OutboundConnectionIdentifier connectionId)
+        {
+            this.connectionId = connectionId;
+            return this;
+        }
+
+        public Builder callback(Consumer<HandshakeResult> callback)
+        {
+            this.callback = callback;
+            return this;
+        }
+
+        public Builder encryptionOptions(ServerEncryptionOptions encryptionOptions)
+        {
+            this.encryptionOptions = encryptionOptions;
+            return this;
+        }
+
+        public Builder mode(NettyFactory.Mode mode)
+        {
+            this.mode = mode;
+            return this;
+        }
+
+        public Builder compress(boolean compress)
+        {
+            this.compress = compress;
+            return this;
+        }
+
+        public Builder coalescingStrategy(Optional<CoalescingStrategy> coalescingStrategy)
+        {
+            this.coalescingStrategy = coalescingStrategy;
+            return this;
+        }
+
+        public Builder sendBufferSize(int sendBufferSize)
+        {
+            this.sendBufferSize = sendBufferSize;
+            return this;
+        }
+
+        public Builder tcpNoDelay(boolean tcpNoDelay)
+        {
+            this.tcpNoDelay = tcpNoDelay;
+            return this;
+        }
+
+        public Builder backlogSupplier(Supplier<QueuedMessage> backlogSupplier)
+        {
+            this.backlogSupplier = backlogSupplier;
+            return this;
+        }
+
+        public Builder messageResultConsumer(Consumer<MessageResult> messageResultConsumer)
+        {
+            this.messageResultConsumer = messageResultConsumer;
+            return this;
+        }
+
+        public Builder waterMark(WriteBufferWaterMark waterMark)
+        {
+            this.waterMark = waterMark;
+            return this;
+        }
+
+        public Builder protocolVersion(int protocolVersion)
+        {
+            this.protocolVersion = protocolVersion;
+            return this;
+        }
+
+        /**
+         * Validates the collected values and creates the immutable parameters object.
+         *
+         * @return the new parameters
+         * @throws IllegalArgumentException if the protocol version is not positive, or the send buffer size is
+         * not strictly between 0 and {@code 1 << 20}
+         */
+        public OutboundConnectionParams build()
+        {
+            Preconditions.checkArgument(protocolVersion > 0, "illegal protocol version: " + protocolVersion);
+            Preconditions.checkArgument(sendBufferSize > 0 && sendBufferSize < (1 << 20), "illegal send buffer size: " + sendBufferSize);
+
+            return new OutboundConnectionParams(connectionId, callback, encryptionOptions, mode, compress, coalescingStrategy, sendBufferSize,
+                                                tcpNoDelay, backlogSupplier, messageResultConsumer, waterMark, protocolVersion);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
new file mode 100644
index 0000000..703549a
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage;
+import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage;
+import org.apache.cassandra.net.async.HandshakeProtocol.ThirdHandshakeMessage;
+
+import static org.apache.cassandra.config.Config.PROPERTY_PREFIX;
+
+/**
+ * A {@link ChannelHandler} to execute the send-side of the internode communication handshake protocol.
+ * As soon as the handler is added to the channel via {@link #channelActive(ChannelHandlerContext)}
+ * (which is only invoked if the underlying TCP connection was properly established), the {@link FirstHandshakeMessage}
+ * of the internode messaging protocol is automatically sent out. See {@link HandshakeProtocol} for full details
+ * about the internode messaging handshake protocol.
+ * <p>
+ * Upon completion of the handshake (on success or fail), the {@link #callback} is invoked to let the listener
+ * know the result of the handshake. See {@link HandshakeResult} for details about the different result states.
+ * <p>
+ * This class extends {@link ByteToMessageDecoder}, which is a {@link ChannelInboundHandler}, because this handler
+ * waits for the peer's handshake response (the {@link SecondHandshakeMessage} of the internode messaging handshake protocol).
+ */
+public class OutboundHandshakeHandler extends ByteToMessageDecoder
+{
+    private static final Logger logger = LoggerFactory.getLogger(OutboundHandshakeHandler.class);
+
+    /**
+     * How long, in milliseconds, a channel that has data to send may go without progress before it is closed.
+     * See {@link IdleStateHandler} and {@link MessageOutHandler#userEventTriggered(ChannelHandlerContext, Object)}.
+     */
+    private static final long DEFAULT_WRITE_IDLE_MS = TimeUnit.SECONDS.toMillis(10);
+    private static final String WRITE_IDLE_PROPERTY = PROPERTY_PREFIX + "outbound_write_idle_ms";
+    private static final long WRITE_IDLE_MS = Long.getLong(WRITE_IDLE_PROPERTY, DEFAULT_WRITE_IDLE_MS);
+
+    private final OutboundConnectionIdentifier connectionId;
+
+    /**
+     * The messaging service version we expect to speak with the peer.
+     */
+    private final int messagingVersion;
+
+    /**
+     * Invoked once the attempt to connect to the peer has concluded, whether it succeeded or failed.
+     */
+    private final Consumer<HandshakeResult> callback;
+    private final NettyFactory.Mode mode;
+    private final OutboundConnectionParams params;
+
+    OutboundHandshakeHandler(OutboundConnectionParams params)
+    {
+        this.params = params;
+        this.connectionId = params.connectionId;
+        this.messagingVersion = params.protocolVersion;
+        this.callback = params.callback;
+        this.mode = params.mode;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Called when the channel becomes active; writes the {@link FirstHandshakeMessage} to the peer and then
+     * propagates the event down the pipeline.
+     */
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception
+    {
+        final FirstHandshakeMessage firstMessage = new FirstHandshakeMessage(messagingVersion, mode, params.compress);
+        logger.trace("starting handshake with peer {}, msg = {}", connectionId.connectionAddress(), firstMessage);
+
+        ctx.writeAndFlush(firstMessage.encode(ctx.alloc()))
+           .addListener(future -> firstHandshakeMessageListener(future, ctx));
+        ctx.fireChannelActive();
+    }
+
+    /**
+     * Listener for the write of the {@link FirstHandshakeMessage}: if the write did not succeed (for example,
+     * the TCP socket was created but the connection got closed right afterwards), the handshake attempt is failed
+     * through {@link #exceptionCaught(ChannelHandlerContext, Throwable)}.
+     */
+    void firstHandshakeMessageListener(Future<? super Void> future, ChannelHandlerContext ctx)
+    {
+        if (!future.isSuccess())
+        {
+            final ChannelFuture failedWrite = (ChannelFuture)future;
+            exceptionCaught(ctx, failedWrite.cause());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Called when bytes arrive from the peer, which should carry the second message of the internode messaging handshake.
+     * <p>
+     * If the peer's protocol version is not the one we were expecting, the channel (and socket) is closed immediately
+     * and the third message of the internode messaging handshake is *not* sent.
+     * We will reconnect on the appropriate protocol version.
+     */
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
+    {
+        final SecondHandshakeMessage response = SecondHandshakeMessage.maybeDecode(in);
+        if (response == null)
+            return;
+
+        logger.trace("received second handshake message from peer {}, msg = {}", connectionId.connectionAddress(), response);
+        final int peerMessagingVersion = response.messagingVersion;
+
+        // we expected a higher protocol version, but it was actually lower
+        if (messagingVersion > peerMessagingVersion)
+        {
+            logger.trace("peer's max version is {}; will reconnect with that version", peerMessagingVersion);
+            warnIfPeerIsSeed(peerMessagingVersion);
+            closeAndRequestReconnect(ctx, peerMessagingVersion);
+            return;
+        }
+
+        // we anticipated a version that is lower than what the peer is actually running
+        if (messagingVersion < peerMessagingVersion && messagingVersion < MessagingService.current_version)
+        {
+            logger.trace("peer has a higher max version than expected {} (previous value {})", peerMessagingVersion, messagingVersion);
+            closeAndRequestReconnect(ctx, peerMessagingVersion);
+            return;
+        }
+
+        completeHandshake(ctx, peerMessagingVersion);
+    }
+
+    /**
+     * Logs a warning when the peer with the lower version is one of the configured seeds. Any failure
+     * to look up the seeds is logged and otherwise ignored.
+     */
+    private void warnIfPeerIsSeed(int peerMessagingVersion)
+    {
+        try
+        {
+            if (DatabaseDescriptor.getSeeds().contains(connectionId.remote()))
+                logger.warn("Seed gossip version is {}; will not connect with that version", peerMessagingVersion);
+        }
+        catch (Throwable e)
+        {
+            // If invalid yaml has been added to the config since startup, getSeeds() will throw an AssertionError.
+            // Additionally, third party seed providers may throw exceptions if network is flakey.
+            // Regardless of what's thrown, we must catch it, disconnect, and try again
+            logger.warn("failed to reread yaml (on trying to connect to a seed): {}", e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Closes the channel and reports a {@link HandshakeResult#disconnect(int)} carrying the peer's version.
+     */
+    private void closeAndRequestReconnect(ChannelHandlerContext ctx, int peerMessagingVersion)
+    {
+        ctx.close();
+        callback.accept(HandshakeResult.disconnect(peerMessagingVersion));
+    }
+
+    /**
+     * Sends the {@link ThirdHandshakeMessage}, installs the messaging handlers and reports success;
+     * on any {@link Exception} the channel is closed and a failed result is reported instead.
+     */
+    private void completeHandshake(ChannelHandlerContext ctx, int peerMessagingVersion)
+    {
+        try
+        {
+            final ThirdHandshakeMessage thirdMessage = new ThirdHandshakeMessage(MessagingService.current_version, connectionId.local());
+            ctx.writeAndFlush(thirdMessage.encode(ctx.alloc()));
+            final ChannelWriter writer = setupPipeline(ctx.channel(), peerMessagingVersion);
+            callback.accept(HandshakeResult.success(writer, peerMessagingVersion));
+        }
+        catch (Exception e)
+        {
+            logger.info("failed to finalize internode messaging handshake", e);
+            ctx.close();
+            callback.accept(HandshakeResult.failed());
+        }
+    }
+
+    /**
+     * Appends the post-handshake handlers to the channel's pipeline (write-idle detection, the optional
+     * LZ4 compressor, and the {@link MessageOutHandler}) and then removes this handler from the pipeline.
+     *
+     * @return the {@link ChannelWriter} created for the channel
+     */
+    @VisibleForTesting
+    ChannelWriter setupPipeline(Channel channel, int messagingVersion)
+    {
+        final ChannelPipeline handlers = channel.pipeline();
+        handlers.addLast("idleWriteHandler", new IdleStateHandler(true, 0, WRITE_IDLE_MS, 0, TimeUnit.MILLISECONDS));
+
+        if (params.compress)
+            handlers.addLast(NettyFactory.OUTBOUND_COMPRESSOR_HANDLER_NAME, NettyFactory.createLz4Encoder(messagingVersion));
+
+        final ChannelWriter writer = ChannelWriter.create(channel, params.messageResultConsumer, params.coalescingStrategy);
+        handlers.addLast("messageOutHandler", new MessageOutHandler(connectionId, messagingVersion, writer, params.backlogSupplier));
+
+        // the handshake is over, so this handler is no longer needed in the pipeline
+        handlers.remove(this);
+        return writer;
+    }
+
+    /**
+     * Logs the failure, closes the channel and reports a failed handshake to the {@link #callback}.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+    {
+        logger.error("Failed to properly handshake with peer {}. Closing the channel.", connectionId, cause);
+        ctx.close();
+        callback.accept(HandshakeResult.failed());
+    }
+
+    /**
+     * The result of the handshake. A handshake has 3 possible outcomes:
+     *  1) it can be successful, in which case the channel writer and the version to use are carried in this result.
+     *  2) we may decide to disconnect in order to reconnect with another protocol version (that version is carried in this result).
+     *  3) the negotiation can fail for an unknown reason.
+     */
+    public static class HandshakeResult
+    {
+        static final int UNKNOWN_PROTOCOL_VERSION = -1;
+
+        /**
+         * Describes the result of receiving the response back from the peer (Message 2 of the handshake)
+         * and implies the action that should be taken next.
+         */
+        enum Outcome
+        {
+            SUCCESS, DISCONNECT, NEGOTIATION_FAILURE
+        }
+
+        /** The writer for the connection's channel; only set for a successful handshake. */
+        final ChannelWriter channelWriter;
+        /** The version negotiated with the peer. Set unless this is a {@link Outcome#NEGOTIATION_FAILURE}. */
+        final int negotiatedMessagingVersion;
+        /** The handshake {@link Outcome}. */
+        final Outcome outcome;
+
+        private HandshakeResult(ChannelWriter channelWriter, int negotiatedMessagingVersion, Outcome outcome)
+        {
+            this.channelWriter = channelWriter;
+            this.negotiatedMessagingVersion = negotiatedMessagingVersion;
+            this.outcome = outcome;
+        }
+
+        /** Creates a {@link Outcome#SUCCESS} result carrying the channel writer and the negotiated version. */
+        static HandshakeResult success(ChannelWriter channel, int negotiatedMessagingVersion)
+        {
+            return new HandshakeResult(channel, negotiatedMessagingVersion, Outcome.SUCCESS);
+        }
+
+        /** Creates a {@link Outcome#DISCONNECT} result carrying the version to reconnect with, and no channel writer. */
+        static HandshakeResult disconnect(int negotiatedMessagingVersion)
+        {
+            return new HandshakeResult(null, negotiatedMessagingVersion, Outcome.DISCONNECT);
+        }
+
+        /** Creates a {@link Outcome#NEGOTIATION_FAILURE} result with no channel writer and {@link #UNKNOWN_PROTOCOL_VERSION}. */
+        static HandshakeResult failed()
+        {
+            return new HandshakeResult(null, UNKNOWN_PROTOCOL_VERSION, Outcome.NEGOTIATION_FAILURE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
new file mode 100644
index 0000000..6bda9cd
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.NettyFactory.Mode;
+import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult;
+import org.apache.cassandra.utils.CoalescingStrategies;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/**
+ * Represents one connection to a peer, and handles the state transitions on the connection and the netty {@link Channel}.
+ * The underlying socket is not opened until explicitly requested (by sending a message).
+ *
+ * The basic setup for the channel is like this: a message is requested to be sent via {@link #sendMessage(MessageOut, int)}.
+ * If the channel is not established, then we need to create it (obviously). To prevent multiple threads from creating
+ * independent connections, they attempt to update the {@link #state}; one thread will win the race and create the connection.
+ * Upon successfully setting up the connection/channel, the {@link #state} will be updated again (to {@link State#READY}),
+ * which indicates to other threads that the channel is ready for business and can be written to.
+ *
+ */
+public class OutboundMessagingConnection
+{
+    static final Logger logger = LoggerFactory.getLogger(OutboundMessagingConnection.class);
+    private static final NoSpamLogger errorLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS);
+
+    private static final String INTRADC_TCP_NODELAY_PROPERTY = Config.PROPERTY_PREFIX + "otc_intradc_tcp_nodelay"; // system property name read below
+
+    /**
+     * Enables/disables TCP_NODELAY for intradc connections. Defaults to enabled when the system property is not set.
+     */
+    private static final boolean INTRADC_TCP_NODELAY = Boolean.parseBoolean(System.getProperty(INTRADC_TCP_NODELAY_PROPERTY, "true"));
+
+    /**
+     * Base number of milliseconds between connection retry attempts; the delay actually scheduled is this value
+     * multiplied by the current attempt count.
+     */
+    private static final int OPEN_RETRY_DELAY_MS = 100;
+
+    /**
+     * The minimum number of milliseconds to wait for a connection (TCP socket connect + handshake) before timing out.
+     */
+    private static final int MINIMUM_CONNECT_TIMEOUT_MS = 2000;
+    private final IInternodeAuthenticator authenticator; // consulted before every connection attempt
+
+    /**
+     * Describes this instance's ability to send messages into its Netty {@link Channel}.
+     */
+    enum State
+    {
+        /** waiting to create the connection; the initial state, and the only state from which connect() starts an attempt */
+        NOT_READY,
+        /** we've started to create the connection/channel; messages sent meanwhile are added to the backlog */
+        CREATING_CHANNEL,
+        /** channel is established and we can send messages directly to the channel writer */
+        READY,
+        /** a dead state which should not be transitioned away from; messages sent in this state are rejected */
+        CLOSED
+    }
+
+    /**
+     * Backlog to hold messages passed by upstream threads while the Netty {@link Channel} is being set up or recreated.
+     */
+    private final Queue<QueuedMessage> backlog;
+
+    /**
+     * Reference to a {@link ScheduledExecutorService} rather than directly depending on something like {@link ScheduledExecutors}.
+     */
+    private final ScheduledExecutorService scheduledExecutor;
+
+    final AtomicLong droppedMessageCount; // incremented when a timed out, non-retryable backlogged message is discarded
+    final AtomicLong completedMessageCount;
+
+    private volatile OutboundConnectionIdentifier connectionId;
+
+    private final ServerEncryptionOptions encryptionOptions;
+
+    /**
+     * A future for retrying connections. Bear in mind that this future does not execute in the
+     * netty event loop, so there are some races to be careful of.
+     */
+    private volatile ScheduledFuture<?> connectionRetryFuture;
+
+    /**
+     * A future for notifying when the timeout for creating the connection and negotiating the handshake has elapsed.
+     * It will be cancelled when the channel is established correctly. Bear in mind that this future does not execute in the
+     * netty event loop, so there are some races to be careful of.
+     */
+    private volatile ScheduledFuture<?> connectionTimeoutFuture;
+
+    private final AtomicReference<State> state; // starts as NOT_READY; see State for the meaning of each value
+
+    private final Optional<CoalescingStrategy> coalescingStrategy;
+
+    /**
+     * A running count of the number of times we've tried to create a connection.
+     */
+    private volatile int connectAttemptCount;
+
+    /**
+     * The writer for the netty channel, once a socket connection is established; it won't be in its normal working state until the handshake is complete.
+     */
+    private volatile ChannelWriter channelWriter;
+
+    /**
+     * the target protocol version to communicate to the peer with, discovered/negotiated via handshaking.
+     * NOTE(review): unlike the neighbouring fields this one is not volatile, although it is written in finishHandshake()
+     * and read in buildBootstrap() - confirm that visibility across those threads is guaranteed some other way.
+     */
+    private int targetVersion;
+
+    OutboundMessagingConnection(OutboundConnectionIdentifier connectionId,
+                                ServerEncryptionOptions encryptionOptions,
+                                Optional<CoalescingStrategy> coalescingStrategy,
+                                IInternodeAuthenticator authenticator)
+    {
+        // production wiring: the retry and timeout tasks are scheduled on the shared fast-tasks executor
+        this(connectionId,
+             encryptionOptions,
+             coalescingStrategy,
+             authenticator,
+             ScheduledExecutors.scheduledFastTasks);
+    }
+
+    @VisibleForTesting
+    OutboundMessagingConnection(OutboundConnectionIdentifier connectionId,
+                                ServerEncryptionOptions encryptionOptions,
+                                Optional<CoalescingStrategy> coalescingStrategy,
+                                IInternodeAuthenticator authenticator,
+                                ScheduledExecutorService sceduledExecutor)
+    {
+        // collaborators handed in by the caller
+        this.connectionId = connectionId;
+        this.encryptionOptions = encryptionOptions;
+        this.coalescingStrategy = coalescingStrategy;
+        this.authenticator = authenticator;
+        this.scheduledExecutor = sceduledExecutor;
+
+        // internal bookkeeping: an empty backlog, zeroed counters, and no connection yet
+        this.backlog = new ConcurrentLinkedQueue<>();
+        this.droppedMessageCount = new AtomicLong(0);
+        this.completedMessageCount = new AtomicLong(0);
+        this.state = new AtomicReference<>(State.NOT_READY);
+
+        // Start from the most precise protocol version we know of. There is version detection on connect(), but
+        // the target version might be accessed by the pool (in getConnection()) before we actually connect (as we
+        // only connect when the first message is submitted). The only case where we'll connect without knowing
+        // the true version of a node is when that node is a seed (otherwise, we can't know a node unless it has
+        // been gossiped to us or it has connected to us, and in both cases that will set the version).
+        // In that case we won't rely on that targetVersion before we're actually connected and so the version
+        // detection in connect() will do its job.
+        this.targetVersion = MessagingService.instance().getVersion(connectionId.remote());
+    }
+
+    /**
+     * If the connection is set up and ready to use (the normal case), simply send the message to it and return.
+     * Otherwise, one lucky thread is selected to create the Channel, while other threads just add the {@code msg} to
+     * the backlog queue.
+     *
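+     * @return true if the message was accepted by the {@link #channelWriter}, or if the connection is not yet established
+     * and the message was added to the backlog (kicking off a connection attempt); false if the {@link #channelWriter}
+     * did not accept the message (it is then added to the backlog) or the connection is {@link State#CLOSED}. See documentation
+     * in {@link ChannelWriter} and {@link MessageOutHandler} how the backlogged messages get consumed.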
+     */
+    boolean sendMessage(MessageOut msg, int id)
+    {
+        return sendMessage(new QueuedMessage(msg, id));
+    }
+
+    boolean sendMessage(QueuedMessage queuedMessage)
+    {
+        State state = this.state.get();
+        if (state == State.READY)
+        {
+            if (channelWriter.write(queuedMessage, false))
+                return true;
+
+            backlog.add(queuedMessage);
+            return false;
+        }
+        else if (state == State.CLOSED)
+        {
+            errorLogger.warn("trying to write message to a closed connection");
+            return false;
+        }
+        else
+        {
+            backlog.add(queuedMessage);
+            connect();
+            return true;
+        }
+    }
+
+    /**
+     * Initiate all the actions required to establish a working, valid connection. This includes
+     * opening the socket, negotiating the internode messaging handshake, and setting up the working
+     * Netty {@link Channel}. However, this method will not block for all those actions: it will only
+     * kick off the connection attempt as everything is asynchronous.
+     * <p>
+     * Threads compete to update the {@link #state} field to {@link State#CREATING_CHANNEL} to ensure only one
+     * connection is attempted at a time.
+     *
+     * @return true if kicking off the connection attempt was started by this thread; else, false.
+     */
+    public boolean connect()
+    {
+        // exactly one caller wins the NOT_READY -> CREATING_CHANNEL transition; everybody else backs off
+        final boolean wonRace = state.compareAndSet(State.NOT_READY, State.CREATING_CHANNEL);
+        if (wonRace)
+        {
+            // drop the timeout task left over from an earlier attempt, if any
+            if (connectionTimeoutFuture != null)
+            {
+                connectionTimeoutFuture.cancel(false);
+                connectionTimeoutFuture = null;
+            }
+
+            return tryConnect();
+        }
+
+        return false;
+    }
+
+    private boolean tryConnect()
+    {
+        if (state.get() != State.CREATING_CHANNEL)
+                return false;
+
+        logger.debug("connection attempt {} to {}", connectAttemptCount, connectionId);
+
+
+        InetSocketAddress remote = connectionId.remoteAddress();
+        if (!authenticator.authenticate(remote.getAddress(), remote.getPort()))
+        {
+            logger.warn("Internode auth failed connecting to {}", connectionId);
+            //Remove the connection pool and other thread so messages aren't queued
+            MessagingService.instance().destroyConnectionPool(remote.getAddress());
+
+            // don't update the state field as destroyConnectionPool() *should* call OMC.close()
+            // on all the connections in the OMP for the remoteAddress
+            return false;
+        }
+
+        boolean compress = shouldCompressConnection(connectionId.local(), connectionId.remote());
+        Bootstrap bootstrap = buildBootstrap(compress);
+
+        ChannelFuture connectFuture = bootstrap.connect();
+        connectFuture.addListener(this::connectCallback);
+
+        long timeout = Math.max(MINIMUM_CONNECT_TIMEOUT_MS, DatabaseDescriptor.getRpcTimeout());
+        if (connectionTimeoutFuture == null || connectionTimeoutFuture.isDone())
+            connectionTimeoutFuture = scheduledExecutor.schedule(() -> connectionTimeout(connectFuture), timeout, TimeUnit.MILLISECONDS);
+        return true;
+    }
+
+    @VisibleForTesting
+    static boolean shouldCompressConnection(InetAddress localHost, InetAddress remoteHost)
+    {
+        // "all": every internode connection is compressed
+        if (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all)
+            return true;
+
+        // "dc": only connections that leave the local datacenter are compressed
+        if (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc)
+            return !isLocalDC(localHost, remoteHost);
+
+        return false;
+    }
+
+    private Bootstrap buildBootstrap(boolean compress)
+    {
+        boolean tcpNoDelay = isLocalDC(connectionId.local(), connectionId.remote()) ? INTRADC_TCP_NODELAY : DatabaseDescriptor.getInterDCTcpNoDelay();
+        int sendBufferSize = DatabaseDescriptor.getInternodeSendBufferSize() > 0
+                             ? DatabaseDescriptor.getInternodeSendBufferSize()
+                             : OutboundConnectionParams.DEFAULT_SEND_BUFFER_SIZE;
+        OutboundConnectionParams params = OutboundConnectionParams.builder()
+                                                                  .connectionId(connectionId)
+                                                                  .callback(this::finishHandshake)
+                                                                  .encryptionOptions(encryptionOptions)
+                                                                  .mode(Mode.MESSAGING)
+                                                                  .compress(compress)
+                                                                  .coalescingStrategy(coalescingStrategy)
+                                                                  .sendBufferSize(sendBufferSize)
+                                                                  .tcpNoDelay(tcpNoDelay)
+                                                                  .backlogSupplier(() -> nextBackloggedMessage())
+                                                                  .messageResultConsumer(this::handleMessageResult)
+                                                                  .protocolVersion(targetVersion)
+                                                                  .build();
+
+        return NettyFactory.instance.createOutboundBootstrap(params);
+    }
+
+    /**
+     * Supplies the next backlogged message that is still worth sending.
+     * <p>
+     * Messages are polled from the head of the {@link #backlog}. A message that has not timed out is returned
+     * as is; a timed out message that should be retried is returned as its retry instance; any other timed out
+     * message is counted in {@link #droppedMessageCount} and skipped, and the search continues with the next
+     * queued message. Skipping (rather than returning {@code null} on the first expired message) ensures that
+     * {@code null} is only ever returned once the backlog has been exhausted, so a consumer that treats
+     * {@code null} as "nothing left to send" does not stop early while sendable messages are still queued.
+     *
+     * @return the next message to send, or {@code null} if the backlog holds no sendable message
+     */
+    private QueuedMessage nextBackloggedMessage()
+    {
+        QueuedMessage msg;
+        while ((msg = backlog.poll()) != null)
+        {
+            if (!msg.isTimedOut())
+                return msg;
+
+            if (msg.shouldRetry())
+                return msg.createRetry();
+
+            // expired and not retryable: account for the drop and look at the next queued message
+            droppedMessageCount.incrementAndGet();
+        }
+
+        return null;
+    }
+
+    static boolean isLocalDC(InetAddress localHost, InetAddress remoteHost)
+    {
+        String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(remoteHost);
+        String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(localHost);
+        return remoteDC != null && remoteDC.equals(localDC);
+    }
+
+    /**
+     * Handles the callback of the TCP connection attempt (not including the handshake negotiation!), and really all
+     * we're handling here is the TCP connection failures. On failure, we close the channel (which should disconnect
+     * the socket, if connected). If there was an {@link IOException} while trying to connect, the connection will be
+     * retried after a short delay.
+     * <p>
+     * This method does not alter the {@link #state} as it's only evaluating the TCP connect, not TCP connect and handshake.
+     * Thus, {@link #finishHandshake(HandshakeResult)} will handle any necessary state updates.
+     * <p>
+     * Note: this method is called from the event loop, so be careful wrt thread visibility
+     *
+     * @return true iff the TCP connection was established and the {@link #state} is not {@link State#CLOSED}; else false.
+     */
+    @VisibleForTesting
+    boolean connectCallback(Future<? super Void> future)
+    {
+        ChannelFuture channelFuture = (ChannelFuture)future;
+
+        // make sure this instance is not (terminally) closed
+        if (state.get() == State.CLOSED)
+        {
+            channelFuture.channel().close();
+            return false;
+        }
+
+        // this is the success state
+        final Throwable cause = future.cause();
+        if (cause == null)
+        {
+            connectAttemptCount = 0;
+            return true;
+        }
+
+        setStateIfNotClosed(state, State.NOT_READY);
+        if (cause instanceof IOException)
+        {
+            logger.trace("unable to connect on attempt {} to {}", connectAttemptCount, connectionId, cause);
+            connectAttemptCount++;
+            connectionRetryFuture = scheduledExecutor.schedule(this::connect, OPEN_RETRY_DELAY_MS * connectAttemptCount, TimeUnit.MILLISECONDS);
+        }
+        else
+        {
+            JVMStabilityInspector.inspectThrowable(cause);
+            logger.error("non-IO error attempting to connect to {}", connectionId, cause);
+        }
+        return false;
+    }
+
+    /**
+     * A callback for handling timeouts when creating a connection/negotiating the handshake.
+     * <p>
+     * Note: this method is *not* invoked from the netty event loop,
+     * so there's an inherent race with {@link #finishHandshake(HandshakeResult)},
+     * as well as any possible connect() reattempts (a seemingly remote race condition, however).
+     * Therefore, this function tries to lose any races, as much as possible.
+     *
+     * @return true if there was a timeout on the connect/handshake; else false.
+     */
+    boolean connectionTimeout(ChannelFuture channelFuture)
+    {
+        if (connectionRetryFuture != null)
+        {
+            connectionRetryFuture.cancel(false);
+            connectionRetryFuture = null;
+        }
+        connectAttemptCount = 0;
+        State initialState = state.get();
+        if (initialState == State.CLOSED)
+            return true;
+
+        if (initialState != State.READY)
+        {
+            logger.debug("timed out while trying to connect to {}", connectionId);
+
+            channelFuture.channel().close();
+            // a last-ditch attempt to let finishHandshake() win the race
+            if (state.compareAndSet(initialState, State.NOT_READY))
+            {
+                backlog.clear();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Process the results of the handshake negotiation.
+     * <p>
+     * Note: this method will be invoked from the netty event loop,
+     * so there's an inherent race with {@link #connectionTimeout(ChannelFuture)}.
+     */
+    void finishHandshake(HandshakeResult result)
+    {
+        // clean up the connector instances before changing the state: the pending timeout and retry tasks are cancelled and forgotten
+        if (connectionTimeoutFuture != null)
+        {
+            connectionTimeoutFuture.cancel(false);
+            connectionTimeoutFuture = null;
+        }
+        if (connectionRetryFuture != null)
+        {
+            connectionRetryFuture.cancel(false);
+            connectionRetryFuture = null;
+        }
+        connectAttemptCount = 0; // the attempt has concluded, so the retry counter starts over
+
+        if (result.negotiatedMessagingVersion != HandshakeResult.UNKNOWN_PROTOCOL_VERSION) // failed results carry no version
+        {
+            targetVersion = result.negotiatedMessagingVersion;
+            MessagingService.instance().setVersion(connectionId.remote(), targetVersion);
+        }
+
+        switch (result.outcome)
+        {
+            case SUCCESS:
+                assert result.channelWriter != null;
+                logger.debug("successfully connected to {}, conmpress = {}, coalescing = {}", connectionId,
+                             shouldCompressConnection(connectionId.local(), connectionId.remote()),
+                             coalescingStrategy.isPresent() ? coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED);
+                if (state.get() == State.CLOSED) // closed while the handshake was in flight: discard the new channel and the backlog
+                {
+                    result.channelWriter.close();
+                    backlog.clear();
+                    break;
+                }
+                channelWriter = result.channelWriter;
+                // drain the backlog to the channel
+                channelWriter.writeBacklog(backlog, true);
+                // change the state so newly incoming messages can be sent to the channel (without adding to the backlog)
+                setStateIfNotClosed(state, State.READY);
+                // ship out any stragglers that got added to the backlog between the first drain and the state change
+                channelWriter.writeBacklog(backlog, true);
+                break;
+            case DISCONNECT:
+                reconnect(); // targetVersion was updated above from the result's version
+                break;
+            case NEGOTIATION_FAILURE:
+                setStateIfNotClosed(state, State.NOT_READY); // NOT_READY is the state from which connect() can start a new attempt
+                backlog.clear();
+                break;
+            default:
+                throw new IllegalArgumentException("unhandled result type: " + result.outcome);
+        }
+    }
+
+    /**
+     * Atomically moves {@code state} to {@code newState} unless it is {@code State.CLOSED}.
+     * <p>
+     * The update is done with a compare-and-set loop so that a {@code CLOSED} written by another thread between
+     * the read and the write is never overwritten.
+     *
+     * @param state the state holder to update
+     * @param newState the value to install
+     * @return true if {@code newState} was installed; false if the state was found to be {@code CLOSED}
+     */
+    @VisibleForTesting
+    static boolean setStateIfNotClosed(AtomicReference<State> state, State newState)
+    {
+        State current;
+        do
+        {
+            current = state.get();
+            if (current == State.CLOSED)
+                return false;
+        }
+        // retry if the state changed under us; the CLOSED check is then repeated against the fresh value
+        while (!state.compareAndSet(current, newState));
+        return true;
+    }
+
+    /**
+     * @return the current value of the {@code targetVersion} field, which {@link #finishHandshake(HandshakeResult)}
+     * updates when a handshake negotiates a messaging version.
+     */
+    int getTargetVersion()
+    {
+        return targetVersion;
+    }
+
+    /**
+     * Handles the result of each message sent.
+     *
+     * Note: this function is expected to be invoked on the netty event loop. Also, do not retain any state from
+     * the input {@code messageResult}.
+     */
+    void handleMessageResult(MessageResult messageResult)
+    {
+        completedMessageCount.incrementAndGet();
+
+        // checking the cause() is an optimized way to tell if the operation was successful (as the cause will be null)
+        // Note that ExpiredException is just a marker for timeout-ed message we're dropping, but as we already
+        // incremented the dropped message count in MessageOutHandler, we have nothing to do.
+        Throwable cause = messageResult.future.cause();
+        if (cause == null)
+            return;
+
+        if (cause instanceof ExpiredException)
+        {
+            droppedMessageCount.incrementAndGet();
+            return;
+        }
+
+        JVMStabilityInspector.inspectThrowable(cause);
+
+        if (cause instanceof IOException || cause.getCause() instanceof IOException)
+        {
+            ChannelWriter writer = messageResult.writer;
+            if (writer.shouldPurgeBacklog())
+                purgeBacklog();
+
+            // This writer needs to be closed and we need to trigger a reconnection. We really only want to do that
+            // once for this channel however (and again, no race because we're on the netty event loop).
+            if (!writer.isClosed() && messageResult.allowReconnect)
+            {
+                reconnect();
+                writer.close();
+            }
+
+            QueuedMessage msg = messageResult.msg;
+            if (msg != null && msg.shouldRetry())
+            {
+                sendMessage(msg.createRetry());
+            }
+        }
+        else if (messageResult.future.isCancelled())
+        {
+            // Someone cancelled the future, which we assume meant it doesn't want the message to be sent if it hasn't
+            // yet. Just ignore.
+        }
+        else
+        {
+            // Non IO exceptions are likely a programming error so let's not silence them
+            logger.error("Unexpected error writing on " + connectionId, cause);
+        }
+    }
+
+    /**
+     * Change the IP address on which we connect to the peer. We will attempt to connect to the new address if there
+     * was a previous connection, and new incoming messages as well as existing {@link #backlog} messages will be sent there.
+     * Any outstanding messages in the existing channel will still be sent to the previous address (we won't/can't move them from
+     * one channel to another).
+     */
+    void reconnectWithNewIp(InetSocketAddress newAddr)
+    {
+        State currentState = state.get();
+
+        // if we're closed, ignore the request
+        if (currentState == State.CLOSED)
+            return;
+
+        // capture a reference to the current channel, in case it gets swapped out before we can call close() on it
+        ChannelWriter currentChannel = channelWriter;
+        connectionId = connectionId.withNewConnectionAddress(newAddr);
+
+        if (currentState != State.NOT_READY)
+            reconnect();
+
+        // lastly, push through anything remaining in the existing channel.
+        if (currentChannel != null)
+            currentChannel.softClose();
+    }
+
+    /**
+     * Moves the state to NOT_READY and then calls {@link #connect()}; does nothing when the connection is closed.
+     */
+    void reconnect()
+    {
+        if (!setStateIfNotClosed(state, State.NOT_READY))
+            return;
+
+        connect();
+    }
+
+    /**
+     * Discards every message currently held in the {@link #backlog}.
+     */
+    void purgeBacklog()
+    {
+        backlog.clear();
+    }
+
+    /**
+     * Closes this connection. The state is set to CLOSED first, then any scheduled connection-timeout and
+     * connect-retry tasks are cancelled, and finally the current channel writer (if any) is shut down and released.
+     *
+     * @param softClose if {@code true}, the backlog is handed to the channel writer and the writer is soft-closed;
+     *                  if {@code false}, the backlog is cleared and the writer is closed.
+     */
+    public void close(boolean softClose)
+    {
+        state.set(State.CLOSED);
+
+        if (connectionTimeoutFuture != null)
+        {
+            connectionTimeoutFuture.cancel(false);
+            connectionTimeoutFuture = null;
+        }
+
+        // a closed connection has no use for a pending connect retry either, so cancel it like the timeout task
+        if (connectionRetryFuture != null)
+        {
+            connectionRetryFuture.cancel(false);
+            connectionRetryFuture = null;
+        }
+
+        // read the field once so the null check and the calls below act on the same writer
+        ChannelWriter writer = channelWriter;
+        if (writer != null)
+        {
+            if (softClose)
+            {
+                // drain the backlog
+                writer.writeBacklog(backlog, false);
+                writer.softClose();
+            }
+            else
+            {
+                backlog.clear();
+                writer.close();
+            }
+
+            channelWriter = null;
+        }
+    }
+
+    /**
+     * @return the string form of this connection's {@code connectionId}.
+     */
+    @Override
+    public String toString()
+    {
+        return connectionId.toString();
+    }
+
+    /**
+     * @return the number of messages in the backlog plus, when a channel writer is present, the number of messages
+     * that writer reports as pending.
+     */
+    public Integer getPendingMessages()
+    {
+        final int backlogged = backlog.size();
+        // read the field once; it may be reassigned or nulled by other methods of this class
+        final ChannelWriter writer = channelWriter;
+        if (writer == null)
+            return backlogged;
+        return backlogged + (int) writer.pendingMessageCount();
+    }
+
+    /**
+     * @return the current value of the completed message counter, which
+     * {@link #handleMessageResult(MessageResult)} increments once per handled result.
+     */
+    public Long getCompletedMessages()
+    {
+        return completedMessageCount.get();
+    }
+
+    /**
+     * @return the current value of the dropped message counter, which
+     * {@link #handleMessageResult(MessageResult)} increments for results that failed with an {@code ExpiredException}.
+     */
+    public Long getDroppedMessages()
+    {
+        return droppedMessageCount.get();
+    }
+
+    /*
+        methods specific to testing follow
+     */
+
+    /** Test hook: returns the number of messages currently in the backlog. */
+    @VisibleForTesting
+    int backlogSize()
+    {
+        return backlog.size();
+    }
+
+    /** Test hook: adds {@code msg} directly to the backlog. */
+    @VisibleForTesting
+    void addToBacklog(QueuedMessage msg)
+    {
+        backlog.add(msg);
+    }
+
+    /** Test hook: replaces the channel writer used by this connection. */
+    @VisibleForTesting
+    void setChannelWriter(ChannelWriter channelWriter)
+    {
+        this.channelWriter = channelWriter;
+    }
+
+    /** Test hook: returns the current channel writer, which may be {@code null}. */
+    @VisibleForTesting
+    ChannelWriter getChannelWriter()
+    {
+        return channelWriter;
+    }
+
+    /** Test hook: unconditionally sets the connection state, even if it is currently CLOSED. */
+    @VisibleForTesting
+    void setState(State state)
+    {
+        this.state.set(state);
+    }
+
+    /** Test hook: returns the current connection state. */
+    @VisibleForTesting
+    State getState()
+    {
+        return state.get();
+    }
+
+    /** Test hook: overrides the target messaging version of this connection. */
+    @VisibleForTesting
+    void setTargetVersion(int targetVersion)
+    {
+        this.targetVersion = targetVersion;
+    }
+
+    /** Test hook: returns the identifier currently associated with this connection. */
+    @VisibleForTesting
+    OutboundConnectionIdentifier getConnectionId()
+    {
+        return connectionId;
+    }
+
+    /** Test hook: installs the future tracked as the pending connection-timeout task. */
+    @VisibleForTesting
+    void setConnectionTimeoutFuture(ScheduledFuture<?> connectionTimeoutFuture)
+    {
+        this.connectionTimeoutFuture = connectionTimeoutFuture;
+    }
+
+    /** Test hook: returns the pending connection-timeout future, or {@code null} when none is set. */
+    @VisibleForTesting
+    ScheduledFuture<?> getConnectionTimeoutFuture()
+    {
+        return connectionTimeoutFuture;
+    }
+
+    /**
+     * @return true if the connection state is currently READY; false for any other state.
+     */
+    public boolean isConnected()
+    {
+        return state.get() == State.READY;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
new file mode 100644
index 0000000..0086da8
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.net.InetSocketAddress;
+import java.util.Optional;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.metrics.ConnectionMetrics;
+import org.apache.cassandra.net.BackPressureState;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
+import org.apache.cassandra.utils.CoalescingStrategies;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
+
+/**
+ * Groups a set of outbound connections to a given peer, and routes outgoing messages to the appropriate connection
+ * (based upon message's type or size). Contains a {@link OutboundMessagingConnection} for each of the
+ * {@link ConnectionType} type.
+ */
+public class OutboundMessagingPool
+{
+    @VisibleForTesting
+    static final long LARGE_MESSAGE_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "otcp_large_message_threshold", 1024 * 64);
+
+    private final ConnectionMetrics metrics;
+    private final BackPressureState backPressureState;
+
+    public OutboundMessagingConnection gossipChannel;
+    public OutboundMessagingConnection largeMessageChannel;
+    public OutboundMessagingConnection smallMessageChannel;
+
+    /**
+     * An override address on which to communicate with the peer. Typically used for something like EC2 public IP addresses
+     * which need to be used for communication between EC2 regions.
+     */
+    private InetSocketAddress preferredRemoteAddr;
+
+    public OutboundMessagingPool(InetSocketAddress remoteAddr, InetSocketAddress localAddr, ServerEncryptionOptions encryptionOptions,
+                                 BackPressureState backPressureState, IInternodeAuthenticator authenticator)
+    {
+        preferredRemoteAddr = remoteAddr;
+        this.backPressureState = backPressureState;
+        metrics = new ConnectionMetrics(localAddr.getAddress(), this);
+
+
+        smallMessageChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.small(localAddr, preferredRemoteAddr),
+                                                              encryptionOptions, coalescingStrategy(remoteAddr), authenticator);
+        largeMessageChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.large(localAddr, preferredRemoteAddr),
+                                                              encryptionOptions, coalescingStrategy(remoteAddr), authenticator);
+
+        // don't attempt coalesce the gossip messages, just ship them out asap (let's not anger the FD on any peer node by any artificial delays)
+        gossipChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.gossip(localAddr, preferredRemoteAddr),
+                                                        encryptionOptions, Optional.empty(), authenticator);
+    }
+
+    private static Optional<CoalescingStrategy> coalescingStrategy(InetSocketAddress remoteAddr)
+    {
+        String strategyName = DatabaseDescriptor.getOtcCoalescingStrategy();
+        String displayName = remoteAddr.getAddress().getHostAddress();
+        return CoalescingStrategies.newCoalescingStrategy(strategyName,
+                                                          DatabaseDescriptor.getOtcCoalescingWindow(),
+                                                          OutboundMessagingConnection.logger,
+                                                          displayName);
+
+    }
+
+    public BackPressureState getBackPressureState()
+    {
+        return backPressureState;
+    }
+
+    public void sendMessage(MessageOut msg, int id)
+    {
+        getConnection(msg).sendMessage(msg, id);
+    }
+
+    @VisibleForTesting
+    public OutboundMessagingConnection getConnection(MessageOut msg)
+    {
+        // optimize for the common path (the small message channel)
+        if (Stage.GOSSIP != msg.getStage())
+        {
+            return msg.serializedSize(smallMessageChannel.getTargetVersion()) < LARGE_MESSAGE_THRESHOLD
+            ? smallMessageChannel
+            : largeMessageChannel;
+        }
+        return gossipChannel;
+    }
+
+    /**
+     * Reconnect to the peer using the given {@code addr}. Outstanding messages in each channel will be sent on the
+     * current channel. Typically this function is used for something like EC2 public IP addresses which need to be used
+     * for communication between EC2 regions.
+     *
+     * @param addr IP Address to use (and prefer) going forward for connecting to the peer
+     */
+    public void reconnectWithNewIp(InetSocketAddress addr)
+    {
+        preferredRemoteAddr = addr;
+        gossipChannel.reconnectWithNewIp(addr);
+        largeMessageChannel.reconnectWithNewIp(addr);
+        smallMessageChannel.reconnectWithNewIp(addr);
+    }
+
+    /**
+     * Close each netty channel and it's socket.
+     *
+     * @param softClose {@code true} if existing messages in the queue should be sent before closing.
+     */
+    public void close(boolean softClose)
+    {
+        gossipChannel.close(softClose);
+        largeMessageChannel.close(softClose);
+        smallMessageChannel.close(softClose);
+    }
+
+    /**
+     * For testing purposes only.
+     */
+    @VisibleForTesting
+    OutboundMessagingConnection getConnection(ConnectionType connectionType)
+    {
+        switch (connectionType)
+        {
+            case GOSSIP:
+                return gossipChannel;
+            case LARGE_MESSAGE:
+                return largeMessageChannel;
+            case SMALL_MESSAGE:
+                return smallMessageChannel;
+            default:
+                throw new IllegalArgumentException("unsupported connection type: " + connectionType);
+        }
+    }
+
+    public void incrementTimeout()
+    {
+        metrics.timeouts.mark();
+    }
+
+    public long getTimeouts()
+    {
+        return metrics.timeouts.getCount();
+    }
+
+    public InetSocketAddress getPreferredRemoteAddr()
+    {
+        return preferredRemoteAddr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/QueuedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/QueuedMessage.java b/src/java/org/apache/cassandra/net/async/QueuedMessage.java
new file mode 100644
index 0000000..28e4ba4
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/QueuedMessage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.CoalescingStrategies;
+
+/**
+ * A wrapper for outbound messages. Messages created through the public two-argument constructor are retryable;
+ * the copy produced by {@link #createRetry()} is not, so a message is retried at most once.
+ */
+public class QueuedMessage implements CoalescingStrategies.Coalescable
+{
+    /** the wrapped outbound message */
+    public final MessageOut<?> message;
+    /** the id supplied together with the message */
+    public final int id;
+    /** a {@link System#nanoTime()} reading; only meaningful relative to other {@code nanoTime()} readings */
+    public final long timestampNanos;
+    /** whether the message may be discarded once it has timed out */
+    public final boolean droppable;
+    /** whether a retry may still be created for this message */
+    private final boolean retryable;
+
+    /**
+     * Wraps {@code message}, stamping it with the current {@link System#nanoTime()}. The message is droppable if
+     * its verb is contained in {@code MessagingService.DROPPABLE_VERBS}, and it is retryable.
+     */
+    public QueuedMessage(MessageOut<?> message, int id)
+    {
+        this(message, id, System.nanoTime(), MessagingService.DROPPABLE_VERBS.contains(message.verb), true);
+    }
+
+    @VisibleForTesting
+    public QueuedMessage(MessageOut<?> message, int id, long timestampNanos, boolean droppable, boolean retryable)
+    {
+        this.message = message;
+        this.id = id;
+        this.timestampNanos = timestampNanos;
+        this.droppable = droppable;
+        this.retryable = retryable;
+    }
+
+    /**
+     * Tells whether this message has outlived its timeout. A non-droppable message is never reported as timed
+     * out just because its timestamp is expired.
+     *
+     * @return true if the message is droppable and more than {@code message.getTimeout()} milliseconds have
+     * elapsed since {@link #timestampNanos}
+     */
+    public boolean isTimedOut()
+    {
+        // nanoTime() readings must be compared through their difference, never as absolute values, because the
+        // counter may overflow (see the System.nanoTime() javadoc)
+        return droppable && System.nanoTime() - timestampNanos > TimeUnit.MILLISECONDS.toNanos(message.getTimeout());
+    }
+
+    /**
+     * @return true if a retry may be created for this message
+     */
+    public boolean shouldRetry()
+    {
+        return retryable;
+    }
+
+    /**
+     * @return a copy of this message with a fresh timestamp that is itself not retryable
+     */
+    public QueuedMessage createRetry()
+    {
+        return new QueuedMessage(message, id, System.nanoTime(), droppable, false);
+    }
+
+    /**
+     * @return the {@link System#nanoTime()} reading associated with this message
+     */
+    public long timestampNanos()
+    {
+        return timestampNanos;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org