You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/12/16 13:15:29 UTC
[rocketmq] branch develop updated: [ISSUE #3651] for add netty channel option WRITE_BUFFER_WATER_MARK
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 17d25ea [ISSUE #3651] for add netty channel option WRITE_BUFFER_WATER_MARK
17d25ea is described below
commit 17d25ea56b44625ae04e7c3181d24b0e80acaf77
Author: tianliuliu <64...@qq.com>
AuthorDate: Thu Dec 16 21:15:12 2021 +0800
[ISSUE #3651] for add netty channel option WRITE_BUFFER_WATER_MARK
---
.../rocketmq/remoting/netty/NettyClientConfig.java | 19 +++++++++++++++++++
.../rocketmq/remoting/netty/NettyRemotingClient.java | 3 +++
.../rocketmq/remoting/netty/NettyRemotingServer.java | 3 +++
.../rocketmq/remoting/netty/NettyServerConfig.java | 18 ++++++++++++++++++
.../rocketmq/remoting/netty/NettySystemConfig.java | 8 ++++++++
5 files changed, 51 insertions(+)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index 5ba3534..c1b9345 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -40,6 +40,9 @@ public class NettyClientConfig {
private boolean useTLS;
+ private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
+ private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
+
public boolean isClientCloseSocketIfTimeout() {
return clientCloseSocketIfTimeout;
}
@@ -135,4 +138,20 @@ public class NettyClientConfig {
public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;
}
+
+ public int getWriteBufferLowWaterMark() {
+ return writeBufferLowWaterMark;
+ }
+
+ public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+ this.writeBufferLowWaterMark = writeBufferLowWaterMark;
+ }
+
+ public int getWriteBufferHighWaterMark() {
+ return writeBufferHighWaterMark;
+ }
+
+ public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+ this.writeBufferHighWaterMark = writeBufferHighWaterMark;
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 5ba6cfa..5697465 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
@@ -167,6 +168,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(),
+ nettyClientConfig.getWriteBufferHighWaterMark()))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 1586472..2b7413f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -28,6 +28,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -204,6 +205,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
+ .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
+ nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index 8708471..bd87e5b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -27,6 +27,8 @@ public class NettyServerConfig implements Cloneable {
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
+ private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
+ private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
private int serverSocketBacklog = NettySystemConfig.socketBacklog;
private boolean serverPooledByteBufAllocatorEnable = true;
@@ -139,4 +141,20 @@ public class NettyServerConfig implements Cloneable {
public Object clone() throws CloneNotSupportedException {
return (NettyServerConfig) super.clone();
}
+
+ public int getWriteBufferLowWaterMark() {
+ return writeBufferLowWaterMark;
+ }
+
+ public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+ this.writeBufferLowWaterMark = writeBufferLowWaterMark;
+ }
+
+ public int getWriteBufferHighWaterMark() {
+ return writeBufferHighWaterMark;
+ }
+
+ public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+ this.writeBufferHighWaterMark = writeBufferHighWaterMark;
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index 85f30f5..4290bae 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -38,6 +38,10 @@ public class NettySystemConfig {
"com.rocketmq.remoting.client.channel.maxIdleTimeSeconds";
public static final String COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT =
"com.rocketmq.remoting.client.closeSocketIfTimeout";
+ public static final String COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE =
+ "com.rocketmq.remoting.write.buffer.high.water.mark";
+ public static final String COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK =
+ "com.rocketmq.remoting.write.buffer.low.water.mark";
public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
@@ -59,5 +63,9 @@ public class NettySystemConfig {
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CHANNEL_MAX_IDLE_SECONDS, "120"));
public static boolean clientCloseSocketIfTimeout =
Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT, "true"));
+ public static int writeBufferHighWaterMark =
+ Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE, "4194304")); //4MB
+ public static int writeBufferLowWaterMark =
+ Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK, "1048576")); //1MB
}