You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/02/18 07:59:51 UTC
[ratis] branch master updated: RATIS-1521. NettyServerStreamRpc use EpollEventLoopGroup (#599)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 075cae9 RATIS-1521. NettyServerStreamRpc use EpollEventLoopGroup (#599)
075cae9 is described below
commit 075cae92207335b62f3f8df3c57deed78c3980a8
Author: hao guo <gu...@360.cn>
AuthorDate: Fri Feb 18 15:59:44 2022 +0800
RATIS-1521. NettyServerStreamRpc use EpollEventLoopGroup (#599)
---
.../org/apache/ratis/netty/NettyConfigKeys.java | 33 +++++++++++++++++
.../java/org/apache/ratis/netty/NettyUtils.java | 43 ++++++++++++++++++++++
.../ratis/netty/server/NettyServerStreamRpc.java | 18 +++++++--
3 files changed, 90 insertions(+), 4 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index 907cda6..e6fe6a2 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -106,6 +106,39 @@ public interface NettyConfigKeys {
static void setClientReplyQueueGracePeriod(RaftProperties properties, TimeDuration timeoutDuration) {
setTimeDuration(properties::setTimeDuration, CLIENT_REPLY_QUEUE_GRACE_PERIOD_KEY, timeoutDuration);
}
+
+ interface Server {
+ String PREFIX = NettyConfigKeys.PREFIX + ".server";
+
+ String USE_EPOLL_KEY = PREFIX + ".use-epoll";
+ boolean USE_EPOLL_DEFAULT = false;
+ static boolean useEpoll(RaftProperties properties) {
+ return getBoolean(properties::getBoolean, USE_EPOLL_KEY, USE_EPOLL_DEFAULT, getDefaultLog());
+ }
+ static void setUseEpoll(RaftProperties properties, boolean enable) {
+ setBoolean(properties::setBoolean, USE_EPOLL_KEY, enable);
+ }
+
+ String BOSS_GROUP_SIZE_KEY = PREFIX + ".boss-group.size";
+ int BOSS_GROUP_SIZE_DEFAULT = 0;
+ static int bossGroupSize(RaftProperties properties) {
+ return getInt(properties::getInt, BOSS_GROUP_SIZE_KEY, BOSS_GROUP_SIZE_DEFAULT, getDefaultLog(),
+ requireMin(0), requireMax(65536));
+ }
+ static void setBossGroupSize(RaftProperties properties, int num) {
+ setInt(properties::setInt, BOSS_GROUP_SIZE_KEY, num);
+ }
+
+ String WORKER_GROUP_SIZE_KEY = PREFIX + ".worker-group.size";
+ int WORKER_GROUP_SIZE_DEFAULT = 0;
+ static int workerGroupSize(RaftProperties properties) {
+ return getInt(properties::getInt, WORKER_GROUP_SIZE_KEY, WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(),
+ requireMin(0), requireMax(65536));
+ }
+ static void setWorkerGroupSize(RaftProperties properties, int num) {
+ setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, num);
+ }
+ }
}
static void main(String[] args) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java
new file mode 100644
index 0000000..939fb93
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.Epoll;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.util.ConcurrentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface NettyUtils {
+ Logger LOG = LoggerFactory.getLogger(NettyUtils.class);
+
+ static EventLoopGroup newEventLoopGroup(String name, int size, boolean useEpoll) {
+ if (useEpoll) {
+ if (Epoll.isAvailable()) {
+ LOG.info("Create EpollEventLoopGroup for {}; Thread size is {}.", name, size);
+ return new EpollEventLoopGroup(size, ConcurrentUtils.newThreadFactory(name + "-"));
+ } else {
+ LOG.warn("Failed to create EpollEventLoopGroup for " + name + "; fall back on NioEventLoopGroup.",
+ Epoll.unavailabilityCause());
+ }
+ }
+ return new NioEventLoopGroup(size, ConcurrentUtils.newThreadFactory(name + "-"));
+ }
+}
\ No newline at end of file
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 5c46576..a307e04 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -24,6 +24,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
+import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -40,7 +41,8 @@ import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
-import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.epoll.EpollServerSocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
@@ -108,8 +110,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
}
private final String name;
- private final EventLoopGroup bossGroup = new NioEventLoopGroup();
- private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private final EventLoopGroup bossGroup;
+ private final EventLoopGroup workerGroup;
private final ChannelFuture channelFuture;
private final DataStreamManagement requests;
@@ -126,13 +128,21 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
this.proxies.add(new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties))));
}
+ final boolean useEpoll = NettyConfigKeys.DataStream.Server.useEpoll(properties);
+ this.bossGroup = NettyUtils.newEventLoopGroup(name + "-bossGroup",
+ NettyConfigKeys.DataStream.Server.bossGroupSize(properties), useEpoll);
+ this.workerGroup = NettyUtils.newEventLoopGroup(name + "-workerGroup",
+ NettyConfigKeys.DataStream.Server.workerGroupSize(properties), useEpoll);
+
final int port = NettyConfigKeys.DataStream.port(properties);
this.channelFuture = new ServerBootstrap()
.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
+ .channel(bossGroup instanceof EpollEventLoopGroup ?
+ EpollServerSocketChannel.class : NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(getInitializer())
.childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.TCP_NODELAY, true)
.bind(port);
}