You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/11/08 13:01:01 UTC

[ratis] branch master updated: RATIS-1421. [Streaming]make client NioEventLoopGroup size configurable. (#524)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new bba47a5  RATIS-1421. [Streaming]make client NioEventLoopGroup size configurable. (#524)
bba47a5 is described below

commit bba47a5602bbbab496ca55f4761eefbeda5fd30e
Author: micah zhao <10...@qq.com>
AuthorDate: Mon Nov 8 21:00:54 2021 +0800

    RATIS-1421. [Streaming]make client NioEventLoopGroup size configurable. (#524)
---
 .../main/java/org/apache/ratis/netty/NettyConfigKeys.java   | 13 +++++++++++++
 .../org/apache/ratis/netty/client/NettyClientStreamRpc.java |  5 +++--
 .../test/java/org/apache/ratis/OutputStreamBaseTest.java    |  2 +-
 .../ratis/datastream/DataStreamAsyncClusterTests.java       |  4 ++--
 4 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index d3be80b..de71889 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.netty;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.thirdparty.io.netty.util.NettyRuntime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +69,18 @@ public interface NettyConfigKeys {
     static void setPort(RaftProperties properties, int port) {
       setInt(properties::setInt, PORT_KEY, port);
     }
+
+    String CLIENT_EVENT_LOOP_THREADS_KEY = PREFIX + ".client.eventLoopThreads";
+    int CLIENT_EVENT_LOOP_THREADS_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2);
+
+    static int clientEventLoopThreads(RaftProperties properties) {
+      return getInt(properties::getInt, CLIENT_EVENT_LOOP_THREADS_KEY,
+          CLIENT_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536));
+    }
+
+    static void setClientEventLoopThreads(RaftProperties properties, int eventLoopThreads) {
+      setInt(properties::setInt, CLIENT_EVENT_LOOP_THREADS_KEY, eventLoopThreads);
+    }
   }
 
   static void main(String[] args) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 551810f..b1ddfc6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
+import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.DataStreamReply;
@@ -53,14 +54,14 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
 
   private final String name;
-  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private final EventLoopGroup workerGroup;
   private final Supplier<Channel> channel;
   private final ConcurrentMap<ClientInvocationId, Queue<CompletableFuture<DataStreamReply>>> replies =
       new ConcurrentHashMap<>();
 
   public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
     this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
-
+    this.workerGroup = new NioEventLoopGroup(NettyConfigKeys.DataStream.clientEventLoopThreads(properties));
     final ChannelFuture f = new Bootstrap()
         .group(workerGroup)
         .channel(NioSocketChannel.class)
diff --git a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index aa69fa4..1a5b509 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -70,7 +70,7 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
     return b;
   }
 
-  @Test
+  @Test(timeout = 300000)
   public void testSimpleWrite() throws Exception {
     runWithNewCluster(NUM_SERVERS, this::runTestSimpleWrite);
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index 826facb..bdb571d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -53,7 +53,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
     runWithNewCluster(1, this::runTestDataStream);
   }
 
-  @Test
+  @Test(timeout = 300000)
   public void testMultipleStreamsMultipleServers() throws Exception {
     // Avoid changing leader
     final TimeDuration min = RaftServerConfigKeys.Rpc.timeoutMin(getProperties());
@@ -68,7 +68,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
     RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), max);
   }
 
-  @Test
+  @Test(timeout = 300000)
   public void testMultipleStreamsMultipleServersStepDownLeader() throws Exception {
     runWithNewCluster(3, this::runTestDataStreamStepDownLeader);
   }