You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/02/18 07:59:51 UTC

[ratis] branch master updated: RATIS-1521. NettyServerStreamRpc use EpollEventLoopGroup (#599)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 075cae9  RATIS-1521. NettyServerStreamRpc use EpollEventLoopGroup (#599)
075cae9 is described below

commit 075cae92207335b62f3f8df3c57deed78c3980a8
Author: hao guo <gu...@360.cn>
AuthorDate: Fri Feb 18 15:59:44 2022 +0800

    RATIS-1521. NettyServerStreamRpc use EpollEventLoopGroup (#599)
---
 .../org/apache/ratis/netty/NettyConfigKeys.java    | 33 +++++++++++++++++
 .../java/org/apache/ratis/netty/NettyUtils.java    | 43 ++++++++++++++++++++++
 .../ratis/netty/server/NettyServerStreamRpc.java   | 18 +++++++--
 3 files changed, 90 insertions(+), 4 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index 907cda6..e6fe6a2 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -106,6 +106,39 @@ public interface NettyConfigKeys {
     static void setClientReplyQueueGracePeriod(RaftProperties properties, TimeDuration timeoutDuration) {
       setTimeDuration(properties::setTimeDuration, CLIENT_REPLY_QUEUE_GRACE_PERIOD_KEY, timeoutDuration);
     }
+
+    interface Server { // Server-side settings: an epoll switch plus boss/worker event-loop group sizes.
+      String PREFIX = NettyConfigKeys.PREFIX + ".server"; // NOTE(review): uses the top-level PREFIX; confirm intended.
+
+      String USE_EPOLL_KEY = PREFIX + ".use-epoll";
+      boolean USE_EPOLL_DEFAULT = false; // Epoll is opt-in.
+      static boolean useEpoll(RaftProperties properties) { // Looks up USE_EPOLL_KEY with USE_EPOLL_DEFAULT.
+        return getBoolean(properties::getBoolean, USE_EPOLL_KEY, USE_EPOLL_DEFAULT, getDefaultLog());
+      }
+      static void setUseEpoll(RaftProperties properties, boolean enable) { // Stores enable under USE_EPOLL_KEY.
+        setBoolean(properties::setBoolean, USE_EPOLL_KEY, enable);
+      }
+
+      String BOSS_GROUP_SIZE_KEY = PREFIX + ".boss-group.size";
+      int BOSS_GROUP_SIZE_DEFAULT = 0; // NOTE(review): 0 presumably defers to Netty's default thread count; confirm.
+      static int bossGroupSize(RaftProperties properties) { // Looks up BOSS_GROUP_SIZE_KEY as an int.
+        return getInt(properties::getInt, BOSS_GROUP_SIZE_KEY, BOSS_GROUP_SIZE_DEFAULT, getDefaultLog(),
+            requireMin(0), requireMax(65536)); // Presumably enforces 0 <= value <= 65536; confirm helper semantics.
+      }
+      static void setBossGroupSize(RaftProperties properties, int num) { // Stores num under BOSS_GROUP_SIZE_KEY.
+        setInt(properties::setInt, BOSS_GROUP_SIZE_KEY, num);
+      }
+
+      String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size";
+      int WORKER_GROUP_SIZE_DEFAULT = 0; // NOTE(review): 0 presumably defers to Netty's default thread count; confirm.
+      static int workerGroupSize(RaftProperties properties) { // Looks up WORKER_GROUP_SIZE_KEY as an int.
+        return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY, WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(),
+            requireMin(0), requireMax(65536)); // Same bounds arguments as bossGroupSize.
+      }
+      static void setWorkerGroupSize(RaftProperties properties, int num) { // Stores num under WORKER_GROUP_SIZE_KEY.
+        setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, num);
+      }
+    }
   }
 
   static void main(String[] args) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java
new file mode 100644
index 0000000..939fb93
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.util.ConcurrentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface NettyUtils { // Factory helper for Netty event-loop groups (epoll or NIO).
+  Logger LOG = LoggerFactory.getLogger(NettyUtils.class);
+
+  static EventLoopGroup newEventLoopGroup(String name, int size, boolean useEpoll) {
+    if (useEpoll) { // Epoll is only attempted when the caller asks for it.
+      if (Epoll.isAvailable()) { // Epoll reports itself usable in this runtime.
+        LOG.info("Create EpollEventLoopGroup for {}; Thread size is {}.", name, size);
+        return new EpollEventLoopGroup(size, ConcurrentUtils.newThreadFactory(name + "-"));
+      } else { // Requested but unavailable: warn, then fall through to the NIO group below.
+        LOG.warn("Failed to create EpollEventLoopGroup for " + name + "; fall back on NioEventLoopGroup.",
+            Epoll.unavailabilityCause()); // Reason epoll is unavailable, handed to the logger with the message.
+      }
+    } // Past this point epoll was either not requested or not available.
+    return new NioEventLoopGroup(size, ConcurrentUtils.newThreadFactory(name + "-"));
+  }
+}
\ No newline at end of file
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 5c46576..a307e04 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -24,6 +24,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyDataStreamUtils;
+import org.apache.ratis.netty.NettyUtils;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
@@ -40,7 +41,8 @@ import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
 import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
-import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollServerSocketChannel;
 import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
 import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
@@ -108,8 +110,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
   }
 
   private final String name;
-  private final EventLoopGroup bossGroup = new NioEventLoopGroup();
-  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private final EventLoopGroup bossGroup;
+  private final EventLoopGroup workerGroup;
   private final ChannelFuture channelFuture;
 
   private final DataStreamManagement requests;
@@ -126,13 +128,21 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       this.proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties))));
     }
 
+    final boolean useEpoll = NettyConfigKeys.DataStream.Server.useEpoll(properties);
+    this.bossGroup = NettyUtils.newEventLoopGroup(name + "-bossGroup",
+        NettyConfigKeys.DataStream.Server.bossGroupSize(properties), useEpoll);
+    this.workerGroup = NettyUtils.newEventLoopGroup(name + "-workerGroup",
+        NettyConfigKeys.DataStream.Server.workerGroupSize(properties), useEpoll);
+
     final int port = NettyConfigKeys.DataStream.port(properties);
     this.channelFuture = new ServerBootstrap()
         .group(bossGroup, workerGroup)
-        .channel(NioServerSocketChannel.class)
+        .channel(bossGroup instanceof EpollEventLoopGroup ?
+            EpollServerSocketChannel.class : NioServerSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(getInitializer())
         .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .childOption(ChannelOption.TCP_NODELAY, true)
         .bind(port);
   }