You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2019/07/02 18:40:07 UTC
[cassandra] branch trunk updated: Update Netty dependencies to
latest, clean up SocketFactory
This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new a339aa9 Update Netty dependencies to latest, clean up SocketFactory
a339aa9 is described below
commit a339aa9e9811723e52896ec3c96395461cad0fd0
Author: Aleksey Yeshchenko <al...@apache.org>
AuthorDate: Tue Jul 2 10:10:19 2019 +0100
Update Netty dependencies to latest, clean up SocketFactory
patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-15195
---
CHANGES.txt | 1 +
build.xml | 3 +-
.../{netty-4.1.28.txt => netty-4.1.37.txt} | 0
...{netty-4.1.28.txt => netty-tcnative-2.0.25.txt} | 1 -
...4.1.28.Final.jar => netty-all-4.1.37.Final.jar} | Bin 3839841 -> 4024948 bytes
...etty-tcnative-boringssl-static-2.0.25.Final.jar | Bin 0 -> 3108312 bytes
.../cassandra/net/InboundConnectionInitiator.java | 2 +
.../cassandra/net/OutboundConnectionInitiator.java | 16 +-
.../org/apache/cassandra/net/SocketFactory.java | 230 +++++++++------------
.../org/apache/cassandra/security/SSLFactory.java | 14 +-
.../cassandra/service/NativeTransportService.java | 2 +-
11 files changed, 126 insertions(+), 143 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2faca24..4d3a9a9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Update Netty dependencies to latest, clean up SocketFactory (CASSANDRA-15195)
* Native Transport - Apply noSpamLogger to ConnectionLimitHandler (CASSANDRA-15167)
* Reduce heap pressure during compactions (CASSANDRA-14654)
* Support building Cassandra with JDK 11 (CASSANDRA-15108)
diff --git a/build.xml b/build.xml
index bdf5ae2..acfc613 100644
--- a/build.xml
+++ b/build.xml
@@ -548,7 +548,8 @@
<dependency groupId="com.addthis.metrics" artifactId="reporter-config3" version="3.0.3" />
<dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" />
<dependency groupId="io.airlift" artifactId="airline" version="0.8" />
- <dependency groupId="io.netty" artifactId="netty-all" version="4.1.28.Final" />
+ <dependency groupId="io.netty" artifactId="netty-all" version="4.1.37.Final" />
+ <dependency groupId="io.netty" artifactId="netty-tcnative-boringssl-static" version="2.0.25.Final" />
<dependency groupId="net.openhft" artifactId="chronicle-queue" version="${chronicle-queue.version}"/>
<dependency groupId="net.openhft" artifactId="chronicle-core" version="${chronicle-core.version}"/>
<dependency groupId="net.openhft" artifactId="chronicle-bytes" version="${chronicle-bytes.version}"/>
diff --git a/lib/licenses/netty-4.1.28.txt b/lib/licenses/netty-4.1.37.txt
similarity index 100%
copy from lib/licenses/netty-4.1.28.txt
copy to lib/licenses/netty-4.1.37.txt
diff --git a/lib/licenses/netty-4.1.28.txt b/lib/licenses/netty-tcnative-2.0.25.txt
similarity index 99%
rename from lib/licenses/netty-4.1.28.txt
rename to lib/licenses/netty-tcnative-2.0.25.txt
index d645695..261eeb9 100644
--- a/lib/licenses/netty-4.1.28.txt
+++ b/lib/licenses/netty-tcnative-2.0.25.txt
@@ -1,4 +1,3 @@
-
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
diff --git a/lib/netty-all-4.1.28.Final.jar b/lib/netty-all-4.1.37.Final.jar
similarity index 56%
rename from lib/netty-all-4.1.28.Final.jar
rename to lib/netty-all-4.1.37.Final.jar
index 058662e..93cff04 100644
Binary files a/lib/netty-all-4.1.28.Final.jar and b/lib/netty-all-4.1.37.Final.jar differ
diff --git a/lib/netty-tcnative-boringssl-static-2.0.25.Final.jar b/lib/netty-tcnative-boringssl-static-2.0.25.Final.jar
new file mode 100644
index 0000000..954627f
Binary files /dev/null and b/lib/netty-tcnative-boringssl-static-2.0.25.Final.jar differ
diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
index d26abfd..c390ba4 100644
--- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
@@ -132,6 +132,8 @@ public class InboundConnectionInitiator
ServerBootstrap bootstrap = initializer.settings.socketFactory
.newServerBootstrap()
.option(ChannelOption.SO_BACKLOG, 1 << 9)
+ .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance)
+ .option(ChannelOption.SO_REUSEADDR, true)
.childHandler(initializer);
int socketReceiveBufferSizeInBytes = initializer.settings.socketReceiveBufferSizeInBytes;
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
index a63ccf9..fdfb2df 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
@@ -171,13 +171,15 @@ public class OutboundConnectionInitiator<SuccessType extends OutboundConnectionI
*/
private Bootstrap createBootstrap(EventLoop eventLoop)
{
- Bootstrap bootstrap = newBootstrap(eventLoop, settings.tcpUserTimeoutInMS)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.tcpConnectTimeoutInMS)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.SO_REUSEADDR, true)
- .option(ChannelOption.TCP_NODELAY, settings.tcpNoDelay)
- .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, NoSizeEstimator.instance)
- .handler(new Initializer());
+ Bootstrap bootstrap = settings.socketFactory
+ .newClientBootstrap(eventLoop, settings.tcpUserTimeoutInMS)
+ .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.tcpConnectTimeoutInMS)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.TCP_NODELAY, settings.tcpNoDelay)
+ .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, NoSizeEstimator.instance)
+ .handler(new Initializer());
if (settings.socketSendBufferSizeInBytes > 0)
bootstrap.option(ChannelOption.SO_SNDBUF, settings.socketSendBufferSizeInBytes);
diff --git a/src/java/org/apache/cassandra/net/SocketFactory.java b/src/java/org/apache/cassandra/net/SocketFactory.java
index 18bb0d5..062c44b 100644
--- a/src/java/org/apache/cassandra/net/SocketFactory.java
+++ b/src/java/org/apache/cassandra/net/SocketFactory.java
@@ -18,13 +18,14 @@
package org.apache.cassandra.net;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.spi.SelectorProvider;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
@@ -37,11 +38,11 @@ import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelFactory;
+import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.MultithreadEventLoopGroup;
-import io.netty.channel.SingleThreadEventLoop;
+import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -53,10 +54,10 @@ import io.netty.channel.unix.Errors;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.DefaultEventExecutorChooserFactory;
import io.netty.util.concurrent.DefaultThreadFactory;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.MultithreadEventExecutorGroup;
-import io.netty.util.concurrent.SingleThreadEventExecutor;
+import io.netty.util.concurrent.RejectedExecutionHandlers;
+import io.netty.util.concurrent.ThreadPerTaskExecutor;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.cassandra.concurrent.NamedThreadFactory;
@@ -82,8 +83,85 @@ public final class SocketFactory
private static final int EVENT_THREADS = Integer.getInteger(Config.PROPERTY_PREFIX + "internode-event-threads", FBUtilities.getAvailableProcessors());
- public enum Provider { EPOLL, NIO }
- private static final Provider DEFAULT_PROVIDER = NativeTransportService.useEpoll() ? Provider.EPOLL : Provider.NIO;
+ /**
+ * The default task queue used by {@code NioEventLoop} and {@code EpollEventLoop} is {@code MpscUnboundedArrayQueue},
+ * provided by JCTools. While efficient, it has an undesirable quality for a queue backing an event loop: it is
+ * not non-blocking, and can cause the event loop to busy-spin while waiting for a partially completed task
+ * offer, if the producer thread has been suspended mid-offer.
+ *
+ * As it happens, however, we have an MPSC queue implementation that is perfectly fit for this purpose -
+ * {@link ManyToOneConcurrentLinkedQueue}, which is non-blocking and already used throughout the codebase,
+ * so we use it here as well.
+ */
+ enum Provider
+ {
+ NIO
+ {
+ @Override
+ NioEventLoopGroup makeEventLoopGroup(int threadCount, ThreadFactory threadFactory)
+ {
+ return new NioEventLoopGroup(threadCount,
+ new ThreadPerTaskExecutor(threadFactory),
+ DefaultEventExecutorChooserFactory.INSTANCE,
+ SelectorProvider.provider(),
+ DefaultSelectStrategyFactory.INSTANCE,
+ RejectedExecutionHandlers.reject(),
+ capacity -> new ManyToOneConcurrentLinkedQueue<>());
+ }
+
+ @Override
+ ChannelFactory<NioSocketChannel> clientChannelFactory()
+ {
+ return NioSocketChannel::new;
+ }
+
+ @Override
+ ChannelFactory<NioServerSocketChannel> serverChannelFactory()
+ {
+ return NioServerSocketChannel::new;
+ }
+ },
+ EPOLL
+ {
+ @Override
+ EpollEventLoopGroup makeEventLoopGroup(int threadCount, ThreadFactory threadFactory)
+ {
+ return new EpollEventLoopGroup(threadCount,
+ new ThreadPerTaskExecutor(threadFactory),
+ DefaultEventExecutorChooserFactory.INSTANCE,
+ DefaultSelectStrategyFactory.INSTANCE,
+ RejectedExecutionHandlers.reject(),
+ capacity -> new ManyToOneConcurrentLinkedQueue<>());
+ }
+
+ @Override
+ ChannelFactory<EpollSocketChannel> clientChannelFactory()
+ {
+ return EpollSocketChannel::new;
+ }
+
+ @Override
+ ChannelFactory<EpollServerSocketChannel> serverChannelFactory()
+ {
+ return EpollServerSocketChannel::new;
+ }
+ };
+
+ EventLoopGroup makeEventLoopGroup(int threadCount, String threadNamePrefix)
+ {
+ logger.debug("using netty {} event loop for pool prefix {}", name(), threadNamePrefix);
+ return makeEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix, true));
+ }
+
+ abstract EventLoopGroup makeEventLoopGroup(int threadCount, ThreadFactory threadFactory);
+ abstract ChannelFactory<? extends Channel> clientChannelFactory();
+ abstract ChannelFactory<? extends ServerChannel> serverChannelFactory();
+
+ static Provider optimalProvider()
+ {
+ return NativeTransportService.useEpoll() ? EPOLL : NIO;
+ }
+ }
/** a useful addition for debugging; simply set to true to get more data in your logs */
static final boolean WIRETRACE = false;
@@ -93,94 +171,42 @@ public final class SocketFactory
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
}
+ private final Provider provider;
private final EventLoopGroup acceptGroup;
private final EventLoopGroup defaultGroup;
// we need a separate EventLoopGroup for outbound streaming because sendFile is blocking
private final EventLoopGroup outboundStreamingGroup;
final ExecutorService synchronousWorkExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("Messaging-SynchronousWork"));
- SocketFactory() { this(DEFAULT_PROVIDER); }
- SocketFactory(Provider provider)
- {
- this.acceptGroup = getEventLoopGroup(provider, 1, "Messaging-AcceptLoop");
- this.defaultGroup = getEventLoopGroup(provider, EVENT_THREADS, NamedThreadFactory.globalPrefix() + "Messaging-EventLoop");
- this.outboundStreamingGroup = getEventLoopGroup(provider, EVENT_THREADS, "Streaming-EventLoop");
- assert provider == providerOf(acceptGroup)
- && provider == providerOf(defaultGroup)
- && provider == providerOf(outboundStreamingGroup);
- }
-
- private static EventLoopGroup getEventLoopGroup(Provider provider, int threadCount, String threadNamePrefix)
+ SocketFactory()
{
- switch (provider)
- {
- case EPOLL:
- logger.debug("using netty epoll event loop for pool prefix {}", threadNamePrefix);
- return overwriteMPSCQueues(new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix, true)));
- case NIO:
- logger.debug("using netty nio event loop for pool prefix {}", threadNamePrefix);
- return overwriteMPSCQueues(new NioEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix, true)));
- default:
- throw new IllegalStateException();
- }
+ this(Provider.optimalProvider());
}
- private static Provider providerOf(EventLoopGroup eventLoopGroup)
+ SocketFactory(Provider provider)
{
- while (eventLoopGroup instanceof SingleThreadEventLoop)
- eventLoopGroup = ((SingleThreadEventLoop) eventLoopGroup).parent();
-
- if (eventLoopGroup instanceof EpollEventLoopGroup)
- return Provider.EPOLL;
- if (eventLoopGroup instanceof NioEventLoopGroup)
- return Provider.NIO;
- throw new IllegalStateException();
+ this.provider = provider;
+ this.acceptGroup = provider.makeEventLoopGroup(1, "Messaging-AcceptLoop");
+ this.defaultGroup = provider.makeEventLoopGroup(EVENT_THREADS, NamedThreadFactory.globalPrefix() + "Messaging-EventLoop");
+ this.outboundStreamingGroup = provider.makeEventLoopGroup(EVENT_THREADS, "Streaming-EventLoop");
}
- static Bootstrap newBootstrap(EventLoop eventLoop, int tcpUserTimeoutInMS)
+ Bootstrap newClientBootstrap(EventLoop eventLoop, int tcpUserTimeoutInMS)
{
if (eventLoop == null)
throw new IllegalArgumentException("must provide eventLoop");
- Bootstrap bootstrap = new Bootstrap()
- .group(eventLoop)
- .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance)
- .option(ChannelOption.SO_KEEPALIVE, true);
+ Bootstrap bootstrap = new Bootstrap().group(eventLoop).channelFactory(provider.clientChannelFactory());
+
+ if (provider == Provider.EPOLL)
+ bootstrap.option(EpollChannelOption.TCP_USER_TIMEOUT, tcpUserTimeoutInMS);
- switch (providerOf(eventLoop))
- {
- case EPOLL:
- bootstrap.channel(EpollSocketChannel.class);
- bootstrap.option(EpollChannelOption.TCP_USER_TIMEOUT, tcpUserTimeoutInMS);
- break;
- case NIO:
- bootstrap.channel(NioSocketChannel.class);
- }
return bootstrap;
}
ServerBootstrap newServerBootstrap()
{
- return newServerBootstrap(acceptGroup, defaultGroup);
- }
-
- private static ServerBootstrap newServerBootstrap(EventLoopGroup acceptGroup, EventLoopGroup defaultGroup)
- {
- ServerBootstrap bootstrap = new ServerBootstrap()
- .group(acceptGroup, defaultGroup)
- .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance)
- .option(ChannelOption.SO_REUSEADDR, true);
-
- switch (providerOf(defaultGroup))
- {
- case EPOLL:
- bootstrap.channel(EpollServerSocketChannel.class);
- break;
- case NIO:
- bootstrap.channel(NioServerSocketChannel.class);
- }
-
- return bootstrap;
+ return new ServerBootstrap().group(acceptGroup, defaultGroup).channelFactory(provider.serverChannelFactory());
}
/**
@@ -270,58 +296,4 @@ public final class SocketFactory
{
return from + "->" + to + '-' + type + '-' + id;
}
-
- /**
- * The default task queue used by {@code NioEventLoop} and {@code EpollEventLoop} is {@code MpscUnboundedArrayQueue},
- * provided by JCTools. While efficient, it has an undesirable quality for a queue backing an event loop: it is
- * not non-blocking, and can cause the event loop to busy-spin while waiting for a partially completed task
- * offer, if the producer thread has been suspended mid-offer. Sadly, there is currently no way to work around
- * this behaviour in application-logic.
- *
- * As it happens, however, we have an MPSC queue implementation that is perfectly fit for this purpose -
- * {@link ManyToOneConcurrentLinkedQueue}, that is non-blocking, and already used throughout the codebase.
- *
- * Unfortunately, there is no Netty API or to override the default queue, so we have to resort to reflection,
- * for now.
- *
- * We filed a Netty issue asking for this capability to be provided cleanly:
- * https://github.com/netty/netty/issues/9105, and hopefully Netty will implement it some day. When and if
- * that happens, this reflection-based workaround should be removed.
- */
- private static EventLoopGroup overwriteMPSCQueues(MultithreadEventLoopGroup eventLoopGroup)
- {
- try
- {
- for (EventExecutor eventExecutor : (EventExecutor[]) childrenField.get(eventLoopGroup))
- {
- SingleThreadEventLoop eventLoop = (SingleThreadEventLoop) eventExecutor;
- taskQueueField.set(eventLoop, new ManyToOneConcurrentLinkedQueue<>());
- tailTasksField.set(eventLoop, new ManyToOneConcurrentLinkedQueue<>());
- }
- return eventLoopGroup;
- }
- catch (IllegalAccessException e)
- {
- throw new IllegalStateException(e);
- }
- }
-
- private static final Field childrenField, taskQueueField, tailTasksField;
- static
- {
- try
- {
- childrenField = MultithreadEventExecutorGroup.class.getDeclaredField("children");
- taskQueueField = SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
- tailTasksField = SingleThreadEventLoop.class.getDeclaredField("tailTasks");
-
- childrenField.setAccessible(true);
- taskQueueField.setAccessible(true);
- tailTasksField.setAccessible(true);
- }
- catch (NoSuchFieldException e)
- {
- throw new IllegalStateException(e);
- }
- }
}
diff --git a/src/java/org/apache/cassandra/security/SSLFactory.java b/src/java/org/apache/cassandra/security/SSLFactory.java
index 6674963..2ccb126 100644
--- a/src/java/org/apache/cassandra/security/SSLFactory.java
+++ b/src/java/org/apache/cassandra/security/SSLFactory.java
@@ -240,10 +240,12 @@ public final class SSLFactory
* Get a netty {@link SslContext} instance.
*/
@VisibleForTesting
- static SslContext getOrCreateSslContext(EncryptionOptions options, boolean buildTruststore,
- SocketType socketType, boolean useOpenSsl) throws IOException
+ static SslContext getOrCreateSslContext(EncryptionOptions options,
+ boolean buildTruststore,
+ SocketType socketType,
+ boolean useOpenSsl) throws IOException
{
- CacheKey key = new CacheKey(options, socketType);
+ CacheKey key = new CacheKey(options, socketType, useOpenSsl);
SslContext sslContext;
sslContext = cachedSslContexts.get(key);
@@ -413,11 +415,13 @@ public final class SSLFactory
{
private final EncryptionOptions encryptionOptions;
private final SocketType socketType;
+ private final boolean useOpenSSL;
- public CacheKey(EncryptionOptions encryptionOptions, SocketType socketType)
+ public CacheKey(EncryptionOptions encryptionOptions, SocketType socketType, boolean useOpenSSL)
{
this.encryptionOptions = encryptionOptions;
this.socketType = socketType;
+ this.useOpenSSL = useOpenSSL;
}
public boolean equals(Object o)
@@ -426,6 +430,7 @@ public final class SSLFactory
if (o == null || getClass() != o.getClass()) return false;
CacheKey cacheKey = (CacheKey) o;
return (socketType == cacheKey.socketType &&
+ useOpenSSL == cacheKey.useOpenSSL &&
Objects.equals(encryptionOptions, cacheKey.encryptionOptions));
}
@@ -434,6 +439,7 @@ public final class SSLFactory
int result = 0;
result += 31 * socketType.hashCode();
result += 31 * encryptionOptions.hashCode();
+ result += 31 * Boolean.hashCode(useOpenSSL);
return result;
}
}
diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java
index 79acab1..79caafc 100644
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@ -153,7 +153,7 @@ public class NativeTransportService
final boolean enableEpoll = Boolean.parseBoolean(System.getProperty("cassandra.native.epoll.enabled", "true"));
if (enableEpoll && !Epoll.isAvailable() && NativeLibrary.osType == NativeLibrary.OSType.LINUX)
- logger.warn("epoll not available {}", Epoll.unavailabilityCause());
+ logger.warn("epoll not available", Epoll.unavailabilityCause());
return enableEpoll && Epoll.isAvailable();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org