You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/08/22 20:55:15 UTC
[08/11] cassandra git commit: switch internode messaging to netty
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java
new file mode 100644
index 0000000..13d8810
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -0,0 +1,375 @@
+package org.apache.cassandra.net.async;
+
+import java.net.InetSocketAddress;
+import java.util.zip.Checksum;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.compression.Lz4FrameDecoder;
+import io.netty.handler.codec.compression.Lz4FrameEncoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import io.netty.util.internal.logging.Slf4JLoggerFactory;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.service.NativeTransportService;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.CoalescingStrategies;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A factory for building Netty {@link Channel}s. Channels here are set up with a pipeline to participate
+ * in the internode protocol handshake, either the inbound or outbound side as per the method invoked.
+ *
+ * Each instance owns three event loop groups (accept, inbound and outbound), which are released by {@link #close()}.
+ */
+public final class NettyFactory
+{
+    private static final Logger logger = LoggerFactory.getLogger(NettyFactory.class);
+
+    /**
+     * The block size for use with netty's lz4 code.
+     */
+    private static final int COMPRESSION_BLOCK_SIZE = 1 << 16;
+
+    /**
+     * Seed for the xxhash32 checksum that the lz4 frame codecs use for peers below
+     * {@link MessagingService#current_version} (see {@link #checksumForFrameEncoders(int)}).
+     */
+    private static final int LZ4_HASH_SEED = 0x9747b28c;
+
+    public enum Mode { MESSAGING, STREAMING }
+
+    private static final String SSL_CHANNEL_HANDLER_NAME = "ssl";
+    static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor";
+    static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = "outboundCompressor";
+    private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+
+    /** a useful addition for debugging; simply set to true to get more data in your logs */
+    private static final boolean WIRETRACE = false;
+    static
+    {
+        // route netty's internal logging through slf4j so the wiretrace LoggingHandler output ends up in our logs
+        if (WIRETRACE)
+            InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
+    }
+
+    private static final boolean DEFAULT_USE_EPOLL = NativeTransportService.useEpoll();
+    static
+    {
+        if (!DEFAULT_USE_EPOLL)
+            logger.warn("epoll not available {}", Epoll.unavailabilityCause());
+    }
+
+    /**
+     * The size of the receive queue for the outbound channels. As outbound channels do not receive data
+     * (outside of the internode messaging protocol's handshake), this value can be relatively small.
+     */
+    private static final int OUTBOUND_CHANNEL_RECEIVE_BUFFER_SIZE = 1 << 10;
+
+    /**
+     * The size of the send queue for the inbound channels. As inbound channels do not send data
+     * (outside of the internode messaging protocol's handshake), this value can be relatively small.
+     */
+    private static final int INBOUND_CHANNEL_SEND_BUFFER_SIZE = 1 << 10;
+
+    /**
+     * A factory instance that all normal, runtime code should use. Separate instances should only be used for testing.
+     */
+    public static final NettyFactory instance = new NettyFactory(DEFAULT_USE_EPOLL);
+
+    private final boolean useEpoll;
+    private final EventLoopGroup acceptGroup;
+
+    private final EventLoopGroup inboundGroup;
+    private final EventLoopGroup outboundGroup;
+
+    /**
+     * Constructor that allows modifying the {@link NettyFactory#useEpoll} for testing purposes. Otherwise, use the
+     * default {@link #instance}.
+     *
+     * @param useEpoll true to build epoll-based event loops and channels, false for nio-based ones
+     */
+    @VisibleForTesting
+    NettyFactory(boolean useEpoll)
+    {
+        this.useEpoll = useEpoll;
+        acceptGroup = getEventLoopGroup(useEpoll, determineAcceptGroupSize(DatabaseDescriptor.getServerEncryptionOptions().internode_encryption),
+                                        "MessagingService-NettyAcceptor-Threads", false);
+        inboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyInbound-Threads", false);
+        // outbound loops almost never read, so let them drain their task queues before polling for IO
+        outboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyOutbound-Threads", true);
+    }
+
+    /**
+     * Determine the number of accept threads we need, which is based upon the number of listening sockets we will have.
+     * We'll have either 1 or 2 listen sockets, depending on if we use SSL or not in combination with non-SSL. If we have both,
+     * we'll have two sockets, and thus need two threads; else one socket and one thread.
+     *
+     * If the operator has configured multiple IP addresses (both {@link org.apache.cassandra.config.Config#broadcast_address}
+     * and {@link org.apache.cassandra.config.Config#listen_address} are configured), then we listen on another set of sockets
+     * - basically doubling the count. See CASSANDRA-9748 for more details.
+     *
+     * @param internode_encryption the configured internode encryption mode
+     * @return the number of accept threads: 1, 2 or 4
+     */
+    static int determineAcceptGroupSize(InternodeEncryption internode_encryption)
+    {
+        // only the dc/rack modes mix encrypted and unencrypted peers, which needs both an SSL and a plain socket
+        int listenSocketCount = internode_encryption == InternodeEncryption.dc || internode_encryption == InternodeEncryption.rack ? 2 : 1;
+
+        if (MessagingService.shouldListenOnBroadcastAddress())
+            listenSocketCount *= 2;
+
+        return listenSocketCount;
+    }
+
+    /**
+     * Create an {@link EventLoopGroup}, for epoll or nio. The {@code boostIoRatio} flag passes a hint to the netty
+     * event loop threads to optimize consuming all the tasks from the netty channel before checking for IO activity.
+     * By default, netty will process some maximum number of tasks off its queue before it will check for activity on
+     * any of the open FDs, which basically amounts to checking for any incoming data. If you have a class of event loops
+     * that do almost *no* inbound activity (like cassandra's outbound channels), then it behooves us to have the
+     * outbound netty channel consume as many tasks as it can before making the system calls to check up on the FDs,
+     * as we're not expecting any incoming data on those sockets anyway. Thus, we pass the magic value {@code 100}
+     * to achieve the maximum consumption from the netty queue. (For implementation details, as of netty 4.1.8,
+     * see {@code io.netty.channel.epoll.EpollEventLoop#run()}.)
+     *
+     * @param useEpoll true for an {@link EpollEventLoopGroup}, false for a {@link NioEventLoopGroup}
+     * @param threadCount number of event loop threads
+     * @param threadNamePrefix prefix used to name the event loop threads
+     * @param boostIoRatio when true, set the group's io ratio to 100
+     * @return the new event loop group
+     */
+    static EventLoopGroup getEventLoopGroup(boolean useEpoll, int threadCount, String threadNamePrefix, boolean boostIoRatio)
+    {
+        if (useEpoll)
+        {
+            logger.debug("using netty epoll event loop for pool prefix {}", threadNamePrefix);
+            EpollEventLoopGroup eventLoopGroup = new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix));
+            if (boostIoRatio)
+                eventLoopGroup.setIoRatio(100);
+            return eventLoopGroup;
+        }
+
+        logger.debug("using netty nio event loop for pool prefix {}", threadNamePrefix);
+        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix));
+        if (boostIoRatio)
+            eventLoopGroup.setIoRatio(100);
+        return eventLoopGroup;
+    }
+
+    /**
+     * Create a {@link Channel} that listens on the {@code localAddr}. This method will block while trying to bind to the address,
+     * but it does not make a remote call.
+     *
+     * @param localAddr the address to bind to
+     * @param initializer the pipeline initializer for each accepted child channel
+     * @param receiveBufferSize SO_RCVBUF for accepted channels; when not positive the option is not set
+     * @return the bound server channel
+     * @throws ConfigurationException if the bind fails (address in use, unassignable address, or any other cause)
+     */
+    public Channel createInboundChannel(InetSocketAddress localAddr, InboundInitializer initializer, int receiveBufferSize) throws ConfigurationException
+    {
+        String nic = FBUtilities.getNetworkInterface(localAddr.getAddress());
+        // the nic fragment carries its own leading space, so no separator between the first two placeholders
+        logger.info("Starting Messaging Service on {}{}, encryption: {}",
+                    localAddr, nic == null ? "" : String.format(" (%s)", nic), encryptionLogStatement(initializer.encryptionOptions));
+        Class<? extends ServerChannel> transport = useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
+        ServerBootstrap bootstrap = new ServerBootstrap().group(acceptGroup, inboundGroup)
+                                                         .channel(transport)
+                                                         .option(ChannelOption.SO_BACKLOG, 500)
+                                                         .childOption(ChannelOption.SO_KEEPALIVE, true)
+                                                         .childOption(ChannelOption.TCP_NODELAY, true)
+                                                         .childOption(ChannelOption.SO_REUSEADDR, true)
+                                                         .childOption(ChannelOption.SO_SNDBUF, INBOUND_CHANNEL_SEND_BUFFER_SIZE)
+                                                         .childHandler(initializer);
+
+        if (receiveBufferSize > 0)
+            bootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBufferSize);
+
+        ChannelFuture channelFuture = bootstrap.bind(localAddr);
+
+        if (!channelFuture.awaitUninterruptibly().isSuccess())
+        {
+            if (channelFuture.channel().isOpen())
+                channelFuture.channel().close();
+
+            Throwable failedChannelCause = channelFuture.cause();
+
+            String causeString = "";
+            if (failedChannelCause != null && failedChannelCause.getMessage() != null)
+                causeString = failedChannelCause.getMessage();
+
+            if (causeString.contains("in use"))
+            {
+                throw new ConfigurationException(localAddr + " is in use by another process. Change listen_address:storage_port " +
+                                                 "in cassandra.yaml to values that do not conflict with other services");
+            }
+            // Per the original author's reading of the JDK sources, bind-failure messages on both Windows and Unix-like
+            // platforms contain "cannot assign requested address", but Windows capitalizes it ("Cannot") and the others
+            // do not. Matching on "annot" covers both spellings.
+            else if (causeString.contains("annot assign requested address"))
+            {
+                throw new ConfigurationException("Unable to bind to address " + localAddr
+                                                 + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
+            }
+            else
+            {
+                throw new ConfigurationException("failed to bind to: " + localAddr, failedChannelCause);
+            }
+        }
+
+        return channelFuture.channel();
+    }
+
+    /**
+     * Initializes the pipeline of each accepted inbound channel: optional SSL, optional wire tracing, then the
+     * inbound handshake handler. Every channel is also registered with the supplied {@link ChannelGroup}.
+     */
+    public static class InboundInitializer extends ChannelInitializer<SocketChannel>
+    {
+        private final IInternodeAuthenticator authenticator;
+        private final ServerEncryptionOptions encryptionOptions;
+        private final ChannelGroup channelGroup;
+
+        /**
+         * @param authenticator passed to the {@link InboundHandshakeHandler}
+         * @param encryptionOptions SSL options; {@code null} disables SSL on inbound channels
+         * @param channelGroup group every initialized channel is added to
+         */
+        public InboundInitializer(IInternodeAuthenticator authenticator, ServerEncryptionOptions encryptionOptions, ChannelGroup channelGroup)
+        {
+            this.authenticator = authenticator;
+            this.encryptionOptions = encryptionOptions;
+            this.channelGroup = channelGroup;
+        }
+
+        @Override
+        public void initChannel(SocketChannel channel) throws Exception
+        {
+            channelGroup.add(channel);
+            ChannelPipeline pipeline = channel.pipeline();
+
+            // order of handlers: ssl -> logger -> handshakeHandler
+            if (encryptionOptions != null)
+            {
+                SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true);
+                SslHandler sslHandler = sslContext.newHandler(channel.alloc());
+                logger.trace("creating inbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
+                pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
+            }
+
+            if (WIRETRACE)
+                pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
+
+            pipeline.addLast(HANDSHAKE_HANDLER_NAME, new InboundHandshakeHandler(authenticator));
+        }
+    }
+
+    /**
+     * Describe the encryption setting for log output.
+     *
+     * @param options encryption options; {@code null} means encryption is disabled
+     * @return "disabled", or "enabled (openssl)" / "enabled (jdk)" depending on whether OpenSSL is available
+     */
+    private static String encryptionLogStatement(ServerEncryptionOptions options)
+    {
+        if (options == null)
+            return "disabled";
+
+        String encryptionType = OpenSsl.isAvailable() ? "openssl" : "jdk";
+        return "enabled (" + encryptionType + ')';
+    }
+
+    /**
+     * Create the {@link Bootstrap} for connecting to a remote peer. This method does <b>not</b> attempt to connect to the peer,
+     * and thus does not block.
+     *
+     * @param params connection parameters; the bootstrap binds to the local address on an ephemeral port and targets
+     *               {@code params.connectionId.connectionAddress()}
+     * @return a configured, unconnected bootstrap
+     */
+    public Bootstrap createOutboundBootstrap(OutboundConnectionParams params)
+    {
+        logger.debug("creating outbound bootstrap to peer {}, compression: {}, encryption: {}, coalesce: {}", params.connectionId.connectionAddress(),
+                     params.compress, encryptionLogStatement(params.encryptionOptions),
+                     params.coalescingStrategy.isPresent() ? params.coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED);
+        Class<? extends Channel> transport = useEpoll ? EpollSocketChannel.class : NioSocketChannel.class;
+        Bootstrap bootstrap = new Bootstrap().group(outboundGroup)
+                                             .channel(transport)
+                                             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000)
+                                             .option(ChannelOption.SO_KEEPALIVE, true)
+                                             .option(ChannelOption.SO_REUSEADDR, true)
+                                             .option(ChannelOption.SO_SNDBUF, params.sendBufferSize)
+                                             .option(ChannelOption.SO_RCVBUF, OUTBOUND_CHANNEL_RECEIVE_BUFFER_SIZE)
+                                             .option(ChannelOption.TCP_NODELAY, params.tcpNoDelay)
+                                             .option(ChannelOption.WRITE_BUFFER_WATER_MARK, params.waterMark)
+                                             .handler(new OutboundInitializer(params));
+        bootstrap.localAddress(params.connectionId.local(), 0);
+        bootstrap.remoteAddress(params.connectionId.connectionAddress());
+        return bootstrap;
+    }
+
+    /**
+     * Initializes the pipeline of an outbound channel: optional SSL (with optional endpoint verification), optional
+     * wire tracing, then the outbound handshake handler.
+     */
+    public static class OutboundInitializer extends ChannelInitializer<SocketChannel>
+    {
+        private final OutboundConnectionParams params;
+
+        OutboundInitializer(OutboundConnectionParams params)
+        {
+            this.params = params;
+        }
+
+        @Override
+        public void initChannel(SocketChannel channel) throws Exception
+        {
+            ChannelPipeline pipeline = channel.pipeline();
+
+            // order of handlers: ssl -> logger -> handshakeHandler
+            if (params.encryptionOptions != null)
+            {
+                SslContext sslContext = SSLFactory.getSslContext(params.encryptionOptions, true, false);
+
+                final SslHandler sslHandler;
+                if (params.encryptionOptions.require_endpoint_verification)
+                {
+                    // verify the peer certificate against its identifying address, not the (possibly different) connection address
+                    InetSocketAddress peer = params.connectionId.remoteAddress();
+                    sslHandler = sslContext.newHandler(channel.alloc(), peer.getHostString(), peer.getPort());
+                    SSLEngine engine = sslHandler.engine();
+                    SSLParameters sslParameters = engine.getSSLParameters();
+                    sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
+                    engine.setSSLParameters(sslParameters);
+                }
+                else
+                {
+                    sslHandler = sslContext.newHandler(channel.alloc());
+                }
+
+                logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
+                pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
+            }
+
+            if (NettyFactory.WIRETRACE)
+                pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
+
+            pipeline.addLast(HANDSHAKE_HANDLER_NAME, new OutboundHandshakeHandler(params));
+        }
+    }
+
+    /**
+     * Initiate a graceful shutdown of the accept, outbound and inbound event loop groups. This does not wait for the
+     * groups to terminate; the futures returned by netty are ignored.
+     */
+    public void close()
+    {
+        acceptGroup.shutdownGracefully();
+        outboundGroup.shutdownGracefully();
+        inboundGroup.shutdownGracefully();
+    }
+
+    /**
+     * @param protocolVersion messaging version of the peer; selects the frame checksum
+     * @return a new lz4 frame encoder using {@link #COMPRESSION_BLOCK_SIZE} blocks, with the high-compressor flag off
+     */
+    static Lz4FrameEncoder createLz4Encoder(int protocolVersion)
+    {
+        return new Lz4FrameEncoder(LZ4Factory.fastestInstance(), false, COMPRESSION_BLOCK_SIZE, checksumForFrameEncoders(protocolVersion));
+    }
+
+    /**
+     * Pick the per-frame checksum: CRC32 for peers at or above {@link MessagingService#current_version}; otherwise
+     * xxhash32 seeded with {@link #LZ4_HASH_SEED}. A fresh instance is returned on each call.
+     */
+    private static Checksum checksumForFrameEncoders(int protocolVersion)
+    {
+        if (protocolVersion >= MessagingService.current_version)
+            return ChecksumType.CRC32.newInstance();
+        return XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum();
+    }
+
+    /**
+     * @param protocolVersion messaging version of the peer; selects the frame checksum
+     * @return a new lz4 frame decoder whose checksum matches {@link #createLz4Encoder(int)} for the same version
+     */
+    static Lz4FrameDecoder createLz4Decoder(int protocolVersion)
+    {
+        return new Lz4FrameDecoder(LZ4Factory.fastestInstance(), checksumForFrameEncoders(protocolVersion));
+    }
+
+    /**
+     * @return a new {@link DefaultEventExecutor} on every call, for use by netty {@link ChannelGroup}s
+     */
+    public static EventExecutor executorForChannelGroups()
+    {
+        return new DefaultEventExecutor();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
new file mode 100644
index 0000000..24dc5ff
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Identifies an outbound messaging connection.
+ *
+ * This mainly hold the remote address and the type (small/large messages or gossip) of connection used, but with the
+ * additional detail that in some case (typically public EC2 address across regions) the address to which we connect
+ * to the remote is different from the address by which the node is known by the rest of the C*.
+ */
+public class OutboundConnectionIdentifier
+{
+ enum ConnectionType
+ {
+ GOSSIP, LARGE_MESSAGE, SMALL_MESSAGE
+ }
+
+ /**
+ * Memoization of the local node's broadcast address.
+ */
+ private final InetSocketAddress localAddr;
+
+ /**
+ * The address by which the remote is identified. This may be different from {@link #remoteConnectionAddr} for
+ * something like EC2 public IP address which need to be used for communication between EC2 regions.
+ */
+ private final InetSocketAddress remoteAddr;
+
+ /**
+ * The address to which we're connecting to the node (often the same as {@link #remoteAddr} but not always).
+ */
+ private final InetSocketAddress remoteConnectionAddr;
+
+ private final ConnectionType connectionType;
+
+ private OutboundConnectionIdentifier(InetSocketAddress localAddr,
+ InetSocketAddress remoteAddr,
+ InetSocketAddress remoteConnectionAddr,
+ ConnectionType connectionType)
+ {
+ this.localAddr = localAddr;
+ this.remoteAddr = remoteAddr;
+ this.remoteConnectionAddr = remoteConnectionAddr;
+ this.connectionType = connectionType;
+ }
+
+ private OutboundConnectionIdentifier(InetSocketAddress localAddr,
+ InetSocketAddress remoteAddr,
+ ConnectionType connectionType)
+ {
+ this(localAddr, remoteAddr, remoteAddr, connectionType);
+ }
+
+ /**
+ * Creates an identifier for a small message connection and using the remote "identifying" address as its connection
+ * address.
+ */
+ public static OutboundConnectionIdentifier small(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+ {
+ return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.SMALL_MESSAGE);
+ }
+
+ /**
+ * Creates an identifier for a large message connection and using the remote "identifying" address as its connection
+ * address.
+ */
+ public static OutboundConnectionIdentifier large(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+ {
+ return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.LARGE_MESSAGE);
+ }
+
+ /**
+ * Creates an identifier for a gossip connection and using the remote "identifying" address as its connection
+ * address.
+ */
+ public static OutboundConnectionIdentifier gossip(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
+ {
+ return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.GOSSIP);
+ }
+
+ /**
+ * Returns a newly created connection identifier to the same remote that this identifier, but using the provided
+ * address as connection address.
+ *
+ * @param remoteConnectionAddr the address to use for connection to the remote in the new identifier.
+ * @return a newly created connection identifier that differs from this one only by using {@code remoteConnectionAddr}
+ * as connection address to the remote.
+ */
+ OutboundConnectionIdentifier withNewConnectionAddress(InetSocketAddress remoteConnectionAddr)
+ {
+ return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteConnectionAddr, connectionType);
+ }
+
+ /**
+ * The local node address.
+ */
+ InetAddress local()
+ {
+ return localAddr.getAddress();
+ }
+
+ /**
+ * The remote node identifying address (the one to use for anything else than connecting to the node).
+ */
+ InetSocketAddress remoteAddress()
+ {
+ return remoteAddr;
+ }
+
+ /**
+ * The remote node identifying address (the one to use for anything else than connecting to the node).
+ */
+ InetAddress remote()
+ {
+ return remoteAddr.getAddress();
+ }
+
+ /**
+ * The remote node connection address (the one to use to actually connect to the remote, and only that).
+ */
+ InetSocketAddress connectionAddress()
+ {
+ return remoteConnectionAddr;
+ }
+
+ /**
+ * The type of this connection.
+ */
+ ConnectionType type()
+ {
+ return connectionType;
+ }
+
+ @Override
+ public String toString()
+ {
+ return remoteAddr.equals(remoteConnectionAddr)
+ ? String.format("%s (%s)", remoteAddr, connectionType)
+ : String.format("%s on %s (%s)", remoteAddr, remoteConnectionAddr, connectionType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java
new file mode 100644
index 0000000..282480e
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.WriteBufferWaterMark;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
+
+/**
+ * A collection of data points to be passed around for outbound connections. Instances are immutable; use
+ * {@link #builder()} to create one, or {@link #builder(OutboundConnectionParams)} to derive a modified copy.
+ */
+public class OutboundConnectionParams
+{
+    public static final int DEFAULT_SEND_BUFFER_SIZE = 1 << 16;
+
+    /** Identifies the remote peer, the local/connection addresses and the connection type. */
+    final OutboundConnectionIdentifier connectionId;
+    /** Receives the {@link HandshakeResult}; presumably invoked once the outbound handshake completes (confirm in OutboundHandshakeHandler). */
+    final Consumer<HandshakeResult> callback;
+    /** SSL settings; {@code null} means the connection is not encrypted. */
+    final ServerEncryptionOptions encryptionOptions;
+    final NettyFactory.Mode mode;
+    final boolean compress;
+    final Optional<CoalescingStrategy> coalescingStrategy;
+    /** SO_SNDBUF for the outbound socket; {@link Builder#build()} requires 0 < size < 1 MiB. */
+    final int sendBufferSize;
+    final boolean tcpNoDelay;
+    final Supplier<QueuedMessage> backlogSupplier;
+    final Consumer<MessageResult> messageResultConsumer;
+    /** Netty write-buffer water mark applied to the outbound channel. */
+    final WriteBufferWaterMark waterMark;
+    /** Messaging protocol version to use; {@link Builder#build()} requires it to be positive. */
+    final int protocolVersion;
+
+    private OutboundConnectionParams(OutboundConnectionIdentifier connectionId,
+                                     Consumer<HandshakeResult> callback,
+                                     ServerEncryptionOptions encryptionOptions,
+                                     NettyFactory.Mode mode,
+                                     boolean compress,
+                                     Optional<CoalescingStrategy> coalescingStrategy,
+                                     int sendBufferSize,
+                                     boolean tcpNoDelay,
+                                     Supplier<QueuedMessage> backlogSupplier,
+                                     Consumer<MessageResult> messageResultConsumer,
+                                     WriteBufferWaterMark waterMark,
+                                     int protocolVersion)
+    {
+        this.connectionId = connectionId;
+        this.callback = callback;
+        this.encryptionOptions = encryptionOptions;
+        this.mode = mode;
+        this.compress = compress;
+        this.coalescingStrategy = coalescingStrategy;
+        this.sendBufferSize = sendBufferSize;
+        this.tcpNoDelay = tcpNoDelay;
+        this.backlogSupplier = backlogSupplier;
+        this.messageResultConsumer = messageResultConsumer;
+        this.waterMark = waterMark;
+        this.protocolVersion = protocolVersion;
+    }
+
+    /**
+     * @return an empty builder. The protocol version must be set before {@link Builder#build()} will succeed.
+     */
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /**
+     * @param params the instance to copy
+     * @return a builder pre-populated with every value of {@code params}
+     */
+    public static Builder builder(OutboundConnectionParams params)
+    {
+        return new Builder(params);
+    }
+
+    /**
+     * Mutable builder for {@link OutboundConnectionParams}. Defaults: no coalescing strategy, a send buffer of
+     * {@link #DEFAULT_SEND_BUFFER_SIZE}, {@link WriteBufferWaterMark#DEFAULT}, and a protocol version of 0, which
+     * {@link #build()} rejects.
+     */
+    public static class Builder
+    {
+        private OutboundConnectionIdentifier connectionId;
+        private Consumer<HandshakeResult> callback;
+        private ServerEncryptionOptions encryptionOptions;
+        private NettyFactory.Mode mode;
+        private boolean compress;
+        private Optional<CoalescingStrategy> coalescingStrategy = Optional.empty();
+        private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+        private boolean tcpNoDelay;
+        private Supplier<QueuedMessage> backlogSupplier;
+        private Consumer<MessageResult> messageResultConsumer;
+        private WriteBufferWaterMark waterMark = WriteBufferWaterMark.DEFAULT;
+        int protocolVersion;
+
+        private Builder()
+        { }
+
+        /**
+         * Copy every field of {@code params}, so that {@code builder(params).build()} reproduces an equivalent
+         * instance.
+         */
+        private Builder(OutboundConnectionParams params)
+        {
+            this.connectionId = params.connectionId;
+            this.callback = params.callback;
+            this.encryptionOptions = params.encryptionOptions;
+            this.mode = params.mode;
+            this.compress = params.compress;
+            this.coalescingStrategy = params.coalescingStrategy;
+            this.sendBufferSize = params.sendBufferSize;
+            this.tcpNoDelay = params.tcpNoDelay;
+            this.backlogSupplier = params.backlogSupplier;
+            this.messageResultConsumer = params.messageResultConsumer;
+            // these two were previously dropped: the copy fell back to the default water mark and failed build()'s
+            // protocol version check unless the caller set the version again
+            this.waterMark = params.waterMark;
+            this.protocolVersion = params.protocolVersion;
+        }
+
+        public Builder connectionId(OutboundConnectionIdentifier connectionId)
+        {
+            this.connectionId = connectionId;
+            return this;
+        }
+
+        public Builder callback(Consumer<HandshakeResult> callback)
+        {
+            this.callback = callback;
+            return this;
+        }
+
+        public Builder encryptionOptions(ServerEncryptionOptions encryptionOptions)
+        {
+            this.encryptionOptions = encryptionOptions;
+            return this;
+        }
+
+        public Builder mode(NettyFactory.Mode mode)
+        {
+            this.mode = mode;
+            return this;
+        }
+
+        public Builder compress(boolean compress)
+        {
+            this.compress = compress;
+            return this;
+        }
+
+        public Builder coalescingStrategy(Optional<CoalescingStrategy> coalescingStrategy)
+        {
+            this.coalescingStrategy = coalescingStrategy;
+            return this;
+        }
+
+        public Builder sendBufferSize(int sendBufferSize)
+        {
+            this.sendBufferSize = sendBufferSize;
+            return this;
+        }
+
+        public Builder tcpNoDelay(boolean tcpNoDelay)
+        {
+            this.tcpNoDelay = tcpNoDelay;
+            return this;
+        }
+
+        public Builder backlogSupplier(Supplier<QueuedMessage> backlogSupplier)
+        {
+            this.backlogSupplier = backlogSupplier;
+            return this;
+        }
+
+        public Builder messageResultConsumer(Consumer<MessageResult> messageResultConsumer)
+        {
+            this.messageResultConsumer = messageResultConsumer;
+            return this;
+        }
+
+        public Builder waterMark(WriteBufferWaterMark waterMark)
+        {
+            this.waterMark = waterMark;
+            return this;
+        }
+
+        public Builder protocolVersion(int protocolVersion)
+        {
+            this.protocolVersion = protocolVersion;
+            return this;
+        }
+
+        /**
+         * @return a new immutable {@link OutboundConnectionParams}
+         * @throws IllegalArgumentException if the protocol version is not positive, or if the send buffer size is
+         *         not in the open range (0, 1 << 20)
+         */
+        public OutboundConnectionParams build()
+        {
+            Preconditions.checkArgument(protocolVersion > 0, "illegal protocol version: " + protocolVersion);
+            Preconditions.checkArgument(sendBufferSize > 0 && sendBufferSize < 1 << 20, "illegal send buffer size: " + sendBufferSize);
+
+            return new OutboundConnectionParams(connectionId, callback, encryptionOptions, mode, compress, coalescingStrategy, sendBufferSize,
+                                                tcpNoDelay, backlogSupplier, messageResultConsumer, waterMark, protocolVersion);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
new file mode 100644
index 0000000..703549a
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage;
+import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage;
+import org.apache.cassandra.net.async.HandshakeProtocol.ThirdHandshakeMessage;
+
+import static org.apache.cassandra.config.Config.PROPERTY_PREFIX;
+
+/**
+ * A {@link ChannelHandler} to execute the send-side of the internode communication handshake protocol.
+ * As soon as the channel becomes active, {@link #channelActive(ChannelHandlerContext)}
+ * (which is only invoked if the underlying TCP connection was properly established) automatically sends out the
+ * {@link FirstHandshakeMessage} of the internode messaging protocol. See {@link HandshakeProtocol} for full details
+ * about the internode messaging handshake protocol.
+ * <p>
+ * Upon completion of the handshake (on success or fail), the {@link #callback} is invoked to let the listener
+ * know the result of the handshake. See {@link HandshakeResult} for details about the different result states.
+ * <p>
+ * This class extends {@link ByteToMessageDecoder}, which is a {@link ChannelInboundHandler}, because this handler
+ * waits for the peer's handshake response (the {@link SecondHandshakeMessage} of the internode messaging handshake protocol).
+ */
+public class OutboundHandshakeHandler extends ByteToMessageDecoder
+{
+ private static final Logger logger = LoggerFactory.getLogger(OutboundHandshakeHandler.class);
+
+ /**
+ * The default number of milliseconds (10 seconds) to wait before closing a channel if there has been no progress
+ * (when there is data to be sent). See {@link IdleStateHandler} and
+ * {@link MessageOutHandler#userEventTriggered(ChannelHandlerContext, Object)}.
+ */
+ private static final long DEFAULT_WRITE_IDLE_MS = TimeUnit.SECONDS.toMillis(10);
+ /** Name of the system property that can override {@link #DEFAULT_WRITE_IDLE_MS}. */
+ private static final String WRITE_IDLE_PROPERTY = PROPERTY_PREFIX + "outbound_write_idle_ms";
+ /** Effective write-idle timeout: the value of {@link #WRITE_IDLE_PROPERTY} if set, else {@link #DEFAULT_WRITE_IDLE_MS}. */
+ private static final long WRITE_IDLE_MS = Long.getLong(WRITE_IDLE_PROPERTY, DEFAULT_WRITE_IDLE_MS);
+
+ /** Identifies the peer being connected to; used for logging and for the addresses exchanged in the handshake. */
+ private final OutboundConnectionIdentifier connectionId;
+
+ /**
+ * The messaging version we offer in the {@link FirstHandshakeMessage} and expect the peer to accept.
+ */
+ private final int messagingVersion;
+
+ /**
+ * A function to invoke upon completion of the attempt, success or failure, to connect to the peer.
+ */
+ private final Consumer<HandshakeResult> callback;
+ /** Connection mode advertised in the {@link FirstHandshakeMessage}. */
+ private final NettyFactory.Mode mode;
+ /** All connection parameters; also consulted for compression, coalescing and backlog settings when building the pipeline. */
+ private final OutboundConnectionParams params;
+
/**
 * Creates a handler that performs the outbound side of the internode handshake.
 *
 * @param params the connection parameters; the identifier, protocol version, callback and mode are cached in
 *               dedicated fields for convenient access
 */
OutboundHandshakeHandler(OutboundConnectionParams params)
{
    connectionId = params.connectionId;
    messagingVersion = params.protocolVersion;
    callback = params.callback;
    mode = params.mode;
    this.params = params;
}
+
+ /**
+ * {@inheritDoc}
+ *
+ * Invoked when the channel is made active, and sends out the {@link FirstHandshakeMessage}
+ */
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx) throws Exception
+ {
+ FirstHandshakeMessage msg = new FirstHandshakeMessage(messagingVersion, mode, params.compress);
+ logger.trace("starting handshake with peer {}, msg = {}", connectionId.connectionAddress(), msg);
+ ctx.writeAndFlush(msg.encode(ctx.alloc())).addListener(future -> firstHandshakeMessageListener(future, ctx));
+ ctx.fireChannelActive();
+ }
+
+ /**
+ * A simple listener to make sure we could send the {@link FirstHandshakeMessage} to the socket,
+ * and fail the handshake attempt if we could not (for example, maybe we could create the TCP socket, but then
+ * the connection gets closed for some reason).
+ */
+ void firstHandshakeMessageListener(Future<? super Void> future, ChannelHandlerContext ctx)
+ {
+ if (future.isSuccess())
+ return;
+
+ ChannelFuture channelFuture = (ChannelFuture)future;
+ exceptionCaught(ctx, channelFuture.cause());
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Invoked when we get the response back from the peer, which should contain the second message of the internode messaging handshake.
+ * <p>
+ * If the peer's version is lower than the one we offered, or higher while the one we offered is below
+ * {@link MessagingService#current_version}, immediately close the channel (and socket) and report
+ * {@link HandshakeResult#disconnect(int)} with the peer's version; do *not* send out the third message of the
+ * internode messaging handshake. We will reconnect on the appropriate protocol version.
+ * <p>
+ * Otherwise send the {@link ThirdHandshakeMessage}, install the messaging handlers via
+ * {@link #setupPipeline(Channel, int)} and report success; if any of that throws, close the channel and
+ * report {@link HandshakeResult#failed()}.
+ */
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
+ {
+ // NOTE(review): a null result presumably means the complete second message has not arrived yet;
+ // ByteToMessageDecoder calls decode() again once more bytes are buffered.
+ SecondHandshakeMessage msg = SecondHandshakeMessage.maybeDecode(in);
+ if (msg == null)
+ return;
+
+ logger.trace("received second handshake message from peer {}, msg = {}", connectionId.connectionAddress(), msg);
+ final int peerMessagingVersion = msg.messagingVersion;
+
+ // we expected a higher protocol version, but it was actually lower
+ if (messagingVersion > peerMessagingVersion)
+ {
+ logger.trace("peer's max version is {}; will reconnect with that version", peerMessagingVersion);
+ try
+ {
+ // only worth a warning when the peer is one of our seeds
+ if (DatabaseDescriptor.getSeeds().contains(connectionId.remote()))
+ logger.warn("Seed gossip version is {}; will not connect with that version", peerMessagingVersion);
+ }
+ catch (Throwable e)
+ {
+ // If invalid yaml has been added to the config since startup, getSeeds() will throw an AssertionError
+ // Additionally, third party seed providers may throw exceptions if network is flakey.
+ // Regardless of what's thrown, we must catch it, disconnect, and try again
+ logger.warn("failed to reread yaml (on trying to connect to a seed): {}", e.getLocalizedMessage());
+ }
+ ctx.close();
+ callback.accept(HandshakeResult.disconnect(peerMessagingVersion));
+ return;
+ }
+ // we anticipate a version that is lower than what peer is actually running
+ // (only relevant while we are below current_version; at current_version we proceed with our own version)
+ else if (messagingVersion < peerMessagingVersion && messagingVersion < MessagingService.current_version)
+ {
+ logger.trace("peer has a higher max version than expected {} (previous value {})", peerMessagingVersion, messagingVersion);
+ ctx.close();
+ callback.accept(HandshakeResult.disconnect(peerMessagingVersion));
+ return;
+ }
+
+ // versions are compatible: finish the handshake. The third message carries our current version and local address.
+ try
+ {
+ ctx.writeAndFlush(new ThirdHandshakeMessage(MessagingService.current_version, connectionId.local()).encode(ctx.alloc()));
+ ChannelWriter channelWriter = setupPipeline(ctx.channel(), peerMessagingVersion);
+ callback.accept(HandshakeResult.success(channelWriter, peerMessagingVersion));
+ }
+ catch (Exception e)
+ {
+ logger.info("failed to finalize internode messaging handshake", e);
+ ctx.close();
+ callback.accept(HandshakeResult.failed());
+ }
+ }
+
+ @VisibleForTesting
+ ChannelWriter setupPipeline(Channel channel, int messagingVersion)
+ {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("idleWriteHandler", new IdleStateHandler(true, 0, WRITE_IDLE_MS, 0, TimeUnit.MILLISECONDS));
+ if (params.compress)
+ pipeline.addLast(NettyFactory.OUTBOUND_COMPRESSOR_HANDLER_NAME, NettyFactory.createLz4Encoder(messagingVersion));
+
+ ChannelWriter channelWriter = ChannelWriter.create(channel, params.messageResultConsumer, params.coalescingStrategy);
+ pipeline.addLast("messageOutHandler", new MessageOutHandler(connectionId, messagingVersion, channelWriter, params.backlogSupplier));
+ pipeline.remove(this);
+ return channelWriter;
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ {
+ logger.error("Failed to properly handshake with peer {}. Closing the channel.", connectionId, cause);
+ ctx.close();
+ callback.accept(HandshakeResult.failed());
+ }
+
+ /**
+ * The result of the handshake. Handshake has 3 possible outcomes:
+ * 1) it can be successful, in which case the channel and version to used is returned in this result.
+ * 2) we may decide to disconnect to reconnect with another protocol version (namely, the version is passed in this result).
+ * 3) we can have a negotiation failure for an unknown reason. (#sadtrombone)
+ */
+ public static class HandshakeResult
+ {
+ static final int UNKNOWN_PROTOCOL_VERSION = -1;
+
+ /**
+ * Describes the result of receiving the response back from the peer (Message 2 of the handshake)
+ * and implies an action that should be taken.
+ */
+ enum Outcome
+ {
+ SUCCESS, DISCONNECT, NEGOTIATION_FAILURE
+ }
+
+ /** The channel for the connection, only set for successful handshake. */
+ final ChannelWriter channelWriter;
+ /** The version negotiated with the peer. Set unless this is a {@link Outcome#NEGOTIATION_FAILURE}. */
+ final int negotiatedMessagingVersion;
+ /** The handshake {@link Outcome}. */
+ final Outcome outcome;
+
+ private HandshakeResult(ChannelWriter channelWriter, int negotiatedMessagingVersion, Outcome outcome)
+ {
+ this.channelWriter = channelWriter;
+ this.negotiatedMessagingVersion = negotiatedMessagingVersion;
+ this.outcome = outcome;
+ }
+
+ static HandshakeResult success(ChannelWriter channel, int negotiatedMessagingVersion)
+ {
+ return new HandshakeResult(channel, negotiatedMessagingVersion, Outcome.SUCCESS);
+ }
+
+ static HandshakeResult disconnect(int negotiatedMessagingVersion)
+ {
+ return new HandshakeResult(null, negotiatedMessagingVersion, Outcome.DISCONNECT);
+ }
+
+ static HandshakeResult failed()
+ {
+ return new HandshakeResult(null, UNKNOWN_PROTOCOL_VERSION, Outcome.NEGOTIATION_FAILURE);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
new file mode 100644
index 0000000..6bda9cd
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.NettyFactory.Mode;
+import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult;
+import org.apache.cassandra.utils.CoalescingStrategies;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/**
+ * Represents one connection to a peer, and handles the state transitions on the connection and the netty {@link Channel}.
+ * The underlying socket is not opened until explicitly requested (by sending a message).
+ *
+ * The basic setup for the channel is like this: a message is requested to be sent via {@link #sendMessage(MessageOut, int)}.
+ * If the channel is not established, then we need to create it (obviously). To prevent multiple threads from creating
+ * independent connections, they attempt to update the {@link #state}; one thread will win the race and create the connection.
+ * Upon successfully setting up the connection/channel, the {@link #state} will be updated again (to {@link State#READY}),
+ * which indicates to other threads that the channel is ready for business and can be written to.
+ *
+ */
+public class OutboundMessagingConnection
+{
+ static final Logger logger = LoggerFactory.getLogger(OutboundMessagingConnection.class);
+ // NOTE(review): presumably rate-limits repeated messages to one per 10 seconds; see NoSpamLogger.
+ private static final NoSpamLogger errorLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS);
+
+ /** Name of the system property controlling {@link #INTRADC_TCP_NODELAY}. */
+ private static final String INTRADC_TCP_NODELAY_PROPERTY = Config.PROPERTY_PREFIX + "otc_intradc_tcp_nodelay";
+
+ /**
+ * Enable/disable TCP_NODELAY for intra-DC connections. Defaults to enabled.
+ * (Inter-DC connections use {@code DatabaseDescriptor.getInterDCTcpNoDelay()} instead.)
+ */
+ private static final boolean INTRADC_TCP_NODELAY = Boolean.parseBoolean(System.getProperty(INTRADC_TCP_NODELAY_PROPERTY, "true"));
+
+ /**
+ * Base delay, in milliseconds, between connection retry attempts; the actual delay is this value
+ * multiplied by the number of consecutive failed attempts.
+ */
+ private static final int OPEN_RETRY_DELAY_MS = 100;
+
+ /**
+ * A minimum number of milliseconds to wait for a connection (TCP socket connect + handshake);
+ * the effective timeout is the larger of this and the RPC timeout.
+ */
+ private static final int MINIMUM_CONNECT_TIMEOUT_MS = 2000;
+ /** Authenticates the remote peer before every connection attempt. */
+ private final IInternodeAuthenticator authenticator;
+
/**
 * Lifecycle of a connection with respect to its ability to send messages into its Netty {@link Channel}.
 */
enum State
{
    /** Not connected; a connection attempt will be started on demand. */
    NOT_READY,
    /** One thread has claimed the right to create the connection/channel and is doing so. */
    CREATING_CHANNEL,
    /** The channel is established and messages can be written to it. */
    READY,
    /** Terminal state: a closed connection should never be transitioned away from this. */
    CLOSED
}
+
+ /**
+ * Backlog to hold messages passed by upstream threads while the Netty {@link Channel} is being set up or recreated.
+ * Messages the channel writer does not accept are also parked here.
+ */
+ private final Queue<QueuedMessage> backlog;
+
+ /**
+ * Reference to a {@link ScheduledExecutorService} rather than directly depending on something like {@link ScheduledExecutors}.
+ * Used to schedule connection retries and connection timeouts.
+ */
+ private final ScheduledExecutorService scheduledExecutor;
+
+ /** Number of messages dropped because they timed out before they could be sent. */
+ final AtomicLong droppedMessageCount;
+ /** Number of message results reported back by the channel writer, successful or not. */
+ final AtomicLong completedMessageCount;
+
+ /** Identifies the peer; volatile because {@link #reconnectWithNewIp(InetSocketAddress)} may replace it. */
+ private volatile OutboundConnectionIdentifier connectionId;
+
+ /** Encryption settings used for every connection attempt. */
+ private final ServerEncryptionOptions encryptionOptions;
+
+ /**
+ * A future for retrying connections. Bear in mind that this future does not execute in the
+ * netty event loop, so there are some races to be careful of.
+ */
+ private volatile ScheduledFuture<?> connectionRetryFuture;
+
+ /**
+ * A future for notifying when the timeout for creating the connection and negotiating the handshake has elapsed.
+ * It will be cancelled when the channel is established correctly. Bear in mind that this future does not execute in the
+ * netty event loop, so there are some races to be careful of.
+ */
+ private volatile ScheduledFuture<?> connectionTimeoutFuture;
+
+ /** Current {@link State}; threads race on it (via compare-and-set) to decide who creates the channel. */
+ private final AtomicReference<State> state;
+
+ /** Optional coalescing strategy handed to each new channel writer. */
+ private final Optional<CoalescingStrategy> coalescingStrategy;
+
+ /**
+ * The number of consecutive failed connection attempts; reset to 0 on a successful TCP connect, on a completed
+ * handshake, or on timeout, and used to scale the retry delay.
+ */
+ private volatile int connectAttemptCount;
+
+ /**
+ * The netty channel, once a socket connection is established; it won't be in its normal working state until the handshake is complete.
+ */
+ private volatile ChannelWriter channelWriter;
+
+ /**
+ * the target protocol version to communicate to the peer with, discovered/negotiated via handshaking
+ * NOTE(review): not volatile, yet written from finishHandshake() (netty event loop) and readable through
+ * getTargetVersion() from other threads; confirm the weaker visibility is acceptable.
+ */
+ private int targetVersion;
+
/**
 * Creates a connection that schedules its retries and connection timeouts on
 * {@link ScheduledExecutors#scheduledFastTasks}.
 */
OutboundMessagingConnection(OutboundConnectionIdentifier id,
                            ServerEncryptionOptions encryption,
                            Optional<CoalescingStrategy> coalescing,
                            IInternodeAuthenticator auth)
{
    this(id, encryption, coalescing, auth, ScheduledExecutors.scheduledFastTasks);
}
+
/**
 * Creates a connection whose retries and connection timeouts run on {@code scheduledExecutor}
 * (exposed so tests can supply their own executor).
 */
@VisibleForTesting
OutboundMessagingConnection(OutboundConnectionIdentifier connectionId,
                            ServerEncryptionOptions encryptionOptions,
                            Optional<CoalescingStrategy> coalescingStrategy,
                            IInternodeAuthenticator authenticator,
                            ScheduledExecutorService scheduledExecutor)
{
    this.connectionId = connectionId;
    this.encryptionOptions = encryptionOptions;
    this.authenticator = authenticator;
    this.scheduledExecutor = scheduledExecutor;
    this.coalescingStrategy = coalescingStrategy;

    this.backlog = new ConcurrentLinkedQueue<>();
    this.droppedMessageCount = new AtomicLong();
    this.completedMessageCount = new AtomicLong();
    this.state = new AtomicReference<>(State.NOT_READY);

    // Seed the target version with the most precise value we know. The pool may read it (in getConnection())
    // before we ever connect, since we only connect when the first message is submitted. The only case where we
    // connect without knowing a node's true version is when that node is a seed (any other node must have been
    // gossiped to us or have connected to us, both of which record its version); there, nothing relies on
    // targetVersion before we are connected, and the version detection in connect() does its job.
    this.targetVersion = MessagingService.instance().getVersion(connectionId.remote());
}
+
+ /**
+ * If the connection is set up and ready to use (the normal case), simply send the message to it and return.
+ * Otherwise, one lucky thread is selected to create the Channel, while other threads just add the {@code msg} to
+ * the backlog queue.
+ *
+ * @return true if the message was accepted by the {@link #channelWriter}; else false if it was not accepted
+ * and added to the backlog or the channel is {@link State#CLOSED}. See documentation in {@link ChannelWriter} and
+ * {@link MessageOutHandler} how the backlogged messages get consumed.
+ */
+ boolean sendMessage(MessageOut msg, int id)
+ {
+ return sendMessage(new QueuedMessage(msg, id));
+ }
+
+ boolean sendMessage(QueuedMessage queuedMessage)
+ {
+ State state = this.state.get();
+ if (state == State.READY)
+ {
+ if (channelWriter.write(queuedMessage, false))
+ return true;
+
+ backlog.add(queuedMessage);
+ return false;
+ }
+ else if (state == State.CLOSED)
+ {
+ errorLogger.warn("trying to write message to a closed connection");
+ return false;
+ }
+ else
+ {
+ backlog.add(queuedMessage);
+ connect();
+ return true;
+ }
+ }
+
+ /**
+ * Initiate all the actions required to establish a working, valid connection. This includes
+ * opening the socket, negotiating the internode messaging handshake, and setting up the working
+ * Netty {@link Channel}. However, this method will not block for all those actions: it will only
+ * kick off the connection attempt as everything is asynchronous.
+ * <p>
+ * Threads compete to update the {@link #state} field to {@link State#CREATING_CHANNEL} to ensure only one
+ * connection is attempted at a time.
+ *
+ * @return true if kicking off the connection attempt was started by this thread; else, false.
+ */
+ public boolean connect()
+ {
+ // try to be the winning thread to create the channel
+ if (!state.compareAndSet(State.NOT_READY, State.CREATING_CHANNEL))
+ return false;
+
+ // clean up any lingering connection attempts
+ if (connectionTimeoutFuture != null)
+ {
+ connectionTimeoutFuture.cancel(false);
+ connectionTimeoutFuture = null;
+ }
+
+ return tryConnect();
+ }
+
+ /**
+ * Performs one connection attempt on behalf of the thread that won the race in {@link #connect()}:
+ * authenticates the peer, builds the bootstrap, starts the TCP connect and arms the overall
+ * connect-plus-handshake timeout.
+ *
+ * @return {@code true} if the TCP connect was started; {@code false} if the state changed underneath us
+ * or the peer failed internode authentication.
+ */
+ private boolean tryConnect()
+ {
+ // someone (e.g. connectionTimeout() or close()) changed the state since we claimed it; abandon this attempt
+ if (state.get() != State.CREATING_CHANNEL)
+ return false;
+
+ logger.debug("connection attempt {} to {}", connectAttemptCount, connectionId);
+
+
+ InetSocketAddress remote = connectionId.remoteAddress();
+ if (!authenticator.authenticate(remote.getAddress(), remote.getPort()))
+ {
+ logger.warn("Internode auth failed connecting to {}", connectionId);
+ //Remove the connection pool and other thread so messages aren't queued
+ MessagingService.instance().destroyConnectionPool(remote.getAddress());
+
+ // don't update the state field as destroyConnectionPool() *should* call OMC.close()
+ // on all the connections in the OMP for the remoteAddress
+ return false;
+ }
+
+ boolean compress = shouldCompressConnection(connectionId.local(), connectionId.remote());
+ Bootstrap bootstrap = buildBootstrap(compress);
+
+ // connectCallback() only evaluates the TCP connect; the handshake result arrives via finishHandshake()
+ ChannelFuture connectFuture = bootstrap.connect();
+ connectFuture.addListener(this::connectCallback);
+
+ // deadline for TCP connect + handshake: the RPC timeout, but never less than MINIMUM_CONNECT_TIMEOUT_MS.
+ // Only arm a new timeout when no earlier one is still pending.
+ long timeout = Math.max(MINIMUM_CONNECT_TIMEOUT_MS, DatabaseDescriptor.getRpcTimeout());
+ if (connectionTimeoutFuture == null || connectionTimeoutFuture.isDone())
+ connectionTimeoutFuture = scheduledExecutor.schedule(() -> connectionTimeout(connectFuture), timeout, TimeUnit.MILLISECONDS);
+ return true;
+ }
+
+ @VisibleForTesting
+ static boolean shouldCompressConnection(InetAddress localHost, InetAddress remoteHost)
+ {
+ return (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all)
+ || ((DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc) && !isLocalDC(localHost, remoteHost));
+ }
+
+ private Bootstrap buildBootstrap(boolean compress)
+ {
+ boolean tcpNoDelay = isLocalDC(connectionId.local(), connectionId.remote()) ? INTRADC_TCP_NODELAY : DatabaseDescriptor.getInterDCTcpNoDelay();
+ int sendBufferSize = DatabaseDescriptor.getInternodeSendBufferSize() > 0
+ ? DatabaseDescriptor.getInternodeSendBufferSize()
+ : OutboundConnectionParams.DEFAULT_SEND_BUFFER_SIZE;
+ OutboundConnectionParams params = OutboundConnectionParams.builder()
+ .connectionId(connectionId)
+ .callback(this::finishHandshake)
+ .encryptionOptions(encryptionOptions)
+ .mode(Mode.MESSAGING)
+ .compress(compress)
+ .coalescingStrategy(coalescingStrategy)
+ .sendBufferSize(sendBufferSize)
+ .tcpNoDelay(tcpNoDelay)
+ .backlogSupplier(() -> nextBackloggedMessage())
+ .messageResultConsumer(this::handleMessageResult)
+ .protocolVersion(targetVersion)
+ .build();
+
+ return NettyFactory.instance.createOutboundBootstrap(params);
+ }
+
+ private QueuedMessage nextBackloggedMessage()
+ {
+ QueuedMessage msg = backlog.poll();
+ if (msg == null)
+ return null;
+
+ if (!msg.isTimedOut())
+ return msg;
+
+ if (msg.shouldRetry())
+ return msg.createRetry();
+
+ droppedMessageCount.incrementAndGet();
+ return null;
+ }
+
+ static boolean isLocalDC(InetAddress localHost, InetAddress remoteHost)
+ {
+ String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(remoteHost);
+ String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(localHost);
+ return remoteDC != null && remoteDC.equals(localDC);
+ }
+
+ /**
+ * Handles the callback of the TCP connection attempt (not including the handshake negotiation!), and really all
+ * we're handling here is the TCP connection failures. On failure, we close the channel (which should disconnect
+ * the socket, if connected). If there was an {@link IOException} while trying to connect, the connection will be
+ * retried after a short delay.
+ * <p>
+ * This method does not alter the {@link #state} as it's only evaluating the TCP connect, not TCP connect and handshake.
+ * Thus, {@link #finishHandshake(HandshakeResult)} will handle any necessary state updates.
+ * <p>
+ * Note: this method is called from the event loop, so be careful wrt thread visibility
+ *
+ * @return true iff the TCP connection was established and the {@link #state} is not {@link State#CLOSED}; else false.
+ */
+ @VisibleForTesting
+ boolean connectCallback(Future<? super Void> future)
+ {
+ ChannelFuture channelFuture = (ChannelFuture)future;
+
+ // make sure this instance is not (terminally) closed
+ if (state.get() == State.CLOSED)
+ {
+ channelFuture.channel().close();
+ return false;
+ }
+
+ // this is the success state
+ // (the handshake is still in flight; finishHandshake() will move the state to READY)
+ final Throwable cause = future.cause();
+ if (cause == null)
+ {
+ connectAttemptCount = 0;
+ return true;
+ }
+
+ // let a later connect() (scheduled retry or next sendMessage()) win the NOT_READY -> CREATING_CHANNEL race again
+ setStateIfNotClosed(state, State.NOT_READY);
+ if (cause instanceof IOException)
+ {
+ logger.trace("unable to connect on attempt {} to {}", connectAttemptCount, connectionId, cause);
+ connectAttemptCount++;
+ // linear backoff: each consecutive failure adds OPEN_RETRY_DELAY_MS to the delay
+ connectionRetryFuture = scheduledExecutor.schedule(this::connect, OPEN_RETRY_DELAY_MS * connectAttemptCount, TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ // no retry is scheduled for non-IO failures; the next sendMessage() will trigger a new connect()
+ JVMStabilityInspector.inspectThrowable(cause);
+ logger.error("non-IO error attempting to connect to {}", connectionId, cause);
+ }
+ return false;
+ }
+
+ /**
+ * A callback for handling timeouts when creating a connection/negotiating the handshake.
+ * <p>
+ * Note: this method is *not* invoked from the netty event loop,
+ * so there's an inherent race with {@link #finishHandshake(HandshakeResult)},
+ * as well as any possible connect() reattempts (a seemingly remote race condition, however).
+ * Therefore, this function tries to lose any races, as much as possible.
+ *
+ * @return true if there was a timeout on the connect/handshake (also true when the connection is already
+ * {@link State#CLOSED}); else false.
+ */
+ boolean connectionTimeout(ChannelFuture channelFuture)
+ {
+ // stop any scheduled retry and reset the backoff counter
+ if (connectionRetryFuture != null)
+ {
+ connectionRetryFuture.cancel(false);
+ connectionRetryFuture = null;
+ }
+ connectAttemptCount = 0;
+ State initialState = state.get();
+ if (initialState == State.CLOSED)
+ return true;
+
+ // still NOT_READY or CREATING_CHANNEL: the attempt did not complete in time
+ if (initialState != State.READY)
+ {
+ logger.debug("timed out while trying to connect to {}", connectionId);
+
+ channelFuture.channel().close();
+ // a last-ditch attempt to let finishHandshake() win the race
+ if (state.compareAndSet(initialState, State.NOT_READY))
+ {
+ // messages queued for this failed attempt are discarded, not retried
+ backlog.clear();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Process the results of the handshake negotiation.
+ * <p>
+ * Note: this method will be invoked from the netty event loop,
+ * so there's an inherent race with {@link #connectionTimeout(ChannelFuture)}.
+ */
+ void finishHandshake(HandshakeResult result)
+ {
+ // clean up the connector instances before changing the state
+ if (connectionTimeoutFuture != null)
+ {
+ connectionTimeoutFuture.cancel(false);
+ connectionTimeoutFuture = null;
+ }
+ if (connectionRetryFuture != null)
+ {
+ connectionRetryFuture.cancel(false);
+ connectionRetryFuture = null;
+ }
+ connectAttemptCount = 0;
+
+ if (result.negotiatedMessagingVersion != HandshakeResult.UNKNOWN_PROTOCOL_VERSION)
+ {
+ targetVersion = result.negotiatedMessagingVersion;
+ MessagingService.instance().setVersion(connectionId.remote(), targetVersion);
+ }
+
+ switch (result.outcome)
+ {
+ case SUCCESS:
+ assert result.channelWriter != null;
+ logger.debug("successfully connected to {}, conmpress = {}, coalescing = {}", connectionId,
+ shouldCompressConnection(connectionId.local(), connectionId.remote()),
+ coalescingStrategy.isPresent() ? coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED);
+ if (state.get() == State.CLOSED)
+ {
+ result.channelWriter.close();
+ backlog.clear();
+ break;
+ }
+ channelWriter = result.channelWriter;
+ // drain the backlog to the channel
+ channelWriter.writeBacklog(backlog, true);
+ // change the state so newly incoming messages can be sent to the channel (without adding to the backlog)
+ setStateIfNotClosed(state, State.READY);
+ // ship out any stragglers that got added to the backlog
+ channelWriter.writeBacklog(backlog, true);
+ break;
+ case DISCONNECT:
+ reconnect();
+ break;
+ case NEGOTIATION_FAILURE:
+ setStateIfNotClosed(state, State.NOT_READY);
+ backlog.clear();
+ break;
+ default:
+ throw new IllegalArgumentException("unhandled result type: " + result.outcome);
+ }
+ }
+
+ @VisibleForTesting
+ static boolean setStateIfNotClosed(AtomicReference<State> state, State newState)
+ {
+ State s = state.get();
+ if (s == State.CLOSED)
+ return false;
+ state.set(newState);
+ return true;
+ }
+
+ int getTargetVersion()
+ {
+ return targetVersion;
+ }
+
+ /**
+ * Handles the result of each message sent.
+ *
+ * Note: this function is expected to be invoked on the netty event loop. Also, do not retain any state from
+ * the input {@code messageResult}.
+ */
+ void handleMessageResult(MessageResult messageResult)
+ {
+ completedMessageCount.incrementAndGet();
+
+ // checking the cause() is an optimized way to tell if the operation was successful (as the cause will be null)
+ // Note that ExpiredException is just a marker for timeout-ed message we're dropping, but as we already
+ // incremented the dropped message count in MessageOutHandler, we have nothing to do.
+ Throwable cause = messageResult.future.cause();
+ if (cause == null)
+ return;
+
+ if (cause instanceof ExpiredException)
+ {
+ droppedMessageCount.incrementAndGet();
+ return;
+ }
+
+ JVMStabilityInspector.inspectThrowable(cause);
+
+ if (cause instanceof IOException || cause.getCause() instanceof IOException)
+ {
+ ChannelWriter writer = messageResult.writer;
+ if (writer.shouldPurgeBacklog())
+ purgeBacklog();
+
+ // This writer needs to be closed and we need to trigger a reconnection. We really only want to do that
+ // once for this channel however (and again, no race because we're on the netty event loop).
+ if (!writer.isClosed() && messageResult.allowReconnect)
+ {
+ reconnect();
+ writer.close();
+ }
+
+ QueuedMessage msg = messageResult.msg;
+ if (msg != null && msg.shouldRetry())
+ {
+ sendMessage(msg.createRetry());
+ }
+ }
+ else if (messageResult.future.isCancelled())
+ {
+ // Someone cancelled the future, which we assume meant it doesn't want the message to be sent if it hasn't
+ // yet. Just ignore.
+ }
+ else
+ {
+ // Non IO exceptions are likely a programming error so let's not silence them
+ logger.error("Unexpected error writing on " + connectionId, cause);
+ }
+ }
+
+ /**
+ * Change the IP address on which we connect to the peer. We will attempt to connect to the new address if there
+ * was a previous connection, and new incoming messages as well as existing {@link #backlog} messages will be sent there.
+ * Any outstanding messages in the existing channel will still be sent to the previous address (we won't/can't move them from
+ * one channel to another).
+ */
+ void reconnectWithNewIp(InetSocketAddress newAddr)
+ {
+ State currentState = state.get();
+
+ // if we're closed, ignore the request
+ if (currentState == State.CLOSED)
+ return;
+
+ // capture a reference to the current channel, in case it gets swapped out before we can call close() on it
+ ChannelWriter currentChannel = channelWriter;
+ connectionId = connectionId.withNewConnectionAddress(newAddr);
+
+ if (currentState != State.NOT_READY)
+ reconnect();
+
+ // lastly, push through anything remaining in the existing channel.
+ if (currentChannel != null)
+ currentChannel.softClose();
+ }
+
+ /**
+ * Sets the state properly so {@link #connect()} can attempt to reconnect.
+ */
+ void reconnect()
+ {
+ if (setStateIfNotClosed(state, State.NOT_READY))
+ connect();
+ }
+
+ void purgeBacklog()
+ {
+ backlog.clear();
+ }
+
+ public void close(boolean softClose)
+ {
+ state.set(State.CLOSED);
+
+ if (connectionTimeoutFuture != null)
+ {
+ connectionTimeoutFuture.cancel(false);
+ connectionTimeoutFuture = null;
+ }
+
+ // drain the backlog
+ if (channelWriter != null)
+ {
+ if (softClose)
+ {
+ channelWriter.writeBacklog(backlog, false);
+ channelWriter.softClose();
+ }
+ else
+ {
+ backlog.clear();
+ channelWriter.close();
+ }
+
+ channelWriter = null;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return connectionId.toString();
+ }
+
+ public Integer getPendingMessages()
+ {
+ int pending = backlog.size();
+ ChannelWriter chan = channelWriter;
+ if (chan != null)
+ pending += (int)chan.pendingMessageCount();
+ return pending;
+ }
+
+ public Long getCompletedMessages()
+ {
+ return completedMessageCount.get();
+ }
+
+ public Long getDroppedMessages()
+ {
+ return droppedMessageCount.get();
+ }
+
+ /*
+ methods specific to testing follow
+ */
+
+ @VisibleForTesting
+ int backlogSize()
+ {
+ return backlog.size();
+ }
+
+ @VisibleForTesting
+ void addToBacklog(QueuedMessage msg)
+ {
+ backlog.add(msg);
+ }
+
+ @VisibleForTesting
+ void setChannelWriter(ChannelWriter channelWriter)
+ {
+ this.channelWriter = channelWriter;
+ }
+
+ @VisibleForTesting
+ ChannelWriter getChannelWriter()
+ {
+ return channelWriter;
+ }
+
+ @VisibleForTesting
+ void setState(State state)
+ {
+ this.state.set(state);
+ }
+
+ @VisibleForTesting
+ State getState()
+ {
+ return state.get();
+ }
+
+ @VisibleForTesting
+ void setTargetVersion(int targetVersion)
+ {
+ this.targetVersion = targetVersion;
+ }
+
+ @VisibleForTesting
+ OutboundConnectionIdentifier getConnectionId()
+ {
+ return connectionId;
+ }
+
+ @VisibleForTesting
+ void setConnectionTimeoutFuture(ScheduledFuture<?> connectionTimeoutFuture)
+ {
+ this.connectionTimeoutFuture = connectionTimeoutFuture;
+ }
+
+ @VisibleForTesting
+ ScheduledFuture<?> getConnectionTimeoutFuture()
+ {
+ return connectionTimeoutFuture;
+ }
+
+ public boolean isConnected()
+ {
+ return state.get() == State.READY;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
new file mode 100644
index 0000000..0086da8
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.net.InetSocketAddress;
+import java.util.Optional;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.metrics.ConnectionMetrics;
+import org.apache.cassandra.net.BackPressureState;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
+import org.apache.cassandra.utils.CoalescingStrategies;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
+
+/**
+ * Groups a set of outbound connections to a given peer, and routes outgoing messages to the appropriate connection
+ * (based upon message's type or size). Contains a {@link OutboundMessagingConnection} for each of the
+ * {@link ConnectionType} type.
+ */
+public class OutboundMessagingPool
+{
+ @VisibleForTesting
+ static final long LARGE_MESSAGE_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "otcp_large_message_threshold", 1024 * 64);
+
+ private final ConnectionMetrics metrics;
+ private final BackPressureState backPressureState;
+
+ public OutboundMessagingConnection gossipChannel;
+ public OutboundMessagingConnection largeMessageChannel;
+ public OutboundMessagingConnection smallMessageChannel;
+
+ /**
+ * An override address on which to communicate with the peer. Typically used for something like EC2 public IP addresses
+ * which need to be used for communication between EC2 regions.
+ */
+ private InetSocketAddress preferredRemoteAddr;
+
+ public OutboundMessagingPool(InetSocketAddress remoteAddr, InetSocketAddress localAddr, ServerEncryptionOptions encryptionOptions,
+ BackPressureState backPressureState, IInternodeAuthenticator authenticator)
+ {
+ preferredRemoteAddr = remoteAddr;
+ this.backPressureState = backPressureState;
+ metrics = new ConnectionMetrics(localAddr.getAddress(), this);
+
+
+ smallMessageChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.small(localAddr, preferredRemoteAddr),
+ encryptionOptions, coalescingStrategy(remoteAddr), authenticator);
+ largeMessageChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.large(localAddr, preferredRemoteAddr),
+ encryptionOptions, coalescingStrategy(remoteAddr), authenticator);
+
+ // don't attempt coalesce the gossip messages, just ship them out asap (let's not anger the FD on any peer node by any artificial delays)
+ gossipChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.gossip(localAddr, preferredRemoteAddr),
+ encryptionOptions, Optional.empty(), authenticator);
+ }
+
+ private static Optional<CoalescingStrategy> coalescingStrategy(InetSocketAddress remoteAddr)
+ {
+ String strategyName = DatabaseDescriptor.getOtcCoalescingStrategy();
+ String displayName = remoteAddr.getAddress().getHostAddress();
+ return CoalescingStrategies.newCoalescingStrategy(strategyName,
+ DatabaseDescriptor.getOtcCoalescingWindow(),
+ OutboundMessagingConnection.logger,
+ displayName);
+
+ }
+
+ public BackPressureState getBackPressureState()
+ {
+ return backPressureState;
+ }
+
+ public void sendMessage(MessageOut msg, int id)
+ {
+ getConnection(msg).sendMessage(msg, id);
+ }
+
+ @VisibleForTesting
+ public OutboundMessagingConnection getConnection(MessageOut msg)
+ {
+ // optimize for the common path (the small message channel)
+ if (Stage.GOSSIP != msg.getStage())
+ {
+ return msg.serializedSize(smallMessageChannel.getTargetVersion()) < LARGE_MESSAGE_THRESHOLD
+ ? smallMessageChannel
+ : largeMessageChannel;
+ }
+ return gossipChannel;
+ }
+
+ /**
+ * Reconnect to the peer using the given {@code addr}. Outstanding messages in each channel will be sent on the
+ * current channel. Typically this function is used for something like EC2 public IP addresses which need to be used
+ * for communication between EC2 regions.
+ *
+ * @param addr IP Address to use (and prefer) going forward for connecting to the peer
+ */
+ public void reconnectWithNewIp(InetSocketAddress addr)
+ {
+ preferredRemoteAddr = addr;
+ gossipChannel.reconnectWithNewIp(addr);
+ largeMessageChannel.reconnectWithNewIp(addr);
+ smallMessageChannel.reconnectWithNewIp(addr);
+ }
+
+ /**
+ * Close each netty channel and it's socket.
+ *
+ * @param softClose {@code true} if existing messages in the queue should be sent before closing.
+ */
+ public void close(boolean softClose)
+ {
+ gossipChannel.close(softClose);
+ largeMessageChannel.close(softClose);
+ smallMessageChannel.close(softClose);
+ }
+
+ /**
+ * For testing purposes only.
+ */
+ @VisibleForTesting
+ OutboundMessagingConnection getConnection(ConnectionType connectionType)
+ {
+ switch (connectionType)
+ {
+ case GOSSIP:
+ return gossipChannel;
+ case LARGE_MESSAGE:
+ return largeMessageChannel;
+ case SMALL_MESSAGE:
+ return smallMessageChannel;
+ default:
+ throw new IllegalArgumentException("unsupported connection type: " + connectionType);
+ }
+ }
+
+ public void incrementTimeout()
+ {
+ metrics.timeouts.mark();
+ }
+
+ public long getTimeouts()
+ {
+ return metrics.timeouts.getCount();
+ }
+
+ public InetSocketAddress getPreferredRemoteAddr()
+ {
+ return preferredRemoteAddr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/QueuedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/QueuedMessage.java b/src/java/org/apache/cassandra/net/async/QueuedMessage.java
new file mode 100644
index 0000000..28e4ba4
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/QueuedMessage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.CoalescingStrategies;
+
+/**
+ * A wrapper for outbound messages. All messages will be retried once.
+ */
+public class QueuedMessage implements CoalescingStrategies.Coalescable
+{
+ public final MessageOut<?> message;
+ public final int id;
+ public final long timestampNanos;
+ public final boolean droppable;
+ private final boolean retryable;
+
+ public QueuedMessage(MessageOut<?> message, int id)
+ {
+ this(message, id, System.nanoTime(), MessagingService.DROPPABLE_VERBS.contains(message.verb), true);
+ }
+
+ @VisibleForTesting
+ public QueuedMessage(MessageOut<?> message, int id, long timestampNanos, boolean droppable, boolean retryable)
+ {
+ this.message = message;
+ this.id = id;
+ this.timestampNanos = timestampNanos;
+ this.droppable = droppable;
+ this.retryable = retryable;
+ }
+
+ /** don't drop a non-droppable message just because it's timestamp is expired */
+ public boolean isTimedOut()
+ {
+ return droppable && timestampNanos < System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(message.getTimeout());
+ }
+
+ public boolean shouldRetry()
+ {
+ return retryable;
+ }
+
+ public QueuedMessage createRetry()
+ {
+ return new QueuedMessage(message, id, System.nanoTime(), droppable, false);
+ }
+
+ public long timestampNanos()
+ {
+ return timestampNanos;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org