You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/14 05:50:59 UTC
[rocketmq] 04/18: Set a custom reject policy for batchDispatchRequestExecutor
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 8bf81c46849837703d7acc4802558ed3bdc122b7
Author: nowinkey <no...@tom.com>
AuthorDate: Sun Feb 5 20:38:35 2023 +0800
Set a custom reject policy for batchDispatchRequestExecutor
---
.../apache/rocketmq/store/DefaultMessageStore.java | 23 +++++++++++++++++-----
1 file changed, 18 insertions(+), 5 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 97a0ee835..cf1745154 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -43,6 +43,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.ExecutionException;
@@ -55,6 +56,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
@@ -2632,9 +2634,9 @@ public class DefaultMessageStore implements MessageStore {
DispatchRequest[][] buffer;
- int ptr = 0;
+ long ptr = 0;
- AtomicInteger maxPtr = new AtomicInteger();
+ AtomicLong maxPtr = new AtomicLong();
public DispatchRequestOrderlyQueue(int bufferNum) {
this.buffer = new DispatchRequest[bufferNum][];
@@ -2658,7 +2660,7 @@ public class DefaultMessageStore implements MessageStore {
public DispatchRequest[] get(List<DispatchRequest[]> rets) {
synchronized (this) {
for (int i = 0; i < this.buffer.length; i++) {
- int mod = ptr % this.buffer.length;
+ int mod = (int) (ptr % this.buffer.length);
DispatchRequest[] ret = this.buffer[mod];
if (ret == null) {
this.notifyAll();
@@ -2860,8 +2862,19 @@ public class DefaultMessageStore implements MessageStore {
DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(),
1000 * 60,
TimeUnit.MICROSECONDS,
- new LinkedBlockingQueue<>(1024),
- new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+ new LinkedBlockingQueue<>(4096),
+ new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"),
+ new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ LOGGER.warn("Task {} is blocking put into the workQueue", r);
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ LOGGER.error("Task {} failed to put into the workQueue", r);
+ }
+ }
+ });
}
private void pollBatchDispatchRequest() {