You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/09/24 09:55:19 UTC

[rocketmq] branch develop updated: [ISSUE #1869] Delay message can't be consumed when delay offset in delayOffset.json is wrong (#3358)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 538b6b7  [ISSUE #1869] Delay message can't be consumed when delay offset in delayOffset.json is wrong (#3358)
538b6b7 is described below

commit 538b6b7ffd5ec0f98dc902a376d65d957143dcf8
Author: rongtong <ji...@163.com>
AuthorDate: Fri Sep 24 17:55:02 2021 +0800

    [ISSUE #1869] Delay message can't be consumed when delay offset in delayOffset.json is wrong (#3358)
    
    * fix(store):delay message can't be consumed when delay offset is wrong
    
    * fix(store):delay message can't be consumed when delay offset is wrong
    
    * fix(common):pass the code style
    
    * fix(store): fix bug of get cq max offset
    
    * fix(store): polish the code
    
    * fix(store): fix bug to pass the UT test
    
    * chore(store): polish log format
    
    * chore(store): polish code style
---
 .../apache/rocketmq/common/stats/StatsItem.java    |  1 -
 .../apache/rocketmq/store/DefaultMessageStore.java |  9 +--
 .../store/schedule/ScheduleMessageService.java     | 64 +++++++++++++----
 .../rocketmq/store/ScheduleMessageServiceTest.java | 84 ++++++++++++++++++++++
 4 files changed, 141 insertions(+), 17 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
index 6007cb0..d016662 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
@@ -20,7 +20,6 @@ package org.apache.rocketmq.common.stats;
 import java.util.LinkedList;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.rocketmq.common.UtilAll;
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 5bf68ac..81344c0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -186,10 +186,6 @@ public class DefaultMessageStore implements MessageStore {
             boolean lastExitOK = !this.isTempFileExist();
             log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
 
-            if (null != scheduleMessageService) {
-                result = result && this.scheduleMessageService.load();
-            }
-
             // load Commit Log
             result = result && this.commitLog.load();
 
@@ -205,7 +201,12 @@ public class DefaultMessageStore implements MessageStore {
                 this.recover(lastExitOK);
 
                 log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
+
+                if (null != scheduleMessageService) {
+                    result =  this.scheduleMessageService.load();
+                }
             }
+
         } catch (Exception e) {
             log.error("load exception", e);
             result = false;
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 1164ab8..e0e7b95 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -78,8 +78,7 @@ public class ScheduleMessageService extends ConfigManager {
     }
 
     /**
-     * @param writeMessageStore
-     *     the writeMessageStore to set
+     * @param writeMessageStore the writeMessageStore to set
      */
     public void setWriteMessageStore(MessageStore writeMessageStore) {
         this.writeMessageStore = writeMessageStore;
@@ -133,7 +132,9 @@ public class ScheduleMessageService extends ConfigManager {
                 @Override
                 public void run() {
                     try {
-                        if (started.get()) ScheduleMessageService.this.persist();
+                        if (started.get()) {
+                            ScheduleMessageService.this.persist();
+                        }
                     } catch (Throwable e) {
                         log.error("scheduleAtFixedRate flush exception", e);
                     }
@@ -165,9 +166,46 @@ public class ScheduleMessageService extends ConfigManager {
     public boolean load() {
         boolean result = super.load();
         result = result && this.parseDelayLevel();
+        result = result && this.correctDelayOffset();
         return result;
     }
 
+    public boolean correctDelayOffset() {
+        try {
+            for (int delayLevel : delayLevelTable.keySet()) {
+                ConsumeQueue cq =
+                    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
+                        delayLevel2QueueId(delayLevel));
+                Long currentDelayOffset = offsetTable.get(delayLevel);
+                if (currentDelayOffset == null || cq == null) {
+                    continue;
+                }
+                long correctDelayOffset = currentDelayOffset;
+                long cqMinOffset = cq.getMinOffsetInQueue();
+                long cqMaxOffset = cq.getMaxOffsetInQueue();
+                if (currentDelayOffset < cqMinOffset) {
+                    correctDelayOffset = cqMinOffset;
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
+                        currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+                }
+
+                if (currentDelayOffset > cqMaxOffset) {
+                    correctDelayOffset = cqMaxOffset;
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
+                        currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+                }
+                if (correctDelayOffset != currentDelayOffset) {
+                    log.error("correct delay offset [ delayLevel {} ] from {} to {}", delayLevel, currentDelayOffset, correctDelayOffset);
+                    offsetTable.put(delayLevel, correctDelayOffset);
+                }
+            }
+        } catch (Exception e) {
+            log.error("correctDelayOffset exception", e);
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public String configFilePath() {
         return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
@@ -309,7 +347,7 @@ public class ScheduleMessageService extends ConfigManager {
                                         MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                         if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                             log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
-                                                    msgInner.getTopic(), msgInner);
+                                                msgInner.getTopic(), msgInner);
                                             continue;
                                         }
                                         PutMessageResult putMessageResult =
@@ -344,14 +382,9 @@ public class ScheduleMessageService extends ConfigManager {
                                     } catch (Exception e) {
                                         /*
                                          * XXX: warn and notify me
-
-
-
                                          */
                                         log.error(
-                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
-                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
-                                                + offsetPy + ",sizePy=" + sizePy, e);
+                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
                                     }
                                 }
                             } else {
@@ -376,10 +409,17 @@ public class ScheduleMessageService extends ConfigManager {
                 else {
 
                     long cqMinOffset = cq.getMinOffsetInQueue();
+                    long cqMaxOffset = cq.getMaxOffsetInQueue();
                     if (offset < cqMinOffset) {
                         failScheduleOffset = cqMinOffset;
-                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
-                            + cqMinOffset + ", queueId=" + cq.getQueueId());
+                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
+                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+                    }
+
+                    if (offset > cqMaxOffset) {
+                        failScheduleOffset = cqMaxOffset;
+                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
+                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
                     }
                 }
             } // end of if (cq != null)
diff --git a/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java
new file mode 100644
index 0000000..8502521
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.schedule.ScheduleMessageService;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ScheduleMessageServiceTest {
+
+    private Random random = new Random();
+
+    @Test
+    public void testCorrectDelayOffset_whenInit() throws Exception {
+
+        ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = null;
+
+        ScheduleMessageService scheduleMessageService = new ScheduleMessageService((DefaultMessageStore) buildMessageStore());
+        scheduleMessageService.parseDelayLevel();
+
+        ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable1 = new ConcurrentHashMap<>();
+        for (int i = 1; i <= 18; i++) {
+            offsetTable1.put(i, random.nextLong());
+        }
+
+        Field field = scheduleMessageService.getClass().getDeclaredField("offsetTable");
+        field.setAccessible(true);
+        field.set(scheduleMessageService, offsetTable1);
+
+        String jsonStr = scheduleMessageService.encode();
+        scheduleMessageService.decode(jsonStr);
+
+        offsetTable = (ConcurrentMap<Integer, Long>) field.get(scheduleMessageService);
+
+        for (Map.Entry<Integer, Long> entry : offsetTable.entrySet()) {
+            assertEquals(entry.getValue(), offsetTable1.get(entry.getKey()));
+        }
+
+        scheduleMessageService.correctDelayOffset();
+
+        offsetTable = (ConcurrentMap<Integer, Long>) field.get(scheduleMessageService);
+
+        for (long offset : offsetTable.values()) {
+            assertEquals(offset, 0);
+        }
+
+    }
+
+    private MessageStore buildMessageStore() throws Exception {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
+        messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10);
+        messageStoreConfig.setMaxHashSlotNum(10000);
+        messageStoreConfig.setMaxIndexNum(100 * 100);
+        messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
+        messageStoreConfig.setFlushIntervalConsumeQueue(1);
+        return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig());
+    }
+}