You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/14 06:47:41 UTC

[rocketmq] branch develop updated: [ISSUE #5066] Fix only individual queues are consumed in high tps scenarios (#5067)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6efbe6824 [ISSUE #5066] Fix only individual queues are consumed in high tps scenarios (#5067)
6efbe6824 is described below

commit 6efbe68241d813d86b5c52f29c2ea30bb66ba368
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Wed Sep 14 14:47:20 2022 +0800

    [ISSUE #5066] Fix only individual queues are consumed in high tps scenarios (#5067)
---
 .../broker/processor/DefaultPullMessageResultHandler.java      |  2 +-
 .../rocketmq/broker/processor/PullMessageProcessorTest.java    |  2 --
 .../src/main/java/org/apache/rocketmq/common/BrokerConfig.java | 10 +++++-----
 3 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index 0e6a11283..ac6fa88bc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -172,7 +172,7 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
     }
 
     private boolean channelIsWritable(Channel channel, PullMessageRequestHeader requestHeader) {
-        if (this.brokerController.getBrokerConfig().isNetWorkFlowController()) {
+        if (this.brokerController.getBrokerConfig().isEnableNetWorkFlowControl()) {
             if (!channel.isWritable()) {
                 log.warn("channel {} not writable ,cid {}", channel.remoteAddress(), requestHeader.getConsumerGroup());
                 return false;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index 5b7e4cf93..e20acb0cf 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -83,10 +83,8 @@ public class PullMessageProcessorTest {
         SubscriptionGroupManager subscriptionGroupManager = new SubscriptionGroupManager(brokerController);
         pullMessageProcessor = new PullMessageProcessor(brokerController);
         Channel mockChannel = mock(Channel.class);
-        when(mockChannel.isWritable()).thenReturn(true);
         when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
         when(handlerContext.channel()).thenReturn(mockChannel);
-        when(handlerContext.channel().isWritable()).thenReturn(true);
         when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
         brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig());
         clientChannelInfo = new ClientChannelInfo(mockChannel);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index daa809a05..f82f6a1de 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -191,7 +191,7 @@ public class BrokerConfig extends BrokerIdentity {
      */
     private long brokerNotActiveTimeoutMillis = 10 * 1000;
 
-    private boolean netWorkFlowController = true;
+    private boolean enableNetWorkFlowControl = false;
 
     private int popPollingSize = 1024;
     private int popPollingMapSize = 100000;
@@ -1156,12 +1156,12 @@ public class BrokerConfig extends BrokerIdentity {
         this.brokerNotActiveTimeoutMillis = brokerNotActiveTimeoutMillis;
     }
 
-    public boolean isNetWorkFlowController() {
-        return netWorkFlowController;
+    public boolean isEnableNetWorkFlowControl() {
+        return enableNetWorkFlowControl;
     }
 
-    public void setNetWorkFlowController(boolean netWorkFlowController) {
-        this.netWorkFlowController = netWorkFlowController;
+    public void setEnableNetWorkFlowControl(boolean enableNetWorkFlowControl) {
+        this.enableNetWorkFlowControl = enableNetWorkFlowControl;
     }
 
     public boolean isRealTimeNotifyConsumerChange() {