You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/14 05:50:59 UTC

[rocketmq] 04/18: Set a custom reject policy for batchDispatchRequestExecutor

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 8bf81c46849837703d7acc4802558ed3bdc122b7
Author: nowinkey <no...@tom.com>
AuthorDate: Sun Feb 5 20:38:35 2023 +0800

    Set a custom reject policy for batchDispatchRequestExecutor
---
 .../apache/rocketmq/store/DefaultMessageStore.java | 23 +++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 97a0ee835..cf1745154 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -43,6 +43,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutionException;
@@ -55,6 +56,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -2632,9 +2634,9 @@ public class DefaultMessageStore implements MessageStore {
 
         DispatchRequest[][] buffer;
 
-        int ptr = 0;
+        long ptr = 0;
 
-        AtomicInteger maxPtr = new AtomicInteger();
+        AtomicLong maxPtr = new AtomicLong();
 
         public DispatchRequestOrderlyQueue(int bufferNum) {
             this.buffer = new DispatchRequest[bufferNum][];
@@ -2658,7 +2660,7 @@ public class DefaultMessageStore implements MessageStore {
         public DispatchRequest[] get(List<DispatchRequest[]> rets) {
             synchronized (this) {
                 for (int i = 0; i < this.buffer.length; i++) {
-                    int mod = ptr % this.buffer.length;
+                    int mod = (int) (ptr % this.buffer.length);
                     DispatchRequest[] ret = this.buffer[mod];
                     if (ret == null) {
                         this.notifyAll();
@@ -2860,8 +2862,19 @@ public class DefaultMessageStore implements MessageStore {
                     DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
                     1000 * 60,
                     TimeUnit.MICROSECONDS,
-                    new LinkedBlockingQueue<>(1024),
-                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+                    new LinkedBlockingQueue<>(4096),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+                    new RejectedExecutionHandler() {
+                        @Override
+                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+                            try {
+                                LOGGER.warn("Task {} is blocking put into the workQueue", r);
+                                executor.getQueue().put(r);
+                            } catch (InterruptedException e) {
+                                LOGGER.error("Task {} failed to put into the workQueue", r);
+                            }
+                        }
+                    });
         }
 
         private void pollBatchDispatchRequest() {