You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/14 06:47:41 UTC
[rocketmq] branch develop updated: [ISSUE #5066] Fix only individual queues are consumed in high tps scenarios (#5067)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6efbe6824 [ISSUE #5066] Fix only individual queues are consumed in high tps scenarios (#5067)
6efbe6824 is described below
commit 6efbe68241d813d86b5c52f29c2ea30bb66ba368
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Wed Sep 14 14:47:20 2022 +0800
[ISSUE #5066] Fix only individual queues are consumed in high tps scenarios (#5067)
---
.../broker/processor/DefaultPullMessageResultHandler.java | 2 +-
.../rocketmq/broker/processor/PullMessageProcessorTest.java | 2 --
.../src/main/java/org/apache/rocketmq/common/BrokerConfig.java | 10 +++++-----
3 files changed, 6 insertions(+), 8 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index 0e6a11283..ac6fa88bc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -172,7 +172,7 @@ public class DefaultPullMessageResultHandler implements PullMessageResultHandler
}
private boolean channelIsWritable(Channel channel, PullMessageRequestHeader requestHeader) {
- if (this.brokerController.getBrokerConfig().isNetWorkFlowController()) {
+ if (this.brokerController.getBrokerConfig().isEnableNetWorkFlowControl()) {
if (!channel.isWritable()) {
log.warn("channel {} not writable ,cid {}", channel.remoteAddress(), requestHeader.getConsumerGroup());
return false;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index 5b7e4cf93..e20acb0cf 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -83,10 +83,8 @@ public class PullMessageProcessorTest {
SubscriptionGroupManager subscriptionGroupManager = new SubscriptionGroupManager(brokerController);
pullMessageProcessor = new PullMessageProcessor(brokerController);
Channel mockChannel = mock(Channel.class);
- when(mockChannel.isWritable()).thenReturn(true);
when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
when(handlerContext.channel()).thenReturn(mockChannel);
- when(handlerContext.channel().isWritable()).thenReturn(true);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig());
clientChannelInfo = new ClientChannelInfo(mockChannel);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index daa809a05..f82f6a1de 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -191,7 +191,7 @@ public class BrokerConfig extends BrokerIdentity {
*/
private long brokerNotActiveTimeoutMillis = 10 * 1000;
- private boolean netWorkFlowController = true;
+ private boolean enableNetWorkFlowControl = false;
private int popPollingSize = 1024;
private int popPollingMapSize = 100000;
@@ -1156,12 +1156,12 @@ public class BrokerConfig extends BrokerIdentity {
this.brokerNotActiveTimeoutMillis = brokerNotActiveTimeoutMillis;
}
- public boolean isNetWorkFlowController() {
- return netWorkFlowController;
+ public boolean isEnableNetWorkFlowControl() {
+ return enableNetWorkFlowControl;
}
- public void setNetWorkFlowController(boolean netWorkFlowController) {
- this.netWorkFlowController = netWorkFlowController;
+ public void setEnableNetWorkFlowControl(boolean enableNetWorkFlowControl) {
+ this.enableNetWorkFlowControl = enableNetWorkFlowControl;
}
public boolean isRealTimeNotifyConsumerChange() {