You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/12/16 13:15:29 UTC

[rocketmq] branch develop updated: [ISSUE #3651] for add netty channel option WRITE_BUFFER_WATER_MARK

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 17d25ea  [ISSUE #3651] for add netty channel option WRITE_BUFFER_WATER_MARK
17d25ea is described below

commit 17d25ea56b44625ae04e7c3181d24b0e80acaf77
Author: tianliuliu <64...@qq.com>
AuthorDate: Thu Dec 16 21:15:12 2021 +0800

    [ISSUE #3651] for add netty channel option WRITE_BUFFER_WATER_MARK
---
 .../rocketmq/remoting/netty/NettyClientConfig.java    | 19 +++++++++++++++++++
 .../rocketmq/remoting/netty/NettyRemotingClient.java  |  3 +++
 .../rocketmq/remoting/netty/NettyRemotingServer.java  |  3 +++
 .../rocketmq/remoting/netty/NettyServerConfig.java    | 18 ++++++++++++++++++
 .../rocketmq/remoting/netty/NettySystemConfig.java    |  8 ++++++++
 5 files changed, 51 insertions(+)

diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index 5ba3534..c1b9345 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -40,6 +40,9 @@ public class NettyClientConfig {
 
     private boolean useTLS;
 
+    private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
+    private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
+
     public boolean isClientCloseSocketIfTimeout() {
         return clientCloseSocketIfTimeout;
     }
@@ -135,4 +138,20 @@ public class NettyClientConfig {
     public void setUseTLS(boolean useTLS) {
         this.useTLS = useTLS;
     }
+
+    public int getWriteBufferLowWaterMark() {
+        return writeBufferLowWaterMark;
+    }
+
+    public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+        this.writeBufferLowWaterMark = writeBufferLowWaterMark;
+    }
+
+    public int getWriteBufferHighWaterMark() {
+        return writeBufferHighWaterMark;
+    }
+
+    public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+        this.writeBufferHighWaterMark = writeBufferHighWaterMark;
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 5ba6cfa..5697465 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
@@ -167,6 +168,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
             .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
             .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
+            .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyClientConfig.getWriteBufferLowWaterMark(),
+                nettyClientConfig.getWriteBufferHighWaterMark()))
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 1586472..2b7413f 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -28,6 +28,7 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.WriteBufferWaterMark;
 import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -204,6 +205,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
                 .childOption(ChannelOption.TCP_NODELAY, true)
                 .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                 .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
+                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
+                    nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()))
                 .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index 8708471..bd87e5b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -27,6 +27,8 @@ public class NettyServerConfig implements Cloneable {
 
     private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
     private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
+    private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
+    private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
     private int serverSocketBacklog = NettySystemConfig.socketBacklog;
     private boolean serverPooledByteBufAllocatorEnable = true;
 
@@ -139,4 +141,20 @@ public class NettyServerConfig implements Cloneable {
     public Object clone() throws CloneNotSupportedException {
         return (NettyServerConfig) super.clone();
     }
+
+    public int getWriteBufferLowWaterMark() {
+        return writeBufferLowWaterMark;
+    }
+
+    public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
+        this.writeBufferLowWaterMark = writeBufferLowWaterMark;
+    }
+
+    public int getWriteBufferHighWaterMark() {
+        return writeBufferHighWaterMark;
+    }
+
+    public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
+        this.writeBufferHighWaterMark = writeBufferHighWaterMark;
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index 85f30f5..4290bae 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -38,6 +38,10 @@ public class NettySystemConfig {
         "com.rocketmq.remoting.client.channel.maxIdleTimeSeconds";
     public static final String COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT =
         "com.rocketmq.remoting.client.closeSocketIfTimeout";
+    public static final String COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE =
+        "com.rocketmq.remoting.write.buffer.high.water.mark";
+    public static final String COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK =
+        "com.rocketmq.remoting.write.buffer.low.water.mark";
 
     public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
         Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
@@ -59,5 +63,9 @@ public class NettySystemConfig {
         Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CHANNEL_MAX_IDLE_SECONDS, "120"));
     public static boolean clientCloseSocketIfTimeout =
         Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT, "true"));
+    public static int writeBufferHighWaterMark =
+        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_HIGH_WATER_MARK_VALUE, "4194304"));// default 4MB (4 * 1024 * 1024), overridable via the system property
+    public static int writeBufferLowWaterMark =
+        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_WRITE_BUFFER_LOW_WATER_MARK, "1048576")); // default 1MB (1024 * 1024), overridable via the system property
 
 }