You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/08 02:43:56 UTC
[rocketmq] 13/17: [ISSUE #3708] Refactor CQ and BCQ loading process and Fix some unit-tests issue. (#3713)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 6172ad371384619b0200ea5424d725c4ce9400f8
Author: Hongjian Fei <er...@163.com>
AuthorDate: Thu Jan 6 20:07:21 2022 +0800
[ISSUE #3708] Refactor CQ and BCQ loading process and Fix some unit-tests issue. (#3713)
* [RIP-26] Both CQ and BCQ will be supported in DefaultMessageStore.
* [RIP-26] Both CQ and BCQ will be supported in DefaultMessageStore.
* Part of ISSUE #3708. Refactor CQ and BCQ loading process.
* Mock getDiskSpaceWarningLevelRatio and getDiskSpaceCleanForciblyRatio for ut.
* Optimize batch message unit-test.
* Mock getDiskSpaceWarningLevelRatio and getDiskSpaceCleanForciblyRatio for ut.
* Optimize batch message unit-test.
---
.../rocketmq/store/queue/ConsumeQueueStore.java | 127 +++++++++------------
.../store/DefaultMessageStoreCleanFilesTest.java | 28 ++---
.../store/queue/BatchConsumeMessageTest.java | 70 +++++-------
.../store/queue/ConsumeQueueStoreTest.java | 100 ++++++++++++++++
.../apache/rocketmq/store/queue/QueueTestBase.java | 37 ++++--
5 files changed, 214 insertions(+), 148 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index e8146ff..d3bfe75 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -28,7 +28,6 @@ import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.config.StorePathConfigHelper;
import java.io.File;
import java.util.HashMap;
@@ -39,6 +38,10 @@ import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import static java.lang.String.format;
+import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathBatchConsumeQueue;
+import static org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue;
+
public class ConsumeQueueStore {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -79,7 +82,8 @@ public class ConsumeQueueStore {
* Apply the dispatched request and build the consume queue.
* This function should be idempotent.
*
- * @param request
+ * @param consumeQueue consume queue
+ * @param request dispatch request
*/
public void putMessagePositionInfoWrapper(ConsumeQueueInterface consumeQueue, DispatchRequest request) {
FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
@@ -97,11 +101,13 @@ public class ConsumeQueueStore {
}
public boolean load() {
- return loadConsumeQueues() && loadBatchConsumeQueues();
+ boolean cqLoadResult = loadConsumeQueues(getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), CQType.SimpleCQ);
+ boolean bcqLoadResult = loadConsumeQueues(getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), CQType.BatchCQ);
+ return cqLoadResult && bcqLoadResult;
}
- private boolean loadBatchConsumeQueues() {
- File dirLogic = new File(StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
+ private boolean loadConsumeQueues(String storePath, CQType cqType) {
+ File dirLogic = new File(storePath);
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null) {
@@ -118,24 +124,9 @@ public class ConsumeQueueStore {
continue;
}
- TopicConfig topicConfig = this.topicConfigTable == null ? null : this.topicConfigTable.get(topic);
-
- // For batch consume queue, the topic config must exist
- if (topicConfig == null) {
- log.warn("topic: {} has no topic config.", topic);
- continue;
- }
+ queueTypeShouldBe(topic, cqType);
- if (!Objects.equals(CQType.BatchCQ, QueueTypeUtils.getCQType(Optional.of(topicConfig)))) {
- log.error("[BUG]topic: {} should be BCQ.", topic);
- }
-
- ConsumeQueueInterface logic = new BatchConsumeQueue(
- topic,
- queueId,
- StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
- this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
- this.messageStore);
+ ConsumeQueueInterface logic = createConsumeQueueByType(cqType, topic, queueId, storePath);
this.putConsumeQueue(topic, queueId, logic);
if (!this.load(logic)) {
return false;
@@ -145,46 +136,39 @@ public class ConsumeQueueStore {
}
}
- log.info("load batch consume queue all over, OK");
+ log.info("load {} all over, OK", cqType);
return true;
}
- private boolean loadConsumeQueues() {
- File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
- File[] fileTopicList = dirLogic.listFiles();
- if (fileTopicList != null) {
-
- for (File fileTopic : fileTopicList) {
- String topic = fileTopic.getName();
-
- File[] fileQueueIdList = fileTopic.listFiles();
- if (fileQueueIdList != null) {
- for (File fileQueueId : fileQueueIdList) {
- int queueId;
- try {
- queueId = Integer.parseInt(fileQueueId.getName());
- } catch (NumberFormatException e) {
- continue;
- }
- ConsumeQueueInterface logic = new ConsumeQueue(
- topic,
- queueId,
- StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
- this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
- this.messageStore);
- this.putConsumeQueue(topic, queueId, logic);
- if (!this.load(logic)) {
- return false;
- }
- }
- }
- }
+ /**
+  * Creates the consume queue implementation that matches the given queue type.
+  *
+  * @param cqType queue type to instantiate; {@code SimpleCQ} and {@code BatchCQ} are handled
+  * @param topic topic the queue belongs to
+  * @param queueId id of the queue within the topic
+  * @param storePath directory passed to the queue as its store path
+  * @return a new {@link ConsumeQueue} for {@code SimpleCQ}, a new {@link BatchConsumeQueue} for {@code BatchCQ}
+  * @throws RuntimeException if {@code cqType} is {@code null} or any other queue type
+  */
+ private ConsumeQueueInterface createConsumeQueueByType(CQType cqType, String topic, int queueId, String storePath) {
+     if (Objects.equals(CQType.SimpleCQ, cqType)) {
+         return new ConsumeQueue(
+             topic,
+             queueId,
+             storePath,
+             this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
+             this.messageStore);
+     }
+
+     if (Objects.equals(CQType.BatchCQ, cqType)) {
+         return new BatchConsumeQueue(
+             topic,
+             queueId,
+             storePath,
+             this.messageStoreConfig.getMapperFileSizeBatchConsumeQueue(),
+             this.messageStore);
+     }
+
+     // Hand cqType to format() as-is: %s renders null as "null", whereas calling
+     // cqType.toString() here would throw a NullPointerException and hide this error.
+     throw new RuntimeException(format("queue type %s is not supported.", cqType));
+ }
- log.info("load logics queue all over, OK");
+ private void queueTypeShouldBe(String topic, CQType cqTypeExpected) {
+ TopicConfig topicConfig = this.topicConfigTable == null ? null : this.topicConfigTable.get(topic);
- return true;
+ CQType cqTypeActual = QueueTypeUtils.getCQType(Optional.ofNullable(topicConfig));
+
+ if (!Objects.equals(cqTypeExpected, cqTypeActual)) {
+ throw new RuntimeException(format("The queue type of topic: %s should be %s, but is %s", topic, cqTypeExpected, cqTypeActual));
+ }
}
public void recover(ConsumeQueueInterface consumeQueue) {
@@ -212,13 +196,9 @@ public class ConsumeQueueStore {
}
public void checkSelf() {
- Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
- Iterator<Map.Entry<Integer, ConsumeQueueInterface>> itNext = next.getValue().entrySet().iterator();
- while (itNext.hasNext()) {
- Map.Entry<Integer, ConsumeQueueInterface> cq = itNext.next();
- this.checkSelf(cq.getValue());
+ for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry : this.consumeQueueTable.entrySet()) {
+ for (Map.Entry<Integer, ConsumeQueueInterface> cqEntry : topicEntry.getValue().entrySet()) {
+ this.checkSelf(cqEntry.getValue());
}
}
}
@@ -292,28 +272,23 @@ public class ConsumeQueueStore {
newLogic = new BatchConsumeQueue(
topic,
queueId,
- StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+ getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.messageStoreConfig.getMapperFileSizeBatchConsumeQueue(),
this.messageStore);
- ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic);
- if (oldLogic != null) {
- logic = oldLogic;
- } else {
- logic = newLogic;
- }
} else {
newLogic = new ConsumeQueue(
topic,
queueId,
- StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+ getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
this.messageStore);
- ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic);
- if (oldLogic != null) {
- logic = oldLogic;
- } else {
- logic = newLogic;
- }
+ }
+
+ ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic);
+ if (oldLogic != null) {
+ logic = oldLogic;
+ } else {
+ logic = newLogic;
}
return logic;
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
index 88cf181..9dad5ea 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
@@ -47,6 +47,8 @@ import static org.apache.rocketmq.common.message.MessageDecoder.CHARSET_UTF8;
import static org.apache.rocketmq.store.ConsumeQueue.CQ_STORE_UNIT_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
/**
* Test case for DefaultMessageStore.CleanCommitLogService and DefaultMessageStore.CleanConsumeQueueService
@@ -334,26 +336,6 @@ public class DefaultMessageStoreCleanFilesTest {
}
}
- private DefaultMessageStore.CleanCommitLogService getCleanCommitLogService(double diskSpaceCleanForciblyRatio)
- throws Exception {
- Field serviceField = messageStore.getClass().getDeclaredField("cleanCommitLogService");
- serviceField.setAccessible(true);
- DefaultMessageStore.CleanCommitLogService cleanCommitLogService =
- (DefaultMessageStore.CleanCommitLogService) serviceField.get(messageStore);
- serviceField.setAccessible(false);
-
- Field warningLevelRatioField = cleanCommitLogService.getClass().getDeclaredField("diskSpaceWarningLevelRatio");
- warningLevelRatioField.setAccessible(true);
- warningLevelRatioField.set(cleanCommitLogService, String.valueOf(diskSpaceCleanForciblyRatio));
- warningLevelRatioField.setAccessible(false);
-
- Field cleanForciblyRatioField = cleanCommitLogService.getClass().getDeclaredField("diskSpaceCleanForciblyRatio");
- cleanForciblyRatioField.setAccessible(true);
- cleanForciblyRatioField.set(cleanCommitLogService, String.valueOf(diskSpaceCleanForciblyRatio));
- cleanForciblyRatioField.setAccessible(false);
- return cleanCommitLogService;
- }
-
private DefaultMessageStore.CleanConsumeQueueService getCleanConsumeQueueService()
throws Exception {
Field serviceField = messageStore.getClass().getDeclaredField("cleanConsumeQueueService");
@@ -490,11 +472,15 @@ public class DefaultMessageStoreCleanFilesTest {
messageStore = new DefaultMessageStore(messageStoreConfig,
new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig());
- cleanCommitLogService = getCleanCommitLogService(diskSpaceCleanForciblyRatio);
cleanConsumeQueueService = getCleanConsumeQueueService();
assertTrue(messageStore.load());
messageStore.start();
+
+ // partially mock a real obj
+ cleanCommitLogService = spy(cleanCommitLogService);
+ when(cleanCommitLogService.getDiskSpaceWarningLevelRatio()).thenReturn(diskSpaceCleanForciblyRatio);
+ when(cleanCommitLogService.getDiskSpaceCleanForciblyRatio()).thenReturn(diskSpaceCleanForciblyRatio);
}
private class MyMessageArrivingListener implements MessageArrivingListener {
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
index 500cd81..bc5f896 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.store.queue;
-import org.apache.rocketmq.common.TopicAttributes;
-import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -27,7 +25,6 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
-import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
@@ -43,16 +40,11 @@ import org.junit.Test;
import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
@@ -173,8 +165,9 @@ public class BatchConsumeMessageTest extends QueueTestBase {
long pullOffset = 0L;
int getMessageCount = 0;
+ int atMostMsgNum = 1;
while (true) {
- GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, pullOffset, 1, null);
+ GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, pullOffset, atMostMsgNum, null);
if (Objects.equals(getMessageResult.getStatus(), GetMessageStatus.OFFSET_OVERFLOW_ONE)) {
break;
}
@@ -229,8 +222,9 @@ public class BatchConsumeMessageTest extends QueueTestBase {
String topic = "TestDispatchBuildConsumeQueue";
createTopic(topic, CQType.SimpleCQ, messageStore);
- long timeStart = System.currentTimeMillis();
+ long timeStart = -1;
long timeMid = -1;
+ long commitLogMid = -1;
for (int i = 0; i < 100; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, -1);
@@ -238,8 +232,14 @@ public class BatchConsumeMessageTest extends QueueTestBase {
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Thread.sleep(2);
- if (i == 49)
- timeMid = System.currentTimeMillis();
+ if (i == 0) {
+ timeStart = putMessageResult.getAppendMessageResult().getStoreTimestamp();
+ }
+ if (i == 50) {
+ timeMid = putMessageResult.getAppendMessageResult().getStoreTimestamp();
+ commitLogMid = putMessageResult.getAppendMessageResult().getWroteOffset();
+ }
+
}
await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
@@ -261,16 +261,14 @@ public class BatchConsumeMessageTest extends QueueTestBase {
}
//check the message time
- long latencyAllowed = 20;
- long earlistMessageTime = messageStore.getEarliestMessageTime(topic, 0);
- Assert.assertTrue(earlistMessageTime > timeStart - latencyAllowed);
- Assert.assertTrue(earlistMessageTime < timeStart + latencyAllowed);
+ long earliestMessageTime = messageStore.getEarliestMessageTime(topic, 0);
+ Assert.assertEquals(timeStart, earliestMessageTime);
long messageStoreTime = messageStore.getMessageStoreTimeStamp(topic, 0, 50);
- Assert.assertTrue(messageStoreTime > timeMid - latencyAllowed);
- Assert.assertTrue(messageStoreTime < timeMid + latencyAllowed);
+ Assert.assertEquals(timeMid, messageStoreTime);
long commitLogOffset = messageStore.getCommitLogOffsetInQueue(topic, 0, 50);
Assert.assertTrue(commitLogOffset >= messageStore.getMinPhyOffset());
Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset());
+ Assert.assertEquals(commitLogMid, commitLogOffset);
Assert.assertFalse(messageStore.checkInDiskByConsumeOffset(topic, 0, 50));
}
@@ -279,7 +277,7 @@ public class BatchConsumeMessageTest extends QueueTestBase {
public void testDispatchBuildBatchConsumeQueue() throws Exception {
String topic = "testDispatchBuildBatchConsumeQueue";
int batchNum = 10;
- long timeStart = System.currentTimeMillis();
+ long timeStart = -1;
long timeMid = -1;
createTopic(topic, CQType.BatchCQ, messageStore);
@@ -288,8 +286,13 @@ public class BatchConsumeMessageTest extends QueueTestBase {
PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum));
Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
Thread.sleep(2);
- if (i == 29)
- timeMid = System.currentTimeMillis();
+ if (i == 0) {
+ timeStart = putMessageResult.getAppendMessageResult().getStoreTimestamp();
+ }
+ if (i == 30) {
+ timeMid = putMessageResult.getAppendMessageResult().getStoreTimestamp();;
+ }
+
}
await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
@@ -309,12 +312,10 @@ public class BatchConsumeMessageTest extends QueueTestBase {
}
//check the message time
- long earlistMessageTime = messageStore.getEarliestMessageTime(topic, 0);
- Assert.assertTrue(earlistMessageTime > timeStart - 20);
- Assert.assertTrue(earlistMessageTime < timeStart + 20);
+ long earliestMessageTime = messageStore.getEarliestMessageTime(topic, 0);
+ Assert.assertEquals(earliestMessageTime, timeStart);
long messageStoreTime = messageStore.getMessageStoreTimeStamp(topic, 0, 300);
- Assert.assertTrue(messageStoreTime > timeMid - 20);
- Assert.assertTrue(messageStoreTime < timeMid + 20);
+ Assert.assertEquals(messageStoreTime, timeMid);
long commitLogOffset = messageStore.getCommitLogOffsetInQueue(topic, 0, 300);
Assert.assertTrue(commitLogOffset >= messageStore.getMinPhyOffset());
Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset());
@@ -450,21 +451,4 @@ public class BatchConsumeMessageTest extends QueueTestBase {
}
}
- private void createTopic(String topic, CQType cqType, MessageStore messageStore) {
- ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
- TopicConfig topicConfigToBeAdded = new TopicConfig();
-
- Map<String, String> attributes = new HashMap<>();
- attributes.put(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), cqType.toString());
- topicConfigToBeAdded.setTopicName(topic);
- topicConfigToBeAdded.setAttributes(attributes);
-
- topicConfigTable.put(topic, topicConfigToBeAdded);
- ((DefaultMessageStore)messageStore).setTopicConfigTable(topicConfigTable);
- }
-
- private Callable<Boolean> fullyDispatched(MessageStore messageStore) {
- return () -> messageStore.dispatchBehindBytes() == 0;
- }
-
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
new file mode 100644
index 0000000..a8379fc
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.queue;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.UUID;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+public class ConsumeQueueStoreTest extends QueueTestBase {
+    private MessageStore messageStore;
+
+    /**
+     * Creates, loads and starts a message store before every test.
+     */
+    @Before
+    public void init() throws Exception {
+        messageStore = createMessageStore(null, true);
+        // Fail fast if loading reports a failure instead of running the test on a broken store.
+        Assert.assertTrue(messageStore.load());
+        messageStore.start();
+    }
+
+    /**
+     * Shuts the store down, destroys it and removes its root directory via UtilAll.deleteFile.
+     */
+    @After
+    public void destroy() {
+        messageStore.shutdown();
+        messageStore.destroy();
+
+        File file = new File(messageStore.getMessageStoreConfig().getStorePathRootDir());
+        UtilAll.deleteFile(file);
+    }
+
+    /**
+     * Writes messages to a topic configured as SimpleCQ, re-creates the topic config as BatchCQ
+     * and expects loading the queue store to fail with a queue type mismatch.
+     */
+    @Test
+    public void testLoadConsumeQueuesWithWrongAttribute() {
+        String normalTopic = UUID.randomUUID().toString();
+        createTopic(normalTopic, CQType.SimpleCQ, messageStore);
+
+        for (int i = 0; i < 10; i++) {
+            PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(normalTopic, -1));
+            assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+        }
+
+        // Wait until every written message has been dispatched.
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
+        // simulate delete topic but with files left.
+        ((DefaultMessageStore)messageStore).setTopicConfigTable(null);
+
+        // Same topic name, but now configured with the other queue type.
+        createTopic(normalTopic, CQType.BatchCQ, messageStore);
+        messageStore.shutdown();
+
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> messageStore.getQueueStore().load());
+        Assert.assertTrue(runtimeException.getMessage().endsWith("should be SimpleCQ, but is BatchCQ"));
+    }
+
+    /**
+     * Writes batch messages to a topic configured as BatchCQ, re-creates the topic config as
+     * SimpleCQ and expects loading the queue store to fail with a queue type mismatch.
+     */
+    @Test
+    public void testLoadBatchConsumeQueuesWithWrongAttribute() {
+        String batchTopic = UUID.randomUUID().toString();
+        createTopic(batchTopic, CQType.BatchCQ, messageStore);
+
+        for (int i = 0; i < 10; i++) {
+            PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(batchTopic, 10));
+            assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+        }
+
+        // Wait until every written message has been dispatched.
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
+        // simulate delete topic but with files left.
+        ((DefaultMessageStore)messageStore).setTopicConfigTable(null);
+
+        // Same topic name, but now configured with the other queue type.
+        createTopic(batchTopic, CQType.SimpleCQ, messageStore);
+        messageStore.shutdown();
+
+        RuntimeException runtimeException = Assert.assertThrows(RuntimeException.class, () -> messageStore.getQueueStore().load());
+        Assert.assertTrue(runtimeException.getMessage().endsWith("should be BatchCQ, but is SimpleCQ"));
+    }
+
+}
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
index c16d7cb..506cbd6 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.store.queue;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -31,10 +34,31 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import java.io.File;
+import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
public class QueueTestBase extends StoreTestBase {
+ /**
+  * Installs a topic config table on the store that contains only the given topic.
+  * The topic config carries the queue type as its QUEUE_TYPE_ATTRIBUTE attribute; whatever
+  * table the store held before is replaced.
+  *
+  * @param topic name of the topic to register
+  * @param cqType queue type recorded in the topic attributes
+  * @param messageStore store to configure; it is cast to DefaultMessageStore
+  */
+ protected void createTopic(String topic, CQType cqType, MessageStore messageStore) {
+     Map<String, String> queueTypeAttribute = new HashMap<>();
+     queueTypeAttribute.put(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), cqType.toString());
+
+     TopicConfig config = new TopicConfig();
+     config.setTopicName(topic);
+     config.setAttributes(queueTypeAttribute);
+
+     ConcurrentMap<String, TopicConfig> singleTopicTable = new ConcurrentHashMap<>();
+     singleTopicTable.put(topic, config);
+
+     DefaultMessageStore defaultStore = (DefaultMessageStore) messageStore;
+     defaultStore.setTopicConfigTable(singleTopicTable);
+ }
+
+ /**
+  * Builds a condition that is true when the store's dispatchBehindBytes() equals zero.
+  * The value is read each time the returned callable is invoked.
+  *
+  * @param messageStore store whose dispatch backlog is checked
+  * @return callable yielding {@code true} once dispatchBehindBytes() is zero
+  */
+ protected Callable<Boolean> fullyDispatched(MessageStore messageStore) {
+     return () -> {
+         boolean nothingLeftToDispatch = messageStore.dispatchBehindBytes() == 0;
+         return nothingLeftToDispatch;
+     };
+ }
+
protected MessageStore createMessageStore(String baseDir, boolean extent) throws Exception {
if (baseDir == null) {
baseDir = createBaseDir();
@@ -58,14 +82,11 @@ public class QueueTestBase extends StoreTestBase {
messageStoreConfig.setFlushIntervalCommitLog(1);
messageStoreConfig.setFlushCommitLogThoroughInterval(2);
- DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MessageArrivingListener() {
- @Override
- public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
- byte[] filterBitMap, Map<String, String> properties) {
-
- }
- }, new BrokerConfig());
- return messageStore;
+ return new DefaultMessageStore(
+ messageStoreConfig,
+ new BrokerStatsManager("simpleTest"),
+ (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {},
+ new BrokerConfig());
}
public MessageExtBrokerInner buildMessage(String topic, int batchNum) {