You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/12/15 12:09:43 UTC
[rocketmq] branch develop updated: Fix bug where the broker hangs after merging the PR that fixed the headWaitTimeMills of sendThreadPoolQueue (#3631)
This is an automated email from the ASF dual-hosted git repository.
huangli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a2d9424 Fix bug where the broker hangs after merging the PR that fixed the headWaitTimeMills of sendThreadPoolQueue (#3631)
a2d9424 is described below
commit a2d94242f3632dc71ce29e2a7bba0a91b20a6b8f
Author: rongtong <ji...@163.com>
AuthorDate: Wed Dec 15 20:09:34 2021 +0800
Fix bug where the broker hangs after merging the PR that fixed the headWaitTimeMills of sendThreadPoolQueue (#3631)
---
.../apache/rocketmq/broker/BrokerController.java | 32 ++++++++++++++--------
.../broker/processor/SendMessageProcessor.java | 2 +-
.../rocketmq/broker/BrokerControllerTest.java | 2 --
.../org/apache/rocketmq/common/BrokerConfig.java | 18 ++++++++++++
4 files changed, 40 insertions(+), 14 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 662ec49..1f72e27 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -33,8 +33,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.Optional;
-import java.util.Objects;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
@@ -134,6 +132,7 @@ public class BrokerController {
"BrokerControllerScheduledThread"));
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
+ private final BlockingQueue<Runnable> putThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> replyThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
@@ -150,6 +149,7 @@ public class BrokerController {
private RemotingServer fastRemotingServer;
private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor;
+ private ExecutorService putMessageFutureExecutor;
private ExecutorService pullMessageExecutor;
private ExecutorService replyMessageExecutor;
private ExecutorService queryMessageExecutor;
@@ -198,6 +198,7 @@ public class BrokerController {
this.slaveSynchronize = new SlaveSynchronize(this);
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
+ this.putThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPutThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
@@ -275,6 +276,14 @@ public class BrokerController {
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
+ this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
+ this.brokerConfig.getPutMessageFutureThreadPoolNums(),
+ this.brokerConfig.getPutMessageFutureThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.putThreadPoolQueue,
+ new ThreadFactoryImpl("PutMessageThread_"));
+
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
@@ -652,13 +661,10 @@ public class BrokerController {
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
long slowTimeMills = 0;
- Optional<RequestTask> op = q.stream()
- .map(BrokerFastFailure::castRunnable)
- .filter(Objects::nonNull)
- .findFirst();
- if (op.isPresent()) {
- RequestTask rt = op.get();
- slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp();
+ final Runnable peek = q.peek();
+ if (peek != null) {
+ RequestTask rt = BrokerFastFailure.castRunnable(peek);
+ slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
}
if (slowTimeMills < 0) {
@@ -787,6 +793,10 @@ public class BrokerController {
this.sendMessageExecutor.shutdown();
}
+ if (this.putMessageFutureExecutor != null) {
+ this.putMessageFutureExecutor.shutdown();
+ }
+
if (this.pullMessageExecutor != null) {
this.pullMessageExecutor.shutdown();
}
@@ -1245,7 +1255,7 @@ public class BrokerController {
}
}
- public ExecutorService getSendMessageExecutor() {
- return sendMessageExecutor;
+ public ExecutorService getPutMessageFutureExecutor() {
+ return putMessageFutureExecutor;
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index b31c71e..f5ebf3a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -84,7 +84,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
- asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
+ asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getPutMessageFutureExecutor());
}
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index e8442a4..9706334 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -71,7 +71,6 @@ public class BrokerControllerTest {
}
};
- queue.add(runnable);
RequestTask requestTask = new RequestTask(runnable, null, null);
// the requestTask is not the head of queue;
@@ -80,6 +79,5 @@ public class BrokerControllerTest {
long headSlowTimeMills = 100;
TimeUnit.MILLISECONDS.sleep(headSlowTimeMills);
assertThat(brokerController.headSlowTimeMills(queue)).isGreaterThanOrEqualTo(headSlowTimeMills);
- //Attention: if we use the previous version method BrokerController#headSlowTimeMills, it will return 0;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a45efbe..0e472fb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -60,6 +60,7 @@ public class BrokerConfig {
* thread numbers for send message thread pool.
*/
private int sendMessageThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 4);
+ private int putMessageFutureThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 4);
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
@@ -84,6 +85,7 @@ public class BrokerConfig {
@ImportantField
private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000;
+ private int putThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
@@ -375,6 +377,14 @@ public class BrokerConfig {
this.sendMessageThreadPoolNums = sendMessageThreadPoolNums;
}
+ public int getPutMessageFutureThreadPoolNums() {
+ return putMessageFutureThreadPoolNums;
+ }
+
+ public void setPutMessageFutureThreadPoolNums(int putMessageFutureThreadPoolNums) {
+ this.putMessageFutureThreadPoolNums = putMessageFutureThreadPoolNums;
+ }
+
public int getPullMessageThreadPoolNums() {
return pullMessageThreadPoolNums;
}
@@ -479,6 +489,14 @@ public class BrokerConfig {
this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
}
+ public int getPutThreadPoolQueueCapacity() {
+ return putThreadPoolQueueCapacity;
+ }
+
+ public void setPutThreadPoolQueueCapacity(int putThreadPoolQueueCapacity) {
+ this.putThreadPoolQueueCapacity = putThreadPoolQueueCapacity;
+ }
+
public int getPullThreadPoolQueueCapacity() {
return pullThreadPoolQueueCapacity;
}