You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2021/12/15 12:09:43 UTC

[rocketmq] branch develop updated: Fix bug that the broker will hang after merge the pr that fix the headWaitTimeMills of sendThreadPoolQueue (#3631)

This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a2d9424  Fix bug that the broker will hang after merge the pr that fix the headWaitTimeMills of sendThreadPoolQueue (#3631)
a2d9424 is described below

commit a2d94242f3632dc71ce29e2a7bba0a91b20a6b8f
Author: rongtong <ji...@163.com>
AuthorDate: Wed Dec 15 20:09:34 2021 +0800

    Fix bug that the broker will hang after merge the pr that fix the headWaitTimeMills of sendThreadPoolQueue (#3631)
---
 .../apache/rocketmq/broker/BrokerController.java   | 32 ++++++++++++++--------
 .../broker/processor/SendMessageProcessor.java     |  2 +-
 .../rocketmq/broker/BrokerControllerTest.java      |  2 --
 .../org/apache/rocketmq/common/BrokerConfig.java   | 18 ++++++++++++
 4 files changed, 40 insertions(+), 14 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 662ec49..1f72e27 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -33,8 +33,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.Optional;
-import java.util.Objects;
 import org.apache.rocketmq.acl.AccessValidator;
 import org.apache.rocketmq.broker.client.ClientHousekeepingService;
 import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
@@ -134,6 +132,7 @@ public class BrokerController {
         "BrokerControllerScheduledThread"));
     private final SlaveSynchronize slaveSynchronize;
     private final BlockingQueue<Runnable> sendThreadPoolQueue;
+    private final BlockingQueue<Runnable> putThreadPoolQueue;
     private final BlockingQueue<Runnable> pullThreadPoolQueue;
     private final BlockingQueue<Runnable> replyThreadPoolQueue;
     private final BlockingQueue<Runnable> queryThreadPoolQueue;
@@ -150,6 +149,7 @@ public class BrokerController {
     private RemotingServer fastRemotingServer;
     private TopicConfigManager topicConfigManager;
     private ExecutorService sendMessageExecutor;
+    private ExecutorService putMessageFutureExecutor;
     private ExecutorService pullMessageExecutor;
     private ExecutorService replyMessageExecutor;
     private ExecutorService queryMessageExecutor;
@@ -198,6 +198,7 @@ public class BrokerController {
         this.slaveSynchronize = new SlaveSynchronize(this);
 
         this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
+        this.putThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPutThreadPoolQueueCapacity());
         this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
         this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
         this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
@@ -275,6 +276,14 @@ public class BrokerController {
                 this.sendThreadPoolQueue,
                 new ThreadFactoryImpl("SendMessageThread_"));
 
+            this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
+                this.brokerConfig.getPutMessageFutureThreadPoolNums(),
+                this.brokerConfig.getPutMessageFutureThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.putThreadPoolQueue,
+                new ThreadFactoryImpl("PutMessageThread_"));
+
             this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                 this.brokerConfig.getPullMessageThreadPoolNums(),
                 this.brokerConfig.getPullMessageThreadPoolNums(),
@@ -652,13 +661,10 @@ public class BrokerController {
 
     public long headSlowTimeMills(BlockingQueue<Runnable> q) {
         long slowTimeMills = 0;
-        Optional<RequestTask> op = q.stream()
-                .map(BrokerFastFailure::castRunnable)
-                .filter(Objects::nonNull)
-                .findFirst();
-        if (op.isPresent()) {
-            RequestTask rt = op.get();
-            slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp();
+        final Runnable peek = q.peek();
+        if (peek != null) {
+            RequestTask rt = BrokerFastFailure.castRunnable(peek);
+            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
         }
 
         if (slowTimeMills < 0) {
@@ -787,6 +793,10 @@ public class BrokerController {
             this.sendMessageExecutor.shutdown();
         }
 
+        if (this.putMessageFutureExecutor != null) {
+            this.putMessageFutureExecutor.shutdown();
+        }
+
         if (this.pullMessageExecutor != null) {
             this.pullMessageExecutor.shutdown();
         }
@@ -1245,7 +1255,7 @@ public class BrokerController {
         }
     }
 
-    public ExecutorService getSendMessageExecutor() {
-        return sendMessageExecutor;
+    public ExecutorService getPutMessageFutureExecutor() {
+        return putMessageFutureExecutor;
     }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index b31c71e..f5ebf3a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -84,7 +84,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
 
     @Override
     public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
-        asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
+        asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getPutMessageFutureExecutor());
     }
 
     public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index e8442a4..9706334 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -71,7 +71,6 @@ public class BrokerControllerTest {
 
             }
         };
-        queue.add(runnable);
 
         RequestTask requestTask = new RequestTask(runnable, null, null);
         // the requestTask is not the head of queue;
@@ -80,6 +79,5 @@ public class BrokerControllerTest {
         long headSlowTimeMills = 100;
         TimeUnit.MILLISECONDS.sleep(headSlowTimeMills);
         assertThat(brokerController.headSlowTimeMills(queue)).isGreaterThanOrEqualTo(headSlowTimeMills);
-        //Attention: if we use the previous version method BrokerController#headSlowTimeMills, it will return 0;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a45efbe..0e472fb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -60,6 +60,7 @@ public class BrokerConfig {
      * thread numbers for send message thread pool.
      */
     private int sendMessageThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 4);
+    private int putMessageFutureThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 4);
     private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
     private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
     private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
@@ -84,6 +85,7 @@ public class BrokerConfig {
     @ImportantField
     private boolean fetchNamesrvAddrByAddressServer = false;
     private int sendThreadPoolQueueCapacity = 10000;
+    private int putThreadPoolQueueCapacity = 10000;
     private int pullThreadPoolQueueCapacity = 100000;
     private int replyThreadPoolQueueCapacity = 10000;
     private int queryThreadPoolQueueCapacity = 20000;
@@ -375,6 +377,14 @@ public class BrokerConfig {
         this.sendMessageThreadPoolNums = sendMessageThreadPoolNums;
     }
 
+    public int getPutMessageFutureThreadPoolNums() {
+        return putMessageFutureThreadPoolNums;
+    }
+
+    public void setPutMessageFutureThreadPoolNums(int putMessageFutureThreadPoolNums) {
+        this.putMessageFutureThreadPoolNums = putMessageFutureThreadPoolNums;
+    }
+
     public int getPullMessageThreadPoolNums() {
         return pullMessageThreadPoolNums;
     }
@@ -479,6 +489,14 @@ public class BrokerConfig {
         this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
     }
 
+    public int getPutThreadPoolQueueCapacity() {
+        return putThreadPoolQueueCapacity;
+    }
+
+    public void setPutThreadPoolQueueCapacity(int putThreadPoolQueueCapacity) {
+        this.putThreadPoolQueueCapacity = putThreadPoolQueueCapacity;
+    }
+
     public int getPullThreadPoolQueueCapacity() {
         return pullThreadPoolQueueCapacity;
     }