You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/13 15:31:23 UTC
[rocketmq] branch develop updated: [ISSUE #5026] add timer message put result process
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9442628e4 [ISSUE #5026] add timer message put result process
9442628e4 is described below
commit 9442628e42b4e8eb5115b39a7b0613f88f0853b1
Author: yiduwangkai <16...@users.noreply.github.com>
AuthorDate: Tue Sep 13 23:30:57 2022 +0800
[ISSUE #5026] add timer message put result process
Co-authored-by: wangkai <wa...@zhongan.com>
---
.../rocketmq/broker/processor/SendMessageProcessor.java | 14 ++++++++++++++
.../java/org/apache/rocketmq/store/PutMessageStatus.java | 3 +--
.../org/apache/rocketmq/store/timer/TimerMessageStore.java | 4 ++--
3 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index e94759500..3f15dbf1e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -351,6 +351,20 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
response.setRemark(String.format("the message is illegal, maybe msg body or properties length not matched. msg body length limit %dB, msg properties length limit 32KB.",
this.brokerController.getMessageStoreConfig().getMaxMessageSize()));
break;
+ case WHEEL_TIMER_MSG_ILLEGAL:
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark(String.format("timer message illegal, the delay time should not be bigger than the max delay %dms; or if set del msg, the delay time should be bigger than the current time",
+ this.brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000));
+ break;
+ case WHEEL_TIMER_FLOW_CONTROL:
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("send message too much, triggered the flow control max num limit is %d or the current value is greater than %d and less than %d, trigger random flow control",
+ this.brokerController.getMessageStoreConfig().getTimerCongestNumEachSlot() * 2L, this.brokerController.getMessageStoreConfig().getTimerCongestNumEachSlot(), this.brokerController.getMessageStoreConfig().getTimerCongestNumEachSlot() * 2L));
+ break;
+ case WHEEL_TIMER_NOT_ENABLE:
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("accurate delay is not enabled, timerWheelEnable is %s",
+ this.brokerController.getMessageStoreConfig().isTimerWheelEnable()));
case SERVICE_NOT_AVAILABLE:
response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
response.setRemark(
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java
index ac30541bd..55afd3732 100644
--- a/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java
@@ -32,6 +32,5 @@ public enum PutMessageStatus {
LMQ_CONSUME_QUEUE_NUM_EXCEEDED,
WHEEL_TIMER_FLOW_CONTROL,
WHEEL_TIMER_MSG_ILLEGAL,
- WHEEL_TIMER_NOT_ENABLE,
- OS_PAGECACHE_BUSY
+ WHEEL_TIMER_NOT_ENABLE
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 96335260e..92a7af836 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1062,7 +1062,7 @@ public class TimerMessageStore {
case CREATE_MAPPED_FILE_FAILED:
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
- case OS_PAGECACHE_BUSY:
+ case OS_PAGE_CACHE_BUSY:
case SLAVE_NOT_AVAILABLE:
case UNKNOWN_ERROR:
default:
@@ -1628,7 +1628,7 @@ public class TimerMessageStore {
if (congestNum <= storeConfig.getTimerCongestNumEachSlot()) {
return false;
}
- if (congestNum >= storeConfig.getTimerCongestNumEachSlot() * 2) {
+ if (congestNum >= storeConfig.getTimerCongestNumEachSlot() * 2L) {
return true;
}
if (RANDOM.nextInt(1000) > 1000 * (congestNum - storeConfig.getTimerCongestNumEachSlot()) / (storeConfig.getTimerCongestNumEachSlot() + 0.1)) {