You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/13 15:31:23 UTC

[rocketmq] branch develop updated: [ISSUE #5026] add timer message put result process

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9442628e4 [ISSUE #5026] add timer message put result process
9442628e4 is described below

commit 9442628e42b4e8eb5115b39a7b0613f88f0853b1
Author: yiduwangkai <16...@users.noreply.github.com>
AuthorDate: Tue Sep 13 23:30:57 2022 +0800

    [ISSUE #5026] add timer message put result process
    
    Co-authored-by: wangkai <wa...@zhongan.com>
---
 .../rocketmq/broker/processor/SendMessageProcessor.java    | 14 ++++++++++++++
 .../java/org/apache/rocketmq/store/PutMessageStatus.java   |  3 +--
 .../org/apache/rocketmq/store/timer/TimerMessageStore.java |  4 ++--
 3 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index e94759500..3f15dbf1e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -351,6 +351,20 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                 response.setRemark(String.format("the message is illegal, maybe msg body or properties length not matched. msg body length limit %dB, msg properties length limit 32KB.",
                     this.brokerController.getMessageStoreConfig().getMaxMessageSize()));
                 break;
+            case WHEEL_TIMER_MSG_ILLEGAL:
+                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+                response.setRemark(String.format("timer message illegal, the delay time should not be bigger than the max delay %dms; or if set del msg, the delay time should be bigger than the current time",
+                    this.brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000));
+                break;
+            case WHEEL_TIMER_FLOW_CONTROL:
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark(String.format("send message too much, triggered the flow control max num limit is %d or the current value is greater than %d and less than %d, trigger random flow control",
+                     this.brokerController.getMessageStoreConfig().getTimerCongestNumEachSlot() * 2L, this.brokerController.getMessageStoreConfig().getTimerCongestNumEachSlot(), this.brokerController.getMessageStoreConfig().getTimerCongestNumEachSlot() * 2L));
+                break;
+            case WHEEL_TIMER_NOT_ENABLE:
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark(String.format("accurate delay is not enabled, timerWheelEnable is %s",
+                     this.brokerController.getMessageStoreConfig().isTimerWheelEnable()));
             case SERVICE_NOT_AVAILABLE:
                 response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                 response.setRemark(
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java
index ac30541bd..55afd3732 100644
--- a/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java
@@ -32,6 +32,5 @@ public enum PutMessageStatus {
     LMQ_CONSUME_QUEUE_NUM_EXCEEDED,
     WHEEL_TIMER_FLOW_CONTROL,
     WHEEL_TIMER_MSG_ILLEGAL,
-    WHEEL_TIMER_NOT_ENABLE,
-    OS_PAGECACHE_BUSY
+    WHEEL_TIMER_NOT_ENABLE
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 96335260e..92a7af836 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1062,7 +1062,7 @@ public class TimerMessageStore {
                     case CREATE_MAPPED_FILE_FAILED:
                     case FLUSH_DISK_TIMEOUT:
                     case FLUSH_SLAVE_TIMEOUT:
-                    case OS_PAGECACHE_BUSY:
+                    case OS_PAGE_CACHE_BUSY:
                     case SLAVE_NOT_AVAILABLE:
                     case UNKNOWN_ERROR:
                     default:
@@ -1628,7 +1628,7 @@ public class TimerMessageStore {
         if (congestNum <= storeConfig.getTimerCongestNumEachSlot()) {
             return false;
         }
-        if (congestNum >= storeConfig.getTimerCongestNumEachSlot() * 2) {
+        if (congestNum >= storeConfig.getTimerCongestNumEachSlot() * 2L) {
             return true;
         }
         if (RANDOM.nextInt(1000) > 1000 * (congestNum - storeConfig.getTimerCongestNumEachSlot()) / (storeConfig.getTimerCongestNumEachSlot() + 0.1)) {