You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/11/10 02:24:03 UTC

[ratis] branch master updated: RATIS-1424. NettyClientStreamRpc#workerGroup Can cause too much nioEventLoopGroup object (#527)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dec9dc  RATIS-1424. NettyClientStreamRpc#workerGroup Can cause too much nioEventLoopGroup object (#527)
1dec9dc is described below

commit 1dec9dcebfc87833be1c2dc9a127b03eaf62f129
Author: hao guo <gh...@126.com>
AuthorDate: Wed Nov 10 10:24:00 2021 +0800

    RATIS-1424. NettyClientStreamRpc#workerGroup Can cause too much nioEventLoopGroup object (#527)
---
 .../org/apache/ratis/netty/NettyConfigKeys.java    | 28 +++++++++++-----
 .../ratis/netty/client/NettyClientStreamRpc.java   | 39 ++++++++++++++++++++--
 2 files changed, 56 insertions(+), 11 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index de71889..9d7f45d 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -70,16 +70,28 @@ public interface NettyConfigKeys {
       setInt(properties::setInt, PORT_KEY, port);
     }
 
-    String CLIENT_EVENT_LOOP_THREADS_KEY = PREFIX + ".client.eventLoopThreads";
-    int CLIENT_EVENT_LOOP_THREADS_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2);
+    String CLIENT_WORKER_GROUP_SIZE_KEY = PREFIX + ".client.worker-group.size";
+    int CLIENT_WORKER_GROUP_SIZE_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2);
 
-    static int clientEventLoopThreads(RaftProperties properties) {
-      return getInt(properties::getInt, CLIENT_EVENT_LOOP_THREADS_KEY,
-          CLIENT_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536));
+    static int clientWorkerGroupSize(RaftProperties properties) {
+      return getInt(properties::getInt, CLIENT_WORKER_GROUP_SIZE_KEY,
+          CLIENT_WORKER_GROUP_SIZE_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536));
     }
 
-    static void setClientEventLoopThreads(RaftProperties properties, int eventLoopThreads) {
-      setInt(properties::setInt, CLIENT_EVENT_LOOP_THREADS_KEY, eventLoopThreads);
+    static void setClientWorkerGroupSize(RaftProperties properties, int clientWorkerGroupSize) {
+      setInt(properties::setInt, CLIENT_WORKER_GROUP_SIZE_KEY, clientWorkerGroupSize);
+    }
+
+    String CLIENT_WORKER_GROUP_SHARE_KEY = PREFIX + ".client.worker-group.share";
+    boolean CLIENT_WORKER_GROUP_SHARE_DEFAULT = false;
+
+    static boolean clientWorkerGroupShare(RaftProperties properties) {
+      return getBoolean(properties::getBoolean, CLIENT_WORKER_GROUP_SHARE_KEY,
+          CLIENT_WORKER_GROUP_SHARE_DEFAULT, getDefaultLog());
+    }
+
+    static void setClientWorkerGroupShare(RaftProperties properties, boolean clientWorkerGroupShare) {
+      setBoolean(properties::setBoolean, CLIENT_WORKER_GROUP_SHARE_KEY, clientWorkerGroupShare);
     }
   }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index b1ddfc6..a1ccf49 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -48,22 +48,55 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 public class NettyClientStreamRpc implements DataStreamClientRpc {
   public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
 
+  private static class WorkerGroupGetter implements Supplier<EventLoopGroup> {
+    private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP = new AtomicReference<>();
+
+    static EventLoopGroup newWorkerGroup(RaftProperties properties) {
+      return new NioEventLoopGroup(NettyConfigKeys.DataStream.clientWorkerGroupSize(properties));
+    }
+
+    private final EventLoopGroup workerGroup;
+    private final boolean ignoreShutdown;
+
+    WorkerGroupGetter(RaftProperties properties) {
+      if (NettyConfigKeys.DataStream.clientWorkerGroupShare(properties)) {
+        workerGroup = SHARED_WORKER_GROUP.updateAndGet(g -> g != null? g: newWorkerGroup(properties));
+        ignoreShutdown = true;
+      } else {
+        workerGroup = newWorkerGroup(properties);
+        ignoreShutdown = false;
+      }
+    }
+
+    @Override
+    public EventLoopGroup get() {
+      return workerGroup;
+    }
+
+    void shutdownGracefully() {
+      if (!ignoreShutdown) {
+        workerGroup.shutdownGracefully();
+      }
+    }
+  }
+
   private final String name;
-  private final EventLoopGroup workerGroup;
+  private final WorkerGroupGetter workerGroup;
   private final Supplier<Channel> channel;
   private final ConcurrentMap<ClientInvocationId, Queue<CompletableFuture<DataStreamReply>>> replies =
       new ConcurrentHashMap<>();
 
   public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
     this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
-    this.workerGroup = new NioEventLoopGroup(NettyConfigKeys.DataStream.clientEventLoopThreads(properties));
+    this.workerGroup = new WorkerGroupGetter(properties);
     final ChannelFuture f = new Bootstrap()
-        .group(workerGroup)
+        .group(workerGroup.get())
         .channel(NioSocketChannel.class)
         .handler(getInitializer())
         .option(ChannelOption.SO_KEEPALIVE, true)