You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/08 02:43:56 UTC

[rocketmq] 13/17: [ISSUE #3708] Refactor CQ and BCQ loading process and fix some unit-test issues. (#3713)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 6172ad371384619b0200ea5424d725c4ce9400f8
Author: Hongjian Fei <er...@163.com>
AuthorDate: Thu Jan 6 20:07:21 2022 +0800

    [ISSUE #3708] Refactor CQ and BCQ loading process and fix some unit-test issues. (#3713)
    
    * [RIP-26] Both CQ and BCQ will be supported in DefaultMessageStore.
    
    * [RIP-26] Both CQ and BCQ will be supported in DefaultMessageStore.
    
    * Part of ISSUE #3708. Refactor CQ and BCQ loading process.
    
    * Mock getDiskSpaceWarningLevelRatio and getDiskSpaceCleanForciblyRatio for unit tests.
    
    * Optimize batch message unit-test.
    
    * Mock getDiskSpaceWarningLevelRatio and getDiskSpaceCleanForciblyRatio for unit tests.
    
    * Optimize batch message unit-test.
---
 .../rocketmq/store/queue/ConsumeQueueStore.java    | 127 +++++++++------------
 .../store/DefaultMessageStoreCleanFilesTest.java   |  28 ++---
 .../store/queue/BatchConsumeMessageTest.java       |  70 +++++-------
 .../store/queue/ConsumeQueueStoreTest.java         | 100 ++++++++++++++++
 .../apache/rocketmq/store/queue/QueueTestBase.java |  37 ++++--
 5 files changed, 214 insertions(+), 148 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index e8146ff..d3bfe75 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -28,7 +28,6 @@ import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.config.StorePathConfigHelper;
 
 import java.io.File;
 import java.util.HashMap;
@@ -39,6 +38,10 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import static java.lang.String.format;
+import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathBatchConsumeQueue;
+import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue;
+
 public class ConsumeQueueStore {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
@@ -79,7 +82,8 @@ public class ConsumeQueueStore {
      * Apply the dispatched request and build the consume queue.
      * This function should be idempotent.
      *
-     * @param request
+     * @param consumeQueue consume queue
+     * @param request dispatch request
      */
     public void putMessagePositionInfoWrapper(ConsumeQueueInterface consumeQueue, DispatchRequest request) {
         FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
@@ -97,11 +101,13 @@ public class ConsumeQueueStore {
     }
 
     public boolean load() {
-        return loadConsumeQueues() && loadBatchConsumeQueues();
+        boolean cqLoadResult = loadConsumeQueues(getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), CQType.SimpleCQ);
+        boolean bcqLoadResult = loadConsumeQueues(getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), CQType.BatchCQ);
+        return cqLoadResult && bcqLoadResult;
     }
 
-    private boolean loadBatchConsumeQueues() {
-        File dirLogic = new File(StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
+    private boolean loadConsumeQueues(String storePath, CQType cqType) {
+        File dirLogic = new File(storePath);
         File[] fileTopicList = dirLogic.listFiles();
         if (fileTopicList != null) {
 
@@ -118,24 +124,9 @@ public class ConsumeQueueStore {
                             continue;
                         }
 
-                        TopicConfig topicConfig = this.topicConfigTable == null ? null : this.topicConfigTable.get(topic);
-
-                        // For batch consume queue, the topic config must exist
-                        if (topicConfig == null) {
-                            log.warn("topic: {} has no topic config.", topic);
-                            continue;
-                        }
+                        queueTypeShouldBe(topic, cqType);
 
-                        if (!Objects.equals(CQType.BatchCQ, QueueTypeUtils.getCQType(Optional.of(topicConfig)))) {
-                            log.error("[BUG]topic: {} should be BCQ.", topic);
-                        }
-
-                        ConsumeQueueInterface logic = new BatchConsumeQueue(
-                                topic,
-                                queueId,
-                                StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
-                                this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
-                                this.messageStore);
+                        ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath);
                         this.putConsumeQueue(topic, queueId, logic);
                         if (!this.load(logic)) {
                             return false;
@@ -145,46 +136,39 @@ public class ConsumeQueueStore {
             }
         }
 
-        log.info("load batch consume queue all over, OK");
+        log.info("load {} all over, OK", cqType);
 
         return true;
     }
 
-    private boolean loadConsumeQueues() {
-        File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
-        File[] fileTopicList = dirLogic.listFiles();
-        if (fileTopicList != null) {
-
-            for (File fileTopic : fileTopicList) {
-                String topic = fileTopic.getName();
-
-                File[] fileQueueIdList = fileTopic.listFiles();
-                if (fileQueueIdList != null) {
-                    for (File fileQueueId : fileQueueIdList) {
-                        int queueId;
-                        try {
-                            queueId = Integer.parseInt(fileQueueId.getName());
-                        } catch (NumberFormatException e) {
-                            continue;
-                        }
-                        ConsumeQueueInterface logic = new ConsumeQueue(
-                                topic,
-                                queueId,
-                                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
-                                this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
-                                this.messageStore);
-                        this.putConsumeQueue(topic, queueId, logic);
-                        if (!this.load(logic)) {
-                            return false;
-                        }
-                    }
-                }
-            }
+    private ConsumeQueueInterface createConsumeQueueByType(CQType cqType, String topic, int queueId, String storePath) {
+        if (Objects.equals(CQType.SimpleCQ, cqType)) {
+            return new ConsumeQueue(
+                    topic,
+                    queueId,
+                    storePath,
+                    this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
+                    this.messageStore);
+        } else if (Objects.equals(CQType.BatchCQ, cqType)) {
+            return new BatchConsumeQueue(
+                    topic,
+                    queueId,
+                    storePath,
+                    this.messageStoreConfig.getMapperFileSizeBatchConsumeQueue(),
+                    this.messageStore);
+        } else {
+            throw new RuntimeException(format("queue type %s is not supported.", cqType.toString()));
         }
+    }
 
-        log.info("load logics queue all over, OK");
+    private void queueTypeShouldBe(String topic, CQType cqTypeExpected) {
+        TopicConfig topicConfig = this.topicConfigTable == null ? null : this.topicConfigTable.get(topic);
 
-        return true;
+        CQType cqTypeActual = QueueTypeUtils.getCQType(Optional.ofNullable(topicConfig));
+
+        if (!Objects.equals(cqTypeExpected, cqTypeActual)) {
+            throw new RuntimeException(format("The queue type of topic: %s should be %s, but is %s", topic, cqTypeExpected, cqTypeActual));
+        }
     }
 
     public void recover(ConsumeQueueInterface consumeQueue) {
@@ -212,13 +196,9 @@ public class ConsumeQueueStore {
     }
 
     public void checkSelf() {
-        Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
-            Iterator<Map.Entry<Integer, ConsumeQueueInterface>> itNext = next.getValue().entrySet().iterator();
-            while (itNext.hasNext()) {
-                Map.Entry<Integer, ConsumeQueueInterface> cq = itNext.next();
-                this.checkSelf(cq.getValue());
+        for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : this.consumeQueueTable.entrySet()) {
+            for (Map.Entry<Integer, ConsumeQueueInterface> cqEntry : topicEntry.getValue().entrySet()) {
+                this.checkSelf(cqEntry.getValue());
             }
         }
     }
@@ -292,28 +272,23 @@ public class ConsumeQueueStore {
             newLogic = new BatchConsumeQueue(
                     topic,
                     queueId,
-                    StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+                    getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                     this.messageStoreConfig.getMapperFileSizeBatchConsumeQueue(),
                     this.messageStore);
-            ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic);
-            if (oldLogic != null) {
-                logic = oldLogic;
-            } else {
-                logic = newLogic;
-            }
         } else {
             newLogic = new ConsumeQueue(
                     topic,
                     queueId,
-                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+                    getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                     this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
                     this.messageStore);
-            ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic);
-            if (oldLogic != null) {
-                logic = oldLogic;
-            } else {
-                logic = newLogic;
-            }
+        }
+
+        ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic);
+        if (oldLogic != null) {
+            logic = oldLogic;
+        } else {
+            logic = newLogic;
         }
 
         return logic;
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
index 88cf181..9dad5ea 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
@@ -47,6 +47,8 @@ import static org.apache.rocketmq.common.message.MessageDecoder.CHARSET_UTF8;
 import static org.apache.rocketmq.store.ConsumeQueue.CQ_STORE_UNIT_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 /**
  * Test case for DefaultMessageStore.CleanCommitLogService and DefaultMessageStore.CleanConsumeQueueService
@@ -334,26 +336,6 @@ public class DefaultMessageStoreCleanFilesTest {
         }
     }
 
-    private DefaultMessageStore.CleanCommitLogService getCleanCommitLogService(double diskSpaceCleanForciblyRatio)
-            throws Exception {
-        Field serviceField = messageStore.getClass().getDeclaredField("cleanCommitLogService");
-        serviceField.setAccessible(true);
-        DefaultMessageStore.CleanCommitLogService cleanCommitLogService =
-                (DefaultMessageStore.CleanCommitLogService) serviceField.get(messageStore);
-        serviceField.setAccessible(false);
-
-        Field warningLevelRatioField = cleanCommitLogService.getClass().getDeclaredField("diskSpaceWarningLevelRatio");
-        warningLevelRatioField.setAccessible(true);
-        warningLevelRatioField.set(cleanCommitLogService, String.valueOf(diskSpaceCleanForciblyRatio));
-        warningLevelRatioField.setAccessible(false);
-
-        Field cleanForciblyRatioField = cleanCommitLogService.getClass().getDeclaredField("diskSpaceCleanForciblyRatio");
-        cleanForciblyRatioField.setAccessible(true);
-        cleanForciblyRatioField.set(cleanCommitLogService, String.valueOf(diskSpaceCleanForciblyRatio));
-        cleanForciblyRatioField.setAccessible(false);
-        return cleanCommitLogService;
-    }
-
     private DefaultMessageStore.CleanConsumeQueueService getCleanConsumeQueueService()
             throws Exception {
         Field serviceField = messageStore.getClass().getDeclaredField("cleanConsumeQueueService");
@@ -490,11 +472,15 @@ public class DefaultMessageStoreCleanFilesTest {
         messageStore = new DefaultMessageStore(messageStoreConfig,
                 new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig());
 
-        cleanCommitLogService = getCleanCommitLogService(diskSpaceCleanForciblyRatio);
         cleanConsumeQueueService = getCleanConsumeQueueService();
 
         assertTrue(messageStore.load());
         messageStore.start();
+
+        // partially mock a real obj
+        cleanCommitLogService = spy(cleanCommitLogService);
+        when(cleanCommitLogService.getDiskSpaceWarningLevelRatio()).thenReturn(diskSpaceCleanForciblyRatio);
+        when(cleanCommitLogService.getDiskSpaceCleanForciblyRatio()).thenReturn(diskSpaceCleanForciblyRatio);
     }
 
     private class MyMessageArrivingListener implements MessageArrivingListener {
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
index 500cd81..bc5f896 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.store.queue;
 
-import org.apache.rocketmq.common.TopicAttributes;
-import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -27,7 +25,6 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.utils.QueueTypeUtils;
-import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
@@ -43,16 +40,11 @@ import org.junit.Test;
 import java.io.File;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Random;
 import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.awaitility.Awaitility.await;
@@ -173,8 +165,9 @@ public class BatchConsumeMessageTest extends QueueTestBase {
 
         long pullOffset = 0L;
         int getMessageCount = 0;
+        int atMostMsgNum = 1;
         while (true) {
-            GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, pullOffset, 1, null);
+            GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, pullOffset, atMostMsgNum, null);
             if (Objects.equals(getMessageResult.getStatus(), GetMessageStatus.OFFSET_OVERFLOW_ONE)) {
                 break;
             }
@@ -229,8 +222,9 @@ public class BatchConsumeMessageTest extends QueueTestBase {
         String topic = "TestDispatchBuildConsumeQueue";
         createTopic(topic, CQType.SimpleCQ, messageStore);
 
-        long timeStart = System.currentTimeMillis();
+        long timeStart = -1;
         long timeMid = -1;
+        long commitLogMid = -1;
 
         for (int i = 0; i < 100; i++) {
             MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, -1);
@@ -238,8 +232,14 @@ public class BatchConsumeMessageTest extends QueueTestBase {
             Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
 
             Thread.sleep(2);
-            if (i == 49)
-                timeMid = System.currentTimeMillis();
+            if (i == 0) {
+                timeStart = putMessageResult.getAppendMessageResult().getStoreTimestamp();
+            }
+            if (i == 50) {
+                timeMid = putMessageResult.getAppendMessageResult().getStoreTimestamp();
+                commitLogMid = putMessageResult.getAppendMessageResult().getWroteOffset();
+            }
+
         }
 
         await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
@@ -261,16 +261,14 @@ public class BatchConsumeMessageTest extends QueueTestBase {
         }
 
         //check the message time
-        long latencyAllowed = 20;
-        long earlistMessageTime = messageStore.getEarliestMessageTime(topic, 0);
-        Assert.assertTrue(earlistMessageTime > timeStart - latencyAllowed);
-        Assert.assertTrue(earlistMessageTime < timeStart + latencyAllowed);
+        long earliestMessageTime = messageStore.getEarliestMessageTime(topic, 0);
+        Assert.assertEquals(timeStart, earliestMessageTime);
         long messageStoreTime = messageStore.getMessageStoreTimeStamp(topic, 0, 50);
-        Assert.assertTrue(messageStoreTime > timeMid - latencyAllowed);
-        Assert.assertTrue(messageStoreTime < timeMid + latencyAllowed);
+        Assert.assertEquals(timeMid, messageStoreTime);
         long commitLogOffset = messageStore.getCommitLogOffsetInQueue(topic, 0, 50);
         Assert.assertTrue(commitLogOffset >= messageStore.getMinPhyOffset());
         Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset());
+        Assert.assertEquals(commitLogMid, commitLogOffset);
 
         Assert.assertFalse(messageStore.checkInDiskByConsumeOffset(topic, 0, 50));
     }
@@ -279,7 +277,7 @@ public class BatchConsumeMessageTest extends QueueTestBase {
     public void testDispatchBuildBatchConsumeQueue() throws Exception {
         String topic = "testDispatchBuildBatchConsumeQueue";
         int batchNum = 10;
-        long timeStart = System.currentTimeMillis();
+        long timeStart = -1;
         long timeMid = -1;
 
         createTopic(topic, CQType.BatchCQ, messageStore);
@@ -288,8 +286,13 @@ public class BatchConsumeMessageTest extends QueueTestBase {
             PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum));
             Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
             Thread.sleep(2);
-            if (i == 29)
-                timeMid = System.currentTimeMillis();
+            if (i == 0) {
+                timeStart = putMessageResult.getAppendMessageResult().getStoreTimestamp();
+            }
+            if (i == 30) {
+                timeMid = putMessageResult.getAppendMessageResult().getStoreTimestamp();;
+            }
+
         }
 
         await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
@@ -309,12 +312,10 @@ public class BatchConsumeMessageTest extends QueueTestBase {
         }
 
         //check the message time
-        long earlistMessageTime = messageStore.getEarliestMessageTime(topic, 0);
-        Assert.assertTrue(earlistMessageTime > timeStart - 20);
-        Assert.assertTrue(earlistMessageTime < timeStart + 20);
+        long earliestMessageTime = messageStore.getEarliestMessageTime(topic, 0);
+        Assert.assertEquals(earliestMessageTime, timeStart);
         long messageStoreTime = messageStore.getMessageStoreTimeStamp(topic, 0, 300);
-        Assert.assertTrue(messageStoreTime > timeMid - 20);
-        Assert.assertTrue(messageStoreTime < timeMid + 20);
+        Assert.assertEquals(messageStoreTime, timeMid);
         long commitLogOffset = messageStore.getCommitLogOffsetInQueue(topic, 0, 300);
         Assert.assertTrue(commitLogOffset >= messageStore.getMinPhyOffset());
         Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset());
@@ -450,21 +451,4 @@ public class BatchConsumeMessageTest extends QueueTestBase {
         }
     }
 
-    private void createTopic(String topic, CQType cqType, MessageStore messageStore) {
-        ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
-        TopicConfig topicConfigToBeAdded = new TopicConfig();
-
-        Map<String, String> attributes = new HashMap<>();
-        attributes.put(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), cqType.toString());
-        topicConfigToBeAdded.setTopicName(topic);
-        topicConfigToBeAdded.setAttributes(attributes);
-
-        topicConfigTable.put(topic, topicConfigToBeAdded);
-        ((DefaultMessageStore)messageStore).setTopicConfigTable(topicConfigTable);
-    }
-
-    private Callable<Boolean> fullyDispatched(MessageStore messageStore) {
-        return () -> messageStore.dispatchBehindBytes() == 0;
-    }
-
 }
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
new file mode 100644
index 0000000..a8379fc
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.queue;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.UUID;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+public class ConsumeQueueStoreTest extends QueueTestBase {
+    private MessageStore messageStore;
+
+    @Before
+    public void init() throws Exception {
+        messageStore = createMessageStore(null, true);
+        messageStore.load();
+        messageStore.start();
+    }
+
+    @After
+    public void destroy() {
+        messageStore.shutdown();
+        messageStore.destroy();
+
+        File file = new File(messageStore.getMessageStoreConfig().getStorePathRootDir());
+        UtilAll.deleteFile(file);
+    }
+
+    @Test
+    public void testLoadConsumeQueuesWithWrongAttribute() {
+        String normalTopic = UUID.randomUUID().toString();
+        createTopic(normalTopic, CQType.SimpleCQ, messageStore);
+
+        for (int i = 0; i < 10; i++) {
+            PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(normalTopic, -1));
+            assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+        }
+
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
+        // simulate delete topic but with files left.
+        ((DefaultMessageStore)messageStore).setTopicConfigTable(null);
+
+        createTopic(normalTopic, CQType.BatchCQ, messageStore);
+        messageStore.shutdown();
+
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> messageStore.getQueueStore().load());
+        Assert.assertTrue(runtimeException.getMessage().endsWith("should be SimpleCQ, but is BatchCQ"));
+    }
+
+    @Test
+    public void testLoadBatchConsumeQueuesWithWrongAttribute() {
+        String batchTopic = UUID.randomUUID().toString();
+        createTopic(batchTopic, CQType.BatchCQ, messageStore);
+
+        for (int i = 0; i < 10; i++) {
+            PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(batchTopic, 10));
+            assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+        }
+
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
+        // simulate delete topic but with files left.
+        ((DefaultMessageStore)messageStore).setTopicConfigTable(null);
+
+        createTopic(batchTopic, CQType.SimpleCQ, messageStore);
+        messageStore.shutdown();
+
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> messageStore.getQueueStore().load());
+        Assert.assertTrue(runtimeException.getMessage().endsWith("should be BatchCQ, but is SimpleCQ"));
+    }
+
+}
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
index c16d7cb..506cbd6 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
@@ -17,6 +17,9 @@
 package org.apache.rocketmq.store.queue;
 
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -31,10 +34,31 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
 import java.io.File;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 public class QueueTestBase extends StoreTestBase {
 
+    protected void createTopic(String topic, CQType cqType, MessageStore messageStore) {
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
+        TopicConfig topicConfigToBeAdded = new TopicConfig();
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), cqType.toString());
+        topicConfigToBeAdded.setTopicName(topic);
+        topicConfigToBeAdded.setAttributes(attributes);
+
+        topicConfigTable.put(topic, topicConfigToBeAdded);
+        ((DefaultMessageStore)messageStore).setTopicConfigTable(topicConfigTable);
+    }
+
+    protected Callable<Boolean> fullyDispatched(MessageStore messageStore) {
+        return () -> messageStore.dispatchBehindBytes() == 0;
+    }
+
     protected MessageStore createMessageStore(String baseDir, boolean extent) throws Exception {
         if (baseDir == null) {
             baseDir = createBaseDir();
@@ -58,14 +82,11 @@ public class QueueTestBase extends StoreTestBase {
         messageStoreConfig.setFlushIntervalCommitLog(1);
         messageStoreConfig.setFlushCommitLogThoroughInterval(2);
 
-        DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MessageArrivingListener() {
-            @Override
-            public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
-                                 byte[] filterBitMap, Map<String, String> properties) {
-
-            }
-        }, new BrokerConfig());
-        return messageStore;
+        return new DefaultMessageStore(
+                messageStoreConfig,
+                new BrokerStatsManager("simpleTest"),
+                (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {},
+                new BrokerConfig());
     }
 
     public MessageExtBrokerInner buildMessage(String topic, int batchNum) {