You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/07/25 18:09:02 UTC

[pinot] branch master updated: Enhance event loop initialization (#9094)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a49ab2ded Enhance event loop initialization (#9094)
9a49ab2ded is described below

commit 9a49ab2dedb4d9157af99cd42403f4dad56b2b90
Author: Jia Guo <ji...@linkedin.com>
AuthorDate: Mon Jul 25 11:08:53 2022 -0700

    Enhance event loop initialization (#9094)
    
    * Enhance event loop initialization
---
 .../apache/pinot/core/transport/QueryServer.java   | 35 +++++++++++++++++-----
 .../pinot/core/transport/ServerChannels.java       | 28 ++++++++++++++---
 2 files changed, 52 insertions(+), 11 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index f2b504a89f..d0acbc6000 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -23,8 +23,10 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.kqueue.KQueue;
 import io.netty.channel.kqueue.KQueueEventLoopGroup;
 import io.netty.channel.kqueue.KQueueServerSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -45,6 +47,8 @@ import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.core.util.OsCheck;
 import org.apache.pinot.server.access.AccessControl;
 import org.apache.pinot.server.access.AllowAllAccessFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -52,15 +56,16 @@ import org.apache.pinot.server.access.AllowAllAccessFactory;
  * Brokers.
  */
 public class QueryServer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(QueryServer.class);
   private final int _port;
   private final QueryScheduler _queryScheduler;
   private final ServerMetrics _serverMetrics;
   private final TlsConfig _tlsConfig;
   private final AccessControl _accessControl;
 
-  private EventLoopGroup _bossGroup;
-  private EventLoopGroup _workerGroup;
-  private Class<? extends ServerSocketChannel> _channelClass;
+  private final EventLoopGroup _bossGroup;
+  private final EventLoopGroup _workerGroup;
+  private final Class<? extends ServerSocketChannel> _channelClass;
   private Channel _channel;
 
 
@@ -94,20 +99,36 @@ public class QueryServer {
     _serverMetrics = serverMetrics;
     _tlsConfig = tlsConfig;
     _accessControl = accessControl;
-    if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled()
-        && OsCheck.getOperatingSystemType() == OsCheck.OSType.Linux) {
+
+    boolean enableNativeTransports = nettyConfig != null && nettyConfig.isNativeTransportsEnabled();
+    OsCheck.OSType operatingSystemType = OsCheck.getOperatingSystemType();
+    if (enableNativeTransports
+        && operatingSystemType == OsCheck.OSType.Linux
+        && Epoll.isAvailable()) {
       _bossGroup = new EpollEventLoopGroup();
       _workerGroup = new EpollEventLoopGroup();
       _channelClass = EpollServerSocketChannel.class;
-    } else if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled()
-        && OsCheck.getOperatingSystemType() == OsCheck.OSType.MacOS) {
+      LOGGER.info("Using Epoll event loop");
+    } else if (enableNativeTransports
+        && operatingSystemType == OsCheck.OSType.MacOS
+        && KQueue.isAvailable()) {
       _bossGroup = new KQueueEventLoopGroup();
       _workerGroup = new KQueueEventLoopGroup();
       _channelClass = KQueueServerSocketChannel.class;
+      LOGGER.info("Using KQueue event loop");
     } else {
       _bossGroup = new NioEventLoopGroup();
       _workerGroup = new NioEventLoopGroup();
       _channelClass = NioServerSocketChannel.class;
+      StringBuilder log = new StringBuilder("Using NIO event loop");
+      if (operatingSystemType == OsCheck.OSType.Linux
+          && enableNativeTransports) {
+        log.append(", as Epoll is not available: ").append(Epoll.unavailabilityCause());
+      } else if (operatingSystemType == OsCheck.OSType.MacOS
+          && enableNativeTransports) {
+        log.append(", as KQueue is not available: ").append(KQueue.unavailabilityCause());
+      }
+      LOGGER.info(log.toString());
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index 8b76b8ed22..aeeda17819 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -24,8 +24,10 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.kqueue.KQueue;
 import io.netty.channel.kqueue.KQueueEventLoopGroup;
 import io.netty.channel.kqueue.KQueueSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -53,6 +55,8 @@ import org.apache.pinot.core.util.OsCheck;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -62,6 +66,7 @@ import org.apache.thrift.transport.TTransportException;
  */
 @ThreadSafe
 public class ServerChannels {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ServerChannels.class);
   public static final String CHANNEL_LOCK_TIMEOUT_MSG = "Timeout while acquiring channel lock";
   private static final long TRY_CONNECT_CHANNEL_LOCK_TIMEOUT_MS = 5_000L;
 
@@ -83,17 +88,32 @@ public class ServerChannels {
    */
   public ServerChannels(QueryRouter queryRouter, BrokerMetrics brokerMetrics, @Nullable NettyConfig nettyConfig,
       @Nullable TlsConfig tlsConfig) {
-    if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled()
-        && OsCheck.getOperatingSystemType() == OsCheck.OSType.Linux) {
+    boolean enableNativeTransports = nettyConfig != null && nettyConfig.isNativeTransportsEnabled();
+    OsCheck.OSType operatingSystemType = OsCheck.getOperatingSystemType();
+    if (enableNativeTransports
+        && operatingSystemType == OsCheck.OSType.Linux
+        && Epoll.isAvailable()) {
       _eventLoopGroup = new EpollEventLoopGroup();
       _channelClass = EpollSocketChannel.class;
-    } else if (nettyConfig != null && nettyConfig.isNativeTransportsEnabled()
-        && OsCheck.getOperatingSystemType() == OsCheck.OSType.MacOS) {
+      LOGGER.info("Using Epoll event loop");
+    } else if (enableNativeTransports
+        && operatingSystemType == OsCheck.OSType.MacOS
+        && KQueue.isAvailable()) {
       _eventLoopGroup = new KQueueEventLoopGroup();
       _channelClass = KQueueSocketChannel.class;
+      LOGGER.info("Using KQueue event loop");
     } else {
       _eventLoopGroup = new NioEventLoopGroup();
       _channelClass = NioSocketChannel.class;
+      StringBuilder log = new StringBuilder("Using NIO event loop");
+      if (operatingSystemType == OsCheck.OSType.Linux
+          && enableNativeTransports) {
+        log.append(", as Epoll is not available: ").append(Epoll.unavailabilityCause());
+      } else if (operatingSystemType == OsCheck.OSType.MacOS
+          && enableNativeTransports) {
+        log.append(", as KQueue is not available: ").append(KQueue.unavailabilityCause());
+      }
+      LOGGER.info(log.toString());
     }
 
     _queryRouter = queryRouter;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org