You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/11/10 02:24:03 UTC
[ratis] branch master updated: RATIS-1424.
NettyClientStreamRpc#workerGroup Can cause too much nioEventLoopGroup
object (#527)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1dec9dc RATIS-1424. NettyClientStreamRpc#workerGroup Can cause too much nioEventLoopGroup object (#527)
1dec9dc is described below
commit 1dec9dcebfc87833be1c2dc9a127b03eaf62f129
Author: hao guo <gh...@126.com>
AuthorDate: Wed Nov 10 10:24:00 2021 +0800
RATIS-1424. NettyClientStreamRpc#workerGroup Can cause too much nioEventLoopGroup object (#527)
---
.../org/apache/ratis/netty/NettyConfigKeys.java | 28 +++++++++++-----
.../ratis/netty/client/NettyClientStreamRpc.java | 39 ++++++++++++++++++++--
2 files changed, 56 insertions(+), 11 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index de71889..9d7f45d 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -70,16 +70,28 @@ public interface NettyConfigKeys {
setInt(properties::setInt, PORT_KEY, port);
}
- String CLIENT_EVENT_LOOP_THREADS_KEY = PREFIX + ".client.eventLoopThreads";
- int CLIENT_EVENT_LOOP_THREADS_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2);
+ String CLIENT_WORKER_GROUP_SIZE_KEY = PREFIX + ".client.worker-group.size";
+ int CLIENT_WORKER_GROUP_SIZE_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2);
- static int clientEventLoopThreads(RaftProperties properties) {
- return getInt(properties::getInt, CLIENT_EVENT_LOOP_THREADS_KEY,
- CLIENT_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536));
+ /**
+ * Reads the client worker-group size from {@code properties} under
+ * {@link #CLIENT_WORKER_GROUP_SIZE_KEY}, falling back to
+ * {@link #CLIENT_WORKER_GROUP_SIZE_DEFAULT}; the lookup is made with the
+ * {@code requireMin(1)} and {@code requireMax(65536)} checks.
+ * NettyClientStreamRpc passes this value to the NioEventLoopGroup constructor.
+ */
+ static int clientWorkerGroupSize(RaftProperties properties) {
+ return getInt(properties::getInt, CLIENT_WORKER_GROUP_SIZE_KEY,
+ CLIENT_WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536));
}
- static void setClientEventLoopThreads(RaftProperties properties, int eventLoopThreads) {
- setInt(properties::setInt, CLIENT_EVENT_LOOP_THREADS_KEY, eventLoopThreads);
+ /** Stores {@code clientWorkerGroupSize} in {@code properties} under {@link #CLIENT_WORKER_GROUP_SIZE_KEY}. */
+ static void setClientWorkerGroupSize(RaftProperties properties, int clientWorkerGroupSize) {
+ setInt(properties::setInt, CLIENT_WORKER_GROUP_SIZE_KEY, clientWorkerGroupSize);
+ }
+
+ /**
+ * Property key selecting whether NettyClientStreamRpc instances use one shared
+ * worker group (true) or each create their own (false).
+ */
+ String CLIENT_WORKER_GROUP_SHARE_KEY = PREFIX + ".client.worker-group.share";
+ /** Sharing is off by default, so each client creates its own worker group. */
+ boolean CLIENT_WORKER_GROUP_SHARE_DEFAULT = false;
+
+ /**
+ * Reads the boolean stored under {@link #CLIENT_WORKER_GROUP_SHARE_KEY}, falling back to
+ * {@link #CLIENT_WORKER_GROUP_SHARE_DEFAULT}.
+ */
+ static boolean clientWorkerGroupShare(RaftProperties properties) {
+ return getBoolean(properties::getBoolean, CLIENT_WORKER_GROUP_SHARE_KEY,
+ CLIENT_WORKER_GROUP_SHARE_DEFAULT, getDefaultLog());
+ }
+
+ /** Stores {@code clientWorkerGroupShare} in {@code properties} under {@link #CLIENT_WORKER_GROUP_SHARE_KEY}. */
+ static void setClientWorkerGroupShare(RaftProperties properties, boolean clientWorkerGroupShare) {
+ setBoolean(properties::setBoolean, CLIENT_WORKER_GROUP_SHARE_KEY, clientWorkerGroupShare);
}
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index b1ddfc6..a1ccf49 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -48,22 +48,55 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
public class NettyClientStreamRpc implements DataStreamClientRpc {
public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
+  /**
+   * Supplies the Netty worker {@link EventLoopGroup} for one client.
+   *
+   * When {@code NettyConfigKeys.DataStream.clientWorkerGroupShare(properties)} is true,
+   * every instance hands out the same static group and {@link #shutdownGracefully()}
+   * is a no-op, so closing one client cannot stop a group other clients still use.
+   * Otherwise each instance creates, owns and shuts down its own group.
+   */
+  private static class WorkerGroupGetter implements Supplier<EventLoopGroup> {
+    /** The shared group; it is only ever changed from null to a non-null group. */
+    private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP = new AtomicReference<>();
+
+    /** Creates a new group sized by {@code NettyConfigKeys.DataStream.clientWorkerGroupSize}. */
+    static EventLoopGroup newWorkerGroup(RaftProperties properties) {
+      return new NioEventLoopGroup(NettyConfigKeys.DataStream.clientWorkerGroupSize(properties));
+    }
+
+    /**
+     * Returns the shared group, creating it on first use.
+     *
+     * The group is deliberately NOT created inside
+     * {@link AtomicReference#updateAndGet}: that update function may be re-applied
+     * when threads contend, which would create extra groups that nobody shuts down.
+     * Here a group that loses the race is shut down immediately.
+     */
+    private static EventLoopGroup getSharedWorkerGroup(RaftProperties properties) {
+      final EventLoopGroup existing = SHARED_WORKER_GROUP.get();
+      if (existing != null) {
+        return existing;
+      }
+      final EventLoopGroup created = newWorkerGroup(properties);
+      if (SHARED_WORKER_GROUP.compareAndSet(null, created)) {
+        return created;
+      }
+      // Another thread installed its group first: release ours and use the winner's.
+      created.shutdownGracefully();
+      return SHARED_WORKER_GROUP.get();
+    }
+
+    private final EventLoopGroup workerGroup;
+    /** True when {@link #workerGroup} is the shared group, which this instance must not stop. */
+    private final boolean ignoreShutdown;
+
+    WorkerGroupGetter(RaftProperties properties) {
+      if (NettyConfigKeys.DataStream.clientWorkerGroupShare(properties)) {
+        workerGroup = getSharedWorkerGroup(properties);
+        ignoreShutdown = true;
+      } else {
+        workerGroup = newWorkerGroup(properties);
+        ignoreShutdown = false;
+      }
+    }
+
+    @Override
+    public EventLoopGroup get() {
+      return workerGroup;
+    }
+
+    /** Shuts down the group only when this instance owns it; does nothing for the shared group. */
+    void shutdownGracefully() {
+      if (!ignoreShutdown) {
+        workerGroup.shutdownGracefully();
+      }
+    }
+  }
+
private final String name;
- private final EventLoopGroup workerGroup;
+ private final WorkerGroupGetter workerGroup;
private final Supplier<Channel> channel;
private final ConcurrentMap<ClientInvocationId, Queue<CompletableFuture<DataStreamReply>>> replies =
new ConcurrentHashMap<>();
public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
- this.workerGroup = new NioEventLoopGroup(NettyConfigKeys.DataStream.clientEventLoopThreads(properties));
+ this.workerGroup = new WorkerGroupGetter(properties);
final ChannelFuture f = new Bootstrap()
- .group(workerGroup)
+ .group(workerGroup.get())
.channel(NioSocketChannel.class)
.handler(getInitializer())
.option(ChannelOption.SO_KEEPALIVE, true)