You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/07/25 18:09:02 UTC
[pinot] branch master updated: Enhance event loop initialization (#9094)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9a49ab2ded Enhance event loop initialization (#9094)
9a49ab2ded is described below
commit 9a49ab2dedb4d9157af99cd42403f4dad56b2b90
Author: Jia Guo <ji...@linkedin.com>
AuthorDate: Mon Jul 25 11:08:53 2022 -0700
Enhance event loop initialization (#9094)
* Enhance event loop initialization
---
.../apache/pinot/core/transport/QueryServer.java | 35 +++++++++++++++++-----
.../pinot/core/transport/ServerChannels.java | 28 ++++++++++++++---
2 files changed, 52 insertions(+), 11 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index f2b504a89f..d0acbc6000 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -23,8 +23,10 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -45,6 +47,8 @@ import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.core.util.OsCheck;
import org.apache.pinot.server.access.AccessControl;
import org.apache.pinot.server.access.AllowAllAccessFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -52,15 +56,16 @@ import org.apache.pinot.server.access.AllowAllAccessFactory;
* Brokers.
*/
public class QueryServer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(QueryServer.class);
private final int _port;
private final QueryScheduler _queryScheduler;
private final ServerMetrics _serverMetrics;
private final TlsConfig _tlsConfig;
private final AccessControl _accessControl;
- private EventLoopGroup _bossGroup;
- private EventLoopGroup _workerGroup;
- private Class<? extends ServerSocketChannel> _channelClass;
+ private final EventLoopGroup _bossGroup;
+ private final EventLoopGroup _workerGroup;
+ private final Class<? extends ServerSocketChannel> _channelClass;
private Channel _channel;
@@ -94,20 +99,36 @@ public class QueryServer {
_serverMetrics = serverMetrics;
_tlsConfig = tlsConfig;
_accessControl = accessControl;
- if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled()
- && OsCheck.getOperatingSystemType() == OsCheck.OSType.Linux) {
+
+ boolean enableNativeTransports = nettyConfig != null && nettyConfig.isNativeTransportsEnabled();
+ OsCheck.OSType operatingSystemType = OsCheck.getOperatingSystemType();
+ if (enableNativeTransports
+ && operatingSystemType == OsCheck.OSType.Linux
+ && Epoll.isAvailable()) {
_bossGroup = new EpollEventLoopGroup();
_workerGroup = new EpollEventLoopGroup();
_channelClass = EpollServerSocketChannel.class;
- } else if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled()
- && OsCheck.getOperatingSystemType() == OsCheck.OSType.MacOS) {
+ LOGGER.info("Using Epoll event loop");
+ } else if (enableNativeTransports
+ && operatingSystemType == OsCheck.OSType.MacOS
+ && KQueue.isAvailable()) {
_bossGroup = new KQueueEventLoopGroup();
_workerGroup = new KQueueEventLoopGroup();
_channelClass = KQueueServerSocketChannel.class;
+ LOGGER.info("Using KQueue event loop");
} else {
_bossGroup = new NioEventLoopGroup();
_workerGroup = new NioEventLoopGroup();
_channelClass = NioServerSocketChannel.class;
+ StringBuilder log = new StringBuilder("Using NIO event loop");
+ if (operatingSystemType == OsCheck.OSType.Linux
+ && enableNativeTransports) {
+ log.append(", as Epoll is not available: ").append(Epoll.unavailabilityCause());
+ } else if (operatingSystemType == OsCheck.OSType.MacOS
+ && enableNativeTransports) {
+ log.append(", as KQueue is not available: ").append(KQueue.unavailabilityCause());
+ }
+ LOGGER.info(log.toString());
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index 8b76b8ed22..aeeda17819 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -24,8 +24,10 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -53,6 +55,8 @@ import org.apache.pinot.core.util.OsCheck;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -62,6 +66,7 @@ import org.apache.thrift.transport.TTransportException;
*/
@ThreadSafe
public class ServerChannels {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerChannels.class);
public static final String CHANNEL_LOCK_TIMEOUT_MSG = "Timeout while acquiring channel lock";
private static final long TRY_CONNECT_CHANNEL_LOCK_TIMEOUT_MS = 5_000L;
@@ -83,17 +88,32 @@ public class ServerChannels {
*/
public ServerChannels(QueryRouter queryRouter, BrokerMetrics brokerMetrics, @Nullable NettyConfig nettyConfig,
@Nullable TlsConfig tlsConfig) {
- if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled()
- && OsCheck.getOperatingSystemType() == OsCheck.OSType.Linux) {
+ boolean enableNativeTransports = nettyConfig != null && nettyConfig.isNativeTransportsEnabled();
+ OsCheck.OSType operatingSystemType = OsCheck.getOperatingSystemType();
+ if (enableNativeTransports
+ && operatingSystemType == OsCheck.OSType.Linux
+ && Epoll.isAvailable()) {
_eventLoopGroup = new EpollEventLoopGroup();
_channelClass = EpollSocketChannel.class;
- } else if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled()
- && OsCheck.getOperatingSystemType() == OsCheck.OSType.MacOS) {
+ LOGGER.info("Using Epoll event loop");
+ } else if (enableNativeTransports
+ && operatingSystemType == OsCheck.OSType.MacOS
+ && KQueue.isAvailable()) {
_eventLoopGroup = new KQueueEventLoopGroup();
_channelClass = KQueueSocketChannel.class;
+ LOGGER.info("Using KQueue event loop");
} else {
_eventLoopGroup = new NioEventLoopGroup();
_channelClass = NioSocketChannel.class;
+ StringBuilder log = new StringBuilder("Using NIO event loop");
+ if (operatingSystemType == OsCheck.OSType.Linux
+ && enableNativeTransports) {
+ log.append(", as Epoll is not available: ").append(Epoll.unavailabilityCause());
+ } else if (operatingSystemType == OsCheck.OSType.MacOS
+ && enableNativeTransports) {
+ log.append(", as KQueue is not available: ").append(KQueue.unavailabilityCause());
+ }
+ LOGGER.info(log.toString());
}
_queryRouter = queryRouter;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org