You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/11/08 13:01:01 UTC
[ratis] branch master updated: RATIS-1421. [Streaming]make client NioEventLoopGroup size configurable. (#524)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new bba47a5 RATIS-1421. [Streaming]make client NioEventLoopGroup size configurable. (#524)
bba47a5 is described below
commit bba47a5602bbbab496ca55f4761eefbeda5fd30e
Author: micah zhao <10...@qq.com>
AuthorDate: Mon Nov 8 21:00:54 2021 +0800
RATIS-1421. [Streaming]make client NioEventLoopGroup size configurable. (#524)
---
.../main/java/org/apache/ratis/netty/NettyConfigKeys.java | 13 +++++++++++++
.../org/apache/ratis/netty/client/NettyClientStreamRpc.java | 5 +++--
.../test/java/org/apache/ratis/OutputStreamBaseTest.java | 2 +-
.../ratis/datastream/DataStreamAsyncClusterTests.java | 4 ++--
4 files changed, 19 insertions(+), 5 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index d3be80b..de71889 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -18,6 +18,7 @@
package org.apache.ratis.netty;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.thirdparty.io.netty.util.NettyRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +69,18 @@ public interface NettyConfigKeys {
static void setPort(RaftProperties properties, int port) {
setInt(properties::setInt, PORT_KEY, port);
}
+
+ String CLIENT_EVENT_LOOP_THREADS_KEY = PREFIX + ".client.eventLoopThreads";
+ int CLIENT_EVENT_LOOP_THREADS_DEFAULT = Math.max(1, NettyRuntime.availableProcessors() * 2);
+
+ static int clientEventLoopThreads(RaftProperties properties) {
+ return getInt(properties::getInt, CLIENT_EVENT_LOOP_THREADS_KEY,
+ CLIENT_EVENT_LOOP_THREADS_DEFAULT, getDefaultLog(), requireMin(1), requireMax(65536));
+ }
+
+ static void setClientEventLoopThreads(RaftProperties properties, int eventLoopThreads) {
+ setInt(properties::setInt, CLIENT_EVENT_LOOP_THREADS_KEY, eventLoopThreads);
+ }
}
static void main(String[] args) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 551810f..b1ddfc6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
+import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
@@ -53,14 +54,14 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
private final String name;
- private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private final EventLoopGroup workerGroup;
private final Supplier<Channel> channel;
private final ConcurrentMap<ClientInvocationId, Queue<CompletableFuture<DataStreamReply>>> replies =
new ConcurrentHashMap<>();
public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
-
+ this.workerGroup = new NioEventLoopGroup(NettyConfigKeys.DataStream.clientEventLoopThreads(properties));
final ChannelFuture f = new Bootstrap()
.group(workerGroup)
.channel(NioSocketChannel.class)
diff --git a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index aa69fa4..1a5b509 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -70,7 +70,7 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
return b;
}
- @Test
+ @Test(timeout = 300000)
public void testSimpleWrite() throws Exception {
runWithNewCluster(NUM_SERVERS, this::runTestSimpleWrite);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index 826facb..bdb571d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -53,7 +53,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
runWithNewCluster(1, this::runTestDataStream);
}
- @Test
+ @Test(timeout = 300000)
public void testMultipleStreamsMultipleServers() throws Exception {
// Avoid changing leader
final TimeDuration min = RaftServerConfigKeys.Rpc.timeoutMin(getProperties());
@@ -68,7 +68,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), max);
}
- @Test
+ @Test(timeout = 300000)
public void testMultipleStreamsMultipleServersStepDownLeader() throws Exception {
runWithNewCluster(3, this::runTestDataStreamStepDownLeader);
}