You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/01/06 02:57:41 UTC

[rocketmq] branch 5.0.0-alpha updated: [ISSUE #3708] Both CQ and BCQ need to be supported in DefaultMessageStore. (#3712)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha by this push:
     new e923a7e  [ISSUE #3708] Both CQ and BCQ need to be supported in DefaultMessageStore. (#3712)
e923a7e is described below

commit e923a7ea328e5d62cf27cf96e446faa3bcfe25f0
Author: Hongjian Fei <er...@163.com>
AuthorDate: Thu Jan 6 10:56:05 2022 +0800

    [ISSUE #3708] Both CQ and BCQ need to be supported in DefaultMessageStore. (#3712)
    
    * [RIP-26] Both CQ and BCQ will be supported in DefaultMessageStore.
    
    * [RIP-26] Both CQ and BCQ will be supported in DefaultMessageStore.
---
 .../apache/rocketmq/broker/BrokerController.java   |   11 +-
 .../broker/processor/AdminBrokerProcessor.java     |    1 -
 .../broker/processor/ConsumerManageProcessor.java  |    2 -
 .../broker/processor/SendMessageProcessor.java     |    7 +-
 .../broker/topic/TopicConfigManagerTest.java       |    9 +-
 .../java/org/apache/rocketmq/client/MQAdmin.java   |   14 +-
 .../client/consumer/DefaultMQPullConsumer.java     |    7 +-
 .../client/consumer/DefaultMQPushConsumer.java     |    6 +-
 .../apache/rocketmq/client/impl/MQAdminImpl.java   |    6 +-
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |    2 +-
 .../impl/consumer/DefaultMQPushConsumerImpl.java   |    2 +-
 .../impl/producer/DefaultMQProducerImpl.java       |    2 +-
 .../client/producer/DefaultMQProducer.java         |    9 +-
 .../apache/rocketmq/common/TopicAttributes.java    |    4 +-
 .../apache/rocketmq/common/attribute}/CQType.java  |    2 +-
 .../rocketmq/common/utils/QueueTypeUtils.java      |   51 +
 .../java/org/apache/rocketmq/store/CommitLog.java  |   45 +-
 .../org/apache/rocketmq/store/ConsumeQueue.java    |   19 +-
 .../apache/rocketmq/store/DefaultMessageStore.java |  223 +-
 .../org/apache/rocketmq/store/MappedFileQueue.java |   16 +-
 .../org/apache/rocketmq/store/MessageStore.java    |   17 +-
 .../apache/rocketmq/store/PutMessageContext.java   |   67 +-
 .../java/org/apache/rocketmq/store/StoreUtil.java  |    4 -
 .../apache/rocketmq/store/StreamMessageStore.java  | 2573 --------------------
 .../org/apache/rocketmq/store/TopicQueueLock.java  |   11 +-
 .../rocketmq/store/config/MessageStoreConfig.java  |   11 -
 .../rocketmq/store/dledger/DLedgerCommitLog.java   |    3 +-
 .../apache/rocketmq/store/logfile/MappedFile.java  |    1 -
 .../rocketmq/store/queue/BatchConsumeQueue.java    |   28 +-
 .../store/queue/ConsumeQueueInterface.java         |    2 +
 .../rocketmq/store/queue/ConsumeQueueStore.java    |  290 ++-
 .../rocketmq/store/queue/FileQueueLifeCycle.java   |    2 +
 .../rocketmq/store/queue/QueueOffsetAssigner.java  |   60 +
 .../apache/rocketmq/store/util/QueueTypeUtils.java |   55 -
 .../store/DefaultMessageStoreShutDownTest.java     |    3 +-
 .../rocketmq/store/DefaultMessageStoreTest.java    |    3 +-
 .../store/queue/BatchConsumeMessageTest.java       |  274 ++-
 .../store/queue/BatchConsumeQueueTest.java         |   27 +-
 .../rocketmq/store/queue/ConsumeQueueTest.java     |    3 +-
 .../apache/rocketmq/store/queue/QueueTestBase.java |   28 +-
 .../rocketmq/test/util/MQAdminTestUtils.java       |    8 +-
 .../org/apache/rocketmq/test/base/BaseConf.java    |   35 +-
 .../rocketmq/test/base/IntegrationTestBase.java    |   49 +-
 .../base/dledger/DLedgerProduceAndConsumeIT.java   |    5 +-
 .../test/client/producer/batch/BatchSendIT.java    |   37 +-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |    8 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |   16 +-
 .../tools/command/topic/UpdateTopicSubCommand.java |    1 -
 48 files changed, 912 insertions(+), 3147 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 6f9563b..7cf48c9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -96,11 +96,9 @@ import org.apache.rocketmq.srvutil.FileWatchService;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageArrivingListener;
 import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
-import org.apache.rocketmq.store.queue.CQType;
 import org.apache.rocketmq.store.stats.BrokerStats;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
@@ -298,14 +296,9 @@ public class BrokerController {
 
         if (result) {
             try {
-                MessageStore messageStore;
-                if (Objects.equals(CQType.BatchCQ.toString(), this.messageStoreConfig.getDefaultCQType())) {
-                    messageStore = new StreamMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
-                } else {
-                    messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
-                }
+                this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
+                ((DefaultMessageStore) this.messageStore).setTopicConfigTable(topicConfigManager.getTopicConfigTable());
 
-                this.messageStore = messageStore;
                 if (messageStoreConfig.isEnableDLegerCommitLog()) {
                     DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                     ((DLedgerCommitLog) messageStore.getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 5b8a19f..c1819a5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -33,7 +33,6 @@ import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.PlainAccessConfig;
-import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index a266442..31a7993 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -32,12 +32,10 @@ import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHe
 import org.apache.rocketmq.common.rpc.RpcClientUtils;
 import org.apache.rocketmq.common.rpc.RpcRequest;
 import org.apache.rocketmq.common.rpc.RpcResponse;
-import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
-import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 3e3173e..a3f5c4e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -18,9 +18,9 @@ package org.apache.rocketmq.broker.processor;
 
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
@@ -42,7 +42,6 @@ import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -56,6 +55,7 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
@@ -63,7 +63,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.MessageExtBatch;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.PutMessageResult;
-import org.apache.rocketmq.store.StoreUtil;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -684,7 +683,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
 
         CompletableFuture<PutMessageResult> putMessageResult;
 
-        if (StoreUtil.isStreamMode(this.brokerController.getMessageStore()) && MessageClientIDSetter.getUniqID(messageExtBatch) != null) {
+        if (QueueTypeUtils.isBatchCq(Optional.of(topicConfig)) && MessageClientIDSetter.getUniqID(messageExtBatch) != null) {
             // newly introduced inner-batch message
             messageExtBatch.setSysFlag(messageExtBatch.getSysFlag() | MessageSysFlag.NEED_UNWRAP_FLAG);
             messageExtBatch.setSysFlag(messageExtBatch.getSysFlag() | MessageSysFlag.INNER_BATCH_FLAG);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index 79cc01a..6b4f30d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -24,9 +24,9 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.attribute.BooleanAttribute;
 import org.apache.rocketmq.common.attribute.EnumAttribute;
 import org.apache.rocketmq.common.attribute.LongRangeAttribute;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.queue.CQType;
-import org.apache.rocketmq.store.util.QueueTypeUtils;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -37,6 +37,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static com.google.common.collect.Sets.newHashSet;
 import static java.util.Arrays.asList;
@@ -123,7 +124,7 @@ public class TopicConfigManagerTest {
     @Test
     public void testAddWrongValueOnCreating() {
         Map<String, String> attributes = new HashMap<>();
-        attributes.put("+" + TopicAttributes.QUEUE_TYPE.getName(), "wrong-value");
+        attributes.put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), "wrong-value");
 
         TopicConfig topicConfig = new TopicConfig();
         topicConfig.setTopicName("new-topic");
@@ -300,7 +301,7 @@ public class TopicConfigManagerTest {
         topicConfigManager.updateTopicConfig(topicConfig);
 
         TopicConfig topicConfigUpdated = topicConfigManager.getTopicConfigTable().get(topic);
-        Assert.assertEquals(CQType.SimpleCQ, QueueTypeUtils.getCQType(topicConfigUpdated));
+        Assert.assertEquals(CQType.SimpleCQ, QueueTypeUtils.getCQType(Optional.of(topicConfigUpdated)));
 
         Assert.assertEquals("true", topicConfigUpdated.getAttributes().get(unchangeable));
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
index 63b2d14..79386bd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
+++ b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
@@ -22,29 +22,31 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
+import java.util.Map;
+
 /**
  * Base interface for MQ management
  */
 public interface MQAdmin {
     /**
      * Creates an topic
-     *
-     * @param key accesskey
+     *  @param key accesskey
      * @param newTopic topic name
      * @param queueNum topic's queue number
+     * @param attributes
      */
-    void createTopic(final String key, final String newTopic, final int queueNum)
+    void createTopic(final String key, final String newTopic, final int queueNum, Map<String, String> attributes)
         throws MQClientException;
 
     /**
      * Creates an topic
-     *
-     * @param key accesskey
+     *  @param key accesskey
      * @param newTopic topic name
      * @param queueNum topic's queue number
      * @param topicSysFlag topic system flag
+     * @param attributes
      */
-    void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
+    void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes)
         throws MQClientException;
 
     /**
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 5829f77..3b893bb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.client.consumer;
 
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
@@ -124,8 +125,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        createTopic(key, withNamespace(newTopic), queueNum, 0);
+    public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+        createTopic(key, withNamespace(newTopic), queueNum, 0, null);
     }
 
     /**
@@ -133,7 +134,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
         this.defaultMQPullConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index fce6b64..5ef2b63 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -431,8 +431,8 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        createTopic(key, withNamespace(newTopic), queueNum, 0);
+    public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+        createTopic(key, withNamespace(newTopic), queueNum, 0, null);
     }
     
     @Override
@@ -448,7 +448,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
         this.defaultMQPushConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 0a6e005..73e35c5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -76,10 +77,10 @@ public class MQAdminImpl {
     }
 
     public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        createTopic(key, newTopic, queueNum, 0);
+        createTopic(key, newTopic, queueNum, 0, null);
     }
 
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
         try {
             Validators.checkTopic(newTopic);
             Validators.isSystemTopic(newTopic);
@@ -100,6 +101,7 @@ public class MQAdminImpl {
                         topicConfig.setReadQueueNums(queueNum);
                         topicConfig.setWriteQueueNums(queueNum);
                         topicConfig.setTopicSysFlag(topicSysFlag);
+                        topicConfig.setAttributes(attributes);
 
                         boolean createOK = false;
                         for (int i = 0; i < 5; i++) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 05d3d48..497b274 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -100,7 +100,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
     public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
         this.isRunning();
-        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag, null);
     }
 
     private void isRunning() throws MQClientException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 2fa3830..abe3ca7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -184,7 +184,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     }
 
     public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
-        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag, null);
     }
 
     public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index bf2ca28..c1ec308 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -447,7 +447,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         Validators.checkTopic(newTopic);
         Validators.isSystemTopic(newTopic);
 
-        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag, null);
     }
 
     private void makeSureStateOK() throws MQClientException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 230785c..8c046aa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.producer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
@@ -757,12 +758,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      * @param key accesskey
      * @param newTopic topic name
      * @param queueNum topic's queue number
+     * @param attributes
      * @throws MQClientException if there is any client error.
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        createTopic(key, withNamespace(newTopic), queueNum, 0);
+    public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+        createTopic(key, withNamespace(newTopic), queueNum, 0, null);
     }
 
     /**
@@ -773,11 +775,12 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
      * @param newTopic topic name
      * @param queueNum topic's queue number
      * @param topicSysFlag topic system flag
+     * @param attributes
      * @throws MQClientException if there is any client error.
      */
     @Deprecated
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
         this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
index 7bed217..add383f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import static com.google.common.collect.Sets.newHashSet;
 
 public class TopicAttributes {
-    public static final EnumAttribute QUEUE_TYPE = new EnumAttribute(
+    public static final EnumAttribute QUEUE_TYPE_ATTRIBUTE = new EnumAttribute(
             "queue.type",
             false,
             newHashSet("BatchCQ", "SimpleCQ"),
@@ -35,6 +35,6 @@ public class TopicAttributes {
 
     static {
         ALL = new HashMap<>();
-        ALL.put(QUEUE_TYPE.getName(), QUEUE_TYPE);
+        ALL.put(QUEUE_TYPE_ATTRIBUTE.getName(), QUEUE_TYPE_ATTRIBUTE);
     }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/CQType.java b/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
similarity index 94%
rename from store/src/main/java/org/apache/rocketmq/store/queue/CQType.java
rename to common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
index efd65af..6bd6ad2 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/CQType.java
+++ b/common/src/main/java/org/apache/rocketmq/common/attribute/CQType.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.store.queue;
+package org.apache.rocketmq.common.attribute;
 
 public enum CQType {
     SimpleCQ,
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/QueueTypeUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/QueueTypeUtils.java
new file mode 100644
index 0000000..e2f006e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/QueueTypeUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.CQType;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class QueueTypeUtils {
+
+    public static boolean isBatchCq(Optional<TopicConfig> topicConfig) {
+        return Objects.equals(CQType.BatchCQ, getCQType(topicConfig));
+    }
+
+    public static CQType getCQType(Optional<TopicConfig> topicConfig) {
+        if (!topicConfig.isPresent()) {
+            return CQType.valueOf(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getDefaultValue());
+        }
+
+        String attributeName = TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName();
+
+        Map<String, String> attributes = topicConfig.get().getAttributes();
+        if (attributes == null || attributes.size() == 0) {
+            return CQType.valueOf(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getDefaultValue());
+        }
+
+        if (attributes.containsKey(attributeName)) {
+            return CQType.valueOf(attributes.get(attributeName));
+        } else {
+            return CQType.valueOf(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getDefaultValue());
+        }
+    }
+}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 062c269..2ae84eb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -24,10 +24,10 @@ import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
 import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -44,6 +45,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.config.BrokerRole;
@@ -51,9 +53,8 @@ import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.ha.HAService;
 import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.store.schedule.ScheduleMessageService;
-import org.apache.rocketmq.store.util.QueueTypeUtils;
 
 /**
  * Store all metadata downtime for recovery, data protection reliability
@@ -433,8 +434,8 @@ public class CommitLog implements Swappable {
 
     private void setBatchSizeIfNeeded(Map<String, String> propertiesMap, DispatchRequest dispatchRequest) {
         if (null != propertiesMap && propertiesMap.containsKey(MessageConst.PROPERTY_INNER_NUM) && propertiesMap.containsKey(MessageConst.PROPERTY_INNER_BASE)) {
-            dispatchRequest.setMsgBaseOffset(Long.valueOf(propertiesMap.get(MessageConst.PROPERTY_INNER_BASE)));
-            dispatchRequest.setBatchSize(Short.valueOf(propertiesMap.get(MessageConst.PROPERTY_INNER_NUM)));
+            dispatchRequest.setMsgBaseOffset(Long.parseLong(propertiesMap.get(MessageConst.PROPERTY_INNER_BASE)));
+            dispatchRequest.setBatchSize(Short.parseShort(propertiesMap.get(MessageConst.PROPERTY_INNER_NUM)));
         }
     }
 
@@ -672,7 +673,7 @@ public class CommitLog implements Swappable {
 
         topicQueueLock.lock(topicQueueKey);
         try {
-            defaultMessageStore.assignOffset(topicQueueKey, msg, getBatchNum(msg));
+            defaultMessageStore.assignOffset(topicQueueKey, msg, getMessageNum(msg));
 
             PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
             if (encodeResult != null) {
@@ -1031,15 +1032,6 @@ public class CommitLog implements Swappable {
         return this.mappedFileQueue.retryDeleteFirstFile(intervalForcibly);
     }
 
-    public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
-        String key = topic + "-" + queueId;
-        synchronized (this) {
-            this.defaultMessageStore.removeOffsetTable(key);
-        }
-
-        log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
-    }
-
     public void checkSelf() {
         mappedFileQueue.checkSelf();
     }
@@ -1058,19 +1050,24 @@ public class CommitLog implements Swappable {
         return diff;
     }
 
-    protected short getBatchNum(MessageExtBrokerInner msgInner) {
-        short batchNum = 1;
+    protected short getMessageNum(MessageExtBrokerInner msgInner) {
+        short messageNum = 1;
         // IF inner batch, build batchQueueOffset and batchNum property.
-        CQType cqType = QueueTypeUtils.getCQType(defaultMessageStore);
-        if (MessageSysFlag.check(msgInner.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG) || CQType.BatchCQ.equals(cqType)) {
+        CQType cqType = getCqType(msgInner);
 
+        if (MessageSysFlag.check(msgInner.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG) || CQType.BatchCQ.equals(cqType)) {
             if (msgInner.getProperty(MessageConst.PROPERTY_INNER_NUM) != null) {
-                batchNum = Short.parseShort(msgInner.getProperty(MessageConst.PROPERTY_INNER_NUM));
-                batchNum = batchNum >= 1 ? batchNum : 1;
+                messageNum = Short.parseShort(msgInner.getProperty(MessageConst.PROPERTY_INNER_NUM));
+                messageNum = messageNum >= 1 ? messageNum : 1;
             }
         }
 
-        return batchNum;
+        return messageNum;
+    }
+
+    private CQType getCqType(MessageExtBrokerInner msgInner) {
+        Optional<TopicConfig> topicConfig = this.defaultMessageStore.getTopicConfig(msgInner.getTopic());
+        return QueueTypeUtils.getCQType(topicConfig);
     }
 
     abstract class FlushCommitLogService extends ServiceThread {
@@ -1488,7 +1485,7 @@ public class CommitLog implements Swappable {
             Long queueOffset = msgInner.getQueueOffset();
 
             // this msg maybe a inner-batch msg.
-            short batchNum = getBatchNum(msgInner);
+            short messageNum = getMessageNum(msgInner);
 
             // Transaction messages that require special handling
             final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
@@ -1545,7 +1542,7 @@ public class CommitLog implements Swappable {
             CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS");
             msgInner.setEncodedBuff(null);
             return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
-                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, batchNum);
+                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum);
         }
 
         public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 0efe74c..7763a0f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -25,10 +26,11 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
+import org.apache.rocketmq.store.queue.QueueOffsetAssigner;
 import org.apache.rocketmq.store.queue.ReferredIterator;
 
 public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
@@ -37,7 +39,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
     public static final int CQ_STORE_UNIT_SIZE = 20;
     private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
 
-    private final DefaultMessageStore defaultMessageStore;
+    private final MessageStore defaultMessageStore;
 
     private final MappedFileQueue mappedFileQueue;
     private final String topic;
@@ -55,7 +57,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
         final int queueId,
         final String storePath,
         final int mappedFileSize,
-        final DefaultMessageStore defaultMessageStore) {
+        final MessageStore defaultMessageStore) {
         this.storePath = storePath;
         this.mappedFileSize = mappedFileSize;
         this.defaultMessageStore = defaultMessageStore;
@@ -438,6 +440,17 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
         this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
     }
 
+    @Override
+    public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum) {
+        String topicQueueKey = getTopic() + "-" + getQueueId();
+        HashMap<String, Long> topicQueueTable = queueOffsetAssigner.getTopicQueueTable();
+
+        long topicOffset = topicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+        topicQueueTable.put(topicQueueKey, topicOffset + messageNum);
+
+        msg.setQueueOffset(topicOffset);
+    }
+
     private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
         final long cqOffset) {
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index d25b9eb..da47be6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -32,9 +32,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -47,13 +47,16 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.SystemClock;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.running.RunningStats;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.config.BrokerRole;
@@ -74,8 +77,6 @@ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.apache.rocketmq.store.util.PerfCounter;
 
-import static java.lang.String.format;
-
 public class DefaultMessageStore implements MessageStore {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
@@ -87,10 +88,6 @@ public class DefaultMessageStore implements MessageStore {
 
     private final ConsumeQueueStore consumeQueueStore;
 
-    private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface>> consumeQueueTable;
-
-    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<>(1024);
-
     private final FlushConsumeQueueService flushConsumeQueueService;
 
     private final CleanCommitLogService cleanCommitLogService;
@@ -154,8 +151,7 @@ public class DefaultMessageStore implements MessageStore {
         } else {
             this.commitLog = new CommitLog(this);
         }
-        this.consumeQueueTable = new ConcurrentHashMap<>(32);
-        this.consumeQueueStore = new ConsumeQueueStore(this, this.messageStoreConfig, this.consumeQueueTable);
+        this.consumeQueueStore = new ConsumeQueueStore(this, this.messageStoreConfig);
 
         this.flushConsumeQueueService = new FlushConsumeQueueService();
         this.cleanCommitLogService = new CleanCommitLogService();
@@ -194,13 +190,7 @@ public class DefaultMessageStore implements MessageStore {
 
     @Override
     public void truncateDirtyLogicFiles(long phyOffset) {
-        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.consumeQueueTable;
-
-        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
-            for (ConsumeQueueInterface logic : maps.values()) {
-                this.consumeQueueStore.truncateDirtyLogicFiles(logic, phyOffset);
-            }
-        }
+        this.consumeQueueStore.truncateDirty(phyOffset);
     }
 
     /**
@@ -211,7 +201,6 @@ public class DefaultMessageStore implements MessageStore {
         boolean result = true;
 
         try {
-            long start = System.currentTimeMillis();
             boolean lastExitOK = !this.isTempFileExist();
             log.info("last shutdown {}, root dir: {}", lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir());
 
@@ -219,7 +208,7 @@ public class DefaultMessageStore implements MessageStore {
             result = result && this.commitLog.load();
 
             // load Consume Queue
-            result = result && this.loadConsumeQueue();
+            result = result && this.consumeQueueStore.load();
 
             if (result) {
                 this.storeCheckpoint =
@@ -269,7 +258,7 @@ public class DefaultMessageStore implements MessageStore {
              * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
              */
             long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
-            for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
+            for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.getConsumeQueueTable().values()) {
                 for (ConsumeQueueInterface logic : maps.values()) {
                     if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                         maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
@@ -387,11 +376,7 @@ public class DefaultMessageStore implements MessageStore {
 
     @Override
     public void destroyLogics() {
-        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
-            for (ConsumeQueueInterface logic : maps.values()) {
-                this.consumeQueueStore.destroy(logic);
-            }
-        }
+        this.consumeQueueStore.destroy();
     }
 
     private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
@@ -464,6 +449,20 @@ public class DefaultMessageStore implements MessageStore {
             return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
         }
 
+        if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
+                && !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
+            log.warn("[BUG]The message had property {} but is not an inner batch", MessageConst.PROPERTY_INNER_NUM);
+            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+        }
+
+        if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
+            Optional<TopicConfig> topicConfig = this.getTopicConfig(msg.getTopic());
+            if (!QueueTypeUtils.isBatchCq(topicConfig)) {
+                log.error("[BUG]The message is an inner batch but cq type is not batch cq");
+                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+            }
+        }
+
         long beginTime = this.getSystemClock().now();
         CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
 
@@ -649,8 +648,7 @@ public class DefaultMessageStore implements MessageStore {
                                 break;
                             }
 
-                            if (this.isTheBatchFull(sizePy, maxMsgNums, maxPullSize, getResult.getBufferTotalSize(), getResult.getMessageCount(),
-                                    isInDisk)) {
+                            if (this.isTheBatchFull(sizePy, cqUnit.getBatchNum(), maxMsgNums, maxPullSize, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) {
                                 break;
                             }
 
@@ -756,7 +754,7 @@ public class DefaultMessageStore implements MessageStore {
                 return logic.getMaxOffsetInQueue();
             }
         } else {
-            Long offset = this.topicQueueTable.get(topic + "-" + queueId);
+            Long offset = this.consumeQueueStore.getMaxOffset(topic, queueId);
             if (offset != null) {
                 return offset;
             }
@@ -800,7 +798,11 @@ public class DefaultMessageStore implements MessageStore {
     public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
         ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId);
         if (logic != null) {
-            return logic.getOffsetInQueueByTime(timestamp);
+            long resultOffset = logic.getOffsetInQueueByTime(timestamp);
+            // Make sure the result offset is in valid range.
+            resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue());
+            resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue());
+            return resultOffset;
         }
 
         return 0;
@@ -1071,7 +1073,7 @@ public class DefaultMessageStore implements MessageStore {
 
     @Override
     public int cleanUnusedTopic(Set<String> topics) {
-        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
+        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.getConsumeQueueTable().entrySet().iterator();
         while (it.hasNext()) {
             Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
             String topic = next.getKey();
@@ -1086,7 +1088,7 @@ public class DefaultMessageStore implements MessageStore {
                         cq.getQueueId()
                     );
 
-                    this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
+                    this.consumeQueueStore.removeTopicQueueTable(cq.getTopic(), cq.getQueueId());
                 }
                 it.remove();
 
@@ -1105,45 +1107,7 @@ public class DefaultMessageStore implements MessageStore {
     public void cleanExpiredConsumerQueue() {
         long minCommitLogOffset = this.commitLog.getMinOffset();
 
-        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
-            String topic = next.getKey();
-            if (!topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
-                ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = next.getValue();
-                Iterator<Entry<Integer, ConsumeQueueInterface>> itQT = queueTable.entrySet().iterator();
-                while (itQT.hasNext()) {
-                    Entry<Integer, ConsumeQueueInterface> nextQT = itQT.next();
-                    long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset();
-
-                    if (maxCLOffsetInConsumeQueue == -1) {
-                        log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
-                            nextQT.getValue().getTopic(),
-                            nextQT.getValue().getQueueId(),
-                            nextQT.getValue().getMaxPhysicOffset(),
-                            nextQT.getValue().getMinLogicOffset());
-                    } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
-                        log.info(
-                            "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
-                            topic,
-                            nextQT.getKey(),
-                            minCommitLogOffset,
-                            maxCLOffsetInConsumeQueue);
-
-                        DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(),
-                            nextQT.getValue().getQueueId());
-
-                        this.consumeQueueStore.destroy(nextQT.getValue());
-                        itQT.remove();
-                    }
-                }
-
-                if (queueTable.isEmpty()) {
-                    log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
-                    it.remove();
-                }
-            }
-        }
+        this.consumeQueueStore.cleanExpired(minCommitLogOffset);
     }
 
     public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset,
@@ -1269,13 +1233,13 @@ public class DefaultMessageStore implements MessageStore {
         return (maxOffsetPy - offsetPy) > memory;
     }
 
-    private boolean isTheBatchFull(int sizePy, int maxMsgNums, long maxMsgSize, int bufferTotal, int messageTotal, boolean isInDisk) {
+    private boolean isTheBatchFull(int sizePy, int unitBatchNum, int maxMsgNums, long maxMsgSize, int bufferTotal, int messageTotal, boolean isInDisk) {
 
         if (0 == bufferTotal || 0 == messageTotal) {
             return false;
         }
 
-        if (maxMsgNums <= messageTotal) {
+        if (messageTotal + unitBatchNum > maxMsgNums) {
             return true;
         }
 
@@ -1392,16 +1356,7 @@ public class DefaultMessageStore implements MessageStore {
 
     private void checkSelf() {
         this.commitLog.checkSelf();
-
-        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
-            Iterator<Entry<Integer, ConsumeQueueInterface>> itNext = next.getValue().entrySet().iterator();
-            while (itNext.hasNext()) {
-                Entry<Integer, ConsumeQueueInterface> cq = itNext.next();
-                this.consumeQueueStore.checkSelf(cq.getValue());
-            }
-        }
+        this.consumeQueueStore.checkSelf();
     }
 
     private boolean isTempFileExist() {
@@ -1410,53 +1365,6 @@ public class DefaultMessageStore implements MessageStore {
         return file.exists();
     }
 
-    private boolean loadConsumeQueue() {
-        checkOtherConsumeQueue();
-
-        File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
-        File[] fileTopicList = dirLogic.listFiles();
-        if (fileTopicList != null) {
-
-            for (File fileTopic : fileTopicList) {
-                String topic = fileTopic.getName();
-
-                File[] fileQueueIdList = fileTopic.listFiles();
-                if (fileQueueIdList != null) {
-                    for (File fileQueueId : fileQueueIdList) {
-                        int queueId;
-                        try {
-                            queueId = Integer.parseInt(fileQueueId.getName());
-                        } catch (NumberFormatException e) {
-                            continue;
-                        }
-                        ConsumeQueueInterface logic = new ConsumeQueue(
-                            topic,
-                            queueId,
-                            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
-                            this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
-                            this);
-                        this.putConsumeQueue(topic, queueId, logic);
-                        if (!this.consumeQueueStore.load(logic)) {
-                            return false;
-                        }
-                    }
-                }
-            }
-        }
-
-        log.info("load logics queue all over, OK");
-
-        return true;
-    }
-
-    private void checkOtherConsumeQueue() {
-        File dirLogic = new File(StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
-        if (dirLogic.exists()) {
-            throw new RuntimeException(format("Batch consume queue directory: [%s] exist. Can not load consume queue while batch consume queue exists.",
-                    StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir())));
-        }
-    }
-
     private void recover(final boolean lastExitOK) {
         long recoverCqStart = System.currentTimeMillis();
         long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
@@ -1485,43 +1393,13 @@ public class DefaultMessageStore implements MessageStore {
         return transientStorePool;
     }
 
-    private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {
-        ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);
-        if (null == map) {
-            map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueueInterface>();
-            map.put(queueId, consumeQueue);
-            this.consumeQueueTable.put(topic, map);
-        } else {
-            map.put(queueId, consumeQueue);
-        }
-    }
-
     private long recoverConsumeQueue() {
-        long maxPhysicOffset = -1;
-        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
-            for (ConsumeQueueInterface logic : maps.values()) {
-                this.consumeQueueStore.recover(logic);
-                if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
-                    maxPhysicOffset = logic.getMaxPhysicOffset();
-                }
-            }
-        }
-
-        return maxPhysicOffset;
+        return this.consumeQueueStore.recover();
     }
 
     public void recoverTopicQueueTable() {
-        HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
         long minPhyOffset = this.commitLog.getMinOffset();
-        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
-            for (ConsumeQueueInterface logic : maps.values()) {
-                String key = logic.getTopic() + "-" + logic.getQueueId();
-                table.put(key, logic.getMaxOffsetInQueue());
-                this.consumeQueueStore.correctMinOffset(logic, minPhyOffset);
-            }
-        }
-
-        this.topicQueueTable = table;
+        this.consumeQueueStore.recoverOffsetTable(minPhyOffset);
     }
 
     @Override
@@ -1539,7 +1417,7 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> getConsumeQueueTable() {
-        return consumeQueueTable;
+        return consumeQueueStore.getConsumeQueueTable();
     }
 
     @Override
@@ -1569,8 +1447,7 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
-        ConsumeQueueInterface cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
-        this.consumeQueueStore.putMessagePositionInfoWrapper(cq, dispatchRequest);
+        this.consumeQueueStore.putMessagePositionInfoWrapper(dispatchRequest);
     }
 
     @Override
@@ -1606,7 +1483,7 @@ public class DefaultMessageStore implements MessageStore {
 
     @Override
     public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) {
-        ConcurrentMap<Integer, ConsumeQueueInterface> map = consumeQueueTable.get(topic);
+        ConcurrentMap<Integer, ConsumeQueueInterface> map = this.getConsumeQueueTable().get(topic);
         if (map == null) {
             return null;
         }
@@ -1656,19 +1533,21 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     @Override
-    public void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short batchNum) {
+    public void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short messageNum) {
         final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
 
         if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
-            long topicOffset = this.topicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
-            msg.setQueueOffset(topicOffset);
-            this.topicQueueTable.put(topicQueueKey, topicOffset + batchNum);
+            this.consumeQueueStore.assignQueueOffset(msg, messageNum);
         }
     }
 
     @Override
-    public void removeOffsetTable(String topicQueueKey) {
-        this.topicQueueTable.remove(topicQueueKey);
+    public Optional<TopicConfig> getTopicConfig(String topic) {
+        return this.consumeQueueStore.getTopicConfig(topic);
+    }
+
+    public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
+        this.consumeQueueStore.setTopicConfigTable(topicConfigTable);
     }
 
     class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@@ -1975,7 +1854,7 @@ public class DefaultMessageStore implements MessageStore {
             if (minOffset > this.lastPhysicalMinOffset) {
                 this.lastPhysicalMinOffset = minOffset;
 
-                ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.consumeQueueTable;
+                ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.getConsumeQueueTable();
 
                 for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
                     for (ConsumeQueueInterface logic : maps.values()) {
@@ -2021,7 +1900,7 @@ public class DefaultMessageStore implements MessageStore {
                 logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
             }
 
-            ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.consumeQueueTable;
+            ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = DefaultMessageStore.this.getConsumeQueueTable();
 
             for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
                 for (ConsumeQueueInterface cq : maps.values()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index a7d5083..2ad6ee4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -171,8 +171,8 @@ public class MappedFileQueue implements Swappable {
                 return true;
             }
 
-                try {
-                    MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize);
+            try {
+                MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize);
 
                 mappedFile.setWrotePosition(this.mappedFileSize);
                 mappedFile.setFlushedPosition(this.mappedFileSize);
@@ -234,13 +234,13 @@ public class MappedFileQueue implements Swappable {
         if (this.allocateMappedFileService != null) {
             mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                     nextNextFilePath, this.mappedFileSize);
-            } else {
-                try {
-                    mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize);
-                } catch (IOException e) {
-                    log.error("create mappedFile exception", e);
-                }
+        } else {
+            try {
+                mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize);
+            } catch (IOException e) {
+                log.error("create mappedFile exception", e);
             }
+        }
 
         if (mappedFile != null) {
             if (this.mappedFiles.isEmpty()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 51d8a24..341a29f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -18,10 +18,12 @@ package org.apache.rocketmq.store;
 
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.rocketmq.common.SystemClock;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -562,16 +564,19 @@ public interface MessageStore {
     boolean isSyncMaster();
 
     /**
-     * assign an queue offset and increase it.
+     * Assign a queue offset and increase it.
+     * If there is a race condition, you need to lock/unlock this method yourself.
+     *
      * @param topicQueueKey topic-queue key
      * @param msg message
-     * @param batchNum batch num
+     * @param messageNum message num
      */
-    void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short batchNum);
+    void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short messageNum);
 
     /**
-     * remove offset table
-     * @param topicQueueKey topic-queue key
+     * Get the topic config.
+     * @param topic topic name
+     * @return topic config info
      */
-    void removeOffsetTable(String topicQueueKey);
+    Optional<TopicConfig> getTopicConfig(String topic);
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageContext.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageContext.java
index d4c160d..bf8832d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/PutMessageContext.java
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageContext.java
@@ -8,40 +8,41 @@
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
+
 package org.apache.rocketmq.store;
 
 public class PutMessageContext {
-        private String topicQueueTableKey;
-        private long[] phyPos;
-        private int batchSize;
-
-        public PutMessageContext(String topicQueueTableKey) {
-            this.topicQueueTableKey = topicQueueTableKey;
-        }
-
-        public String getTopicQueueTableKey() {
-            return topicQueueTableKey;
-        }
-
-        public long[] getPhyPos() {
-            return phyPos;
-        }
-
-        public void setPhyPos(long[] phyPos) {
-            this.phyPos = phyPos;
-        }
-
-        public int getBatchSize() {
-            return batchSize;
-        }
-
-        public void setBatchSize(int batchSize) {
-            this.batchSize = batchSize;
-        }
-    }
\ No newline at end of file
+    private String topicQueueTableKey;
+    private long[] phyPos;
+    private int batchSize;
+
+    public PutMessageContext(String topicQueueTableKey) {
+        this.topicQueueTableKey = topicQueueTableKey;
+    }
+
+    public String getTopicQueueTableKey() {
+        return topicQueueTableKey;
+    }
+
+    public long[] getPhyPos() {
+        return phyPos;
+    }
+
+    public void setPhyPos(long[] phyPos) {
+        this.phyPos = phyPos;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java b/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java
index 81b407e..e910c2a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreUtil.java
@@ -44,10 +44,6 @@ public class StoreUtil {
         return physicalTotal;
     }
 
-    public static boolean isStreamMode(MessageStore messageStore) {
-        return messageStore instanceof StreamMessageStore;
-    }
-
     public static void fileAppend(MappedFile file, ByteBuffer data) {
         boolean success = file.appendMessage(data);
         if (!success) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/StreamMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/StreamMessageStore.java
deleted file mode 100644
index d9277ff..0000000
--- a/store/src/main/java/org/apache/rocketmq/store/StreamMessageStore.java
+++ /dev/null
@@ -1,2573 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.store;
-
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.common.SystemClock;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.running.RunningStats;
-import org.apache.rocketmq.common.sysflag.MessageSysFlag;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.store.config.BrokerRole;
-import org.apache.rocketmq.store.config.FlushDiskType;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.config.StorePathConfigHelper;
-import org.apache.rocketmq.store.ha.HAService;
-import org.apache.rocketmq.store.index.IndexService;
-import org.apache.rocketmq.store.index.QueryOffsetResult;
-import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.store.queue.BatchConsumeQueue;
-import org.apache.rocketmq.store.queue.CQType;
-import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
-import org.apache.rocketmq.store.queue.ConsumeQueueStore;
-import org.apache.rocketmq.store.queue.CqUnit;
-import org.apache.rocketmq.store.queue.ReferredIterator;
-import org.apache.rocketmq.store.schedule.ScheduleMessageService;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.apache.rocketmq.store.util.PerfCounter;
-import org.apache.rocketmq.store.util.QueueTypeUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileLock;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static java.lang.String.format;
-
-public class StreamMessageStore implements MessageStore {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-
-    public final PerfCounter.Ticks perfs = new PerfCounter.Ticks(log);
-
-    private final MessageStoreConfig messageStoreConfig;
-    // CommitLog
-    private final CommitLog commitLog;
-
-    private final ConsumeQueueStore consumeQueueStore;
-
-    private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface>> consumeQueueTable;
-
-    protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
-
-    protected HashMap<String/* topic-queueid */, Long/* offset */> batchTopicQueueTable = new HashMap<String, Long>(1024);
-
-    private final FlushConsumeQueueService flushConsumeQueueService;
-
-    private final CleanCommitLogService cleanCommitLogService;
-
-    private final CleanConsumeQueueService cleanConsumeQueueService;
-
-    private final CorrectLogicOffsetService correctLogicOffsetService;
-
-    private final IndexService indexService;
-
-    private final AllocateMappedFileService allocateMappedFileService;
-
-    private final ReputMessageService reputMessageService;
-
-    private final HAService haService;
-
-    private final ScheduleMessageService scheduleMessageService;
-
-    private final StoreStatsService storeStatsService;
-
-    private final TransientStorePool transientStorePool;
-
-    private final RunningFlags runningFlags = new RunningFlags();
-    private final SystemClock systemClock = new SystemClock();
-
-    private final ScheduledExecutorService scheduledExecutorService =
-        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
-    private final BrokerStatsManager brokerStatsManager;
-    private final MessageArrivingListener messageArrivingListener;
-    private final BrokerConfig brokerConfig;
-
-    private volatile boolean shutdown = true;
-
-    private StoreCheckpoint storeCheckpoint;
-
-    private AtomicLong printTimes = new AtomicLong(0);
-
-    private final LinkedList<CommitLogDispatcher> dispatcherList;
-
-    private RandomAccessFile lockFile;
-
-    private FileLock lock;
-
-    boolean shutDownNormal = false;
-
-    //polish for reput
-    private ThreadPoolExecutor[] reputExecutors;
-
-    private BlockingQueue<Runnable>[] reputQueues;
-
-    private boolean isDispatchFromSenderThread;
-
-    private static final Future EMPTY_FUTURE = new Future() {
-        @Override
-        public boolean cancel(final boolean mayInterruptIfRunning) {
-            return false;
-        }
-
-        @Override
-        public boolean isCancelled() {
-            return false;
-        }
-
-        @Override
-        public boolean isDone() {
-            return true;
-        }
-
-        @Override
-        public Object get() {
-            return null;
-        }
-
-        @Override
-        public Object get(final long timeout, final TimeUnit unit) {
-            return null;
-        }
-    };
-
-    // Max pull msg size
-    private final static int MAX_PULL_MSG_SIZE = 128 * 1024 * 1024;
-
-    public StreamMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
-                               final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
-        this.messageArrivingListener = messageArrivingListener;
-        this.brokerConfig = brokerConfig;
-        this.messageStoreConfig = messageStoreConfig;
-        this.brokerStatsManager = brokerStatsManager;
-        this.allocateMappedFileService = new AllocateMappedFileService(this);
-        if (messageStoreConfig.isEnableDLegerCommitLog()) {
-            throw new RuntimeException("dleger is not supported in this message store.");
-        }
-        this.isDispatchFromSenderThread = messageStoreConfig.isDispatchFromSenderThread();
-        this.commitLog = new CommitLog(this);
-        this.consumeQueueTable = new ConcurrentHashMap<>(32);
-        this.consumeQueueStore = new ConsumeQueueStore(this, this.messageStoreConfig, this.consumeQueueTable);
-
-        this.flushConsumeQueueService = new FlushConsumeQueueService();
-        this.cleanCommitLogService = new CleanCommitLogService();
-        this.cleanConsumeQueueService = new CleanConsumeQueueService();
-        this.correctLogicOffsetService = new CorrectLogicOffsetService();
-        this.storeStatsService = new StoreStatsService();
-        this.indexService = new IndexService(this);
-        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
-            this.haService = new HAService(this);
-        } else {
-            this.haService = null;
-        }
-        if (isDispatchFromSenderThread) {
-            this.reputMessageService = new SyncReputMessageService();
-        } else {
-            this.reputMessageService = new ReputMessageService();
-        }
-
-        this.scheduleMessageService = new ScheduleMessageService(this);
-
-        this.transientStorePool = new TransientStorePool(messageStoreConfig);
-
-        if (messageStoreConfig.isTransientStorePoolEnable()) {
-            this.transientStorePool.init();
-        }
-
-        this.allocateMappedFileService.start();
-
-        this.indexService.start();
-
-        this.dispatcherList = new LinkedList<>();
-        this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
-        this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
-
-        File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
-        DefaultMappedFile.ensureDirOK(file.getParent());
-        lockFile = new RandomAccessFile(file, "rw");
-        initAsyncReputThreads(messageStoreConfig.getDispatchCqThreads(), messageStoreConfig.getDispatchCqCacheNum());
-    }
-
-    /**
-     * @throws IOException
-     */
-    @Override
-    public boolean load() {
-        boolean result = true;
-
-        try {
-            long start = System.currentTimeMillis();
-            boolean lastExitOK = !this.isTempFileExist();
-            log.info("last shutdown {}, root dir: {}", lastExitOK ? "normally" : "abnormally", messageStoreConfig.getStorePathRootDir());
-
-            if (null != scheduleMessageService) {
-                result = result && this.scheduleMessageService.load();
-            }
-
-            // load Commit Log
-            result = result && this.commitLog.load();
-
-            // load Batch Consume Queue
-            result = result && this.loadBatchConsumeQueue();
-
-            if (result) {
-                this.storeCheckpoint =
-                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
-
-                this.indexService.load(lastExitOK);
-
-                this.recover(lastExitOK);
-
-                log.info("load over, and the max phy offset = {} cost = {}", this.getMaxPhyOffset(), System.currentTimeMillis() - start);
-            }
-        } catch (Exception e) {
-            log.error("load exception", e);
-            result = false;
-        }
-
-        if (!result) {
-            this.allocateMappedFileService.shutdown();
-        }
-
-        return result;
-    }
-
-    /**
-     * @throws Exception
-     */
-    @Override
-    public void start() throws Exception {
-
-        lock = lockFile.getChannel().tryLock(0, 1, false);
-        if (lock == null || lock.isShared() || !lock.isValid()) {
-            throw new RuntimeException("Lock failed,MQ already started");
-        }
-
-        lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
-        lockFile.getChannel().force(true);
-
-        if (this.getMessageStoreConfig().isDuplicationEnable()) {
-            this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
-        } else {
-            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
-        }
-        this.reputMessageService.start();
-
-        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
-            this.haService.start();
-            this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
-        }
-
-        this.flushConsumeQueueService.start();
-        this.commitLog.start();
-        this.storeStatsService.start();
-
-        this.createTempFile();
-        this.addScheduleTask();
-        this.perfs.start();
-        this.shutdown = false;
-    }
-
-    @Override
-    public void shutdown() {
-        if (!this.shutdown) {
-            this.shutdown = true;
-
-            this.scheduledExecutorService.shutdown();
-
-            try {
-
-                Thread.sleep(1000 * 3);
-            } catch (InterruptedException e) {
-                log.error("shutdown Exception, ", e);
-            }
-
-            if (this.scheduleMessageService != null) {
-                this.scheduleMessageService.shutdown();
-            }
-            if (this.haService != null) {
-                this.haService.shutdown();
-            }
-
-            this.storeStatsService.shutdown();
-            this.indexService.shutdown();
-            this.commitLog.shutdown();
-            this.reputMessageService.shutdown();
-            this.flushConsumeQueueService.shutdown();
-            this.allocateMappedFileService.shutdown();
-            this.storeCheckpoint.flush();
-            this.storeCheckpoint.shutdown();
-
-            this.perfs.shutdown();
-
-            if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
-                this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
-                shutDownNormal = true;
-            } else {
-                log.warn("the store may be wrong, so shutdown abnormally, and keep abort file. writable: {}, dispatchBehindBytes: {}, abort file: {}",
-                        this.runningFlags.isWriteable(), dispatchBehindBytes(), StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
-            }
-        }
-
-        this.transientStorePool.destroy();
-
-        if (lockFile != null && lock != null) {
-            try {
-                lock.release();
-                lockFile.close();
-            } catch (IOException e) {
-            }
-        }
-    }
-
-    @Override
-    public void destroy() {
-        this.destroyLogics();
-        this.commitLog.destroy();
-        this.indexService.destroy();
-        this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
-        this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
-    }
-
-    @Override
-    public void destroyLogics() {
-        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
-            for (ConsumeQueueInterface logic : maps.values()) {
-                this.consumeQueueStore.destroy(logic);
-            }
-        }
-    }
-
-    @Override
-    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
-        if (this.shutdown) {
-            log.warn("message store has shutdown, so putMessage is forbidden");
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
-        }
-
-        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
-            long value = this.printTimes.getAndIncrement();
-            if ((value % 50000) == 0) {
-                log.warn("message store is slave mode, so putMessage is forbidden ");
-            }
-
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
-        }
-
-        if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
-                && !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
-            log.warn("[BUG]The message had property {} but is not an inner batch", MessageConst.PROPERTY_INNER_NUM);
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
-        }
-
-        if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
-            CQType cqType = QueueTypeUtils.getCQType(this);
-
-            if (!CQType.BatchCQ.equals(cqType)) {
-                log.warn("[BUG]The message is an inner batch but cq type is not batch consume queue");
-                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
-            }
-        }
-
-        if (!this.runningFlags.isWriteable()) {
-            long value = this.printTimes.getAndIncrement();
-            if ((value % 50000) == 0) {
-                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
-            }
-
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
-        } else {
-            this.printTimes.set(0);
-        }
-
-        int topicLen = msg.getTopic().length();
-        if (topicLen > this.messageStoreConfig.getMaxTopicLength()) {
-            log.warn("putMessage message topic[{}] length too long {}", msg.getTopic(), topicLen);
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
-        }
-
-        if (topicLen > Byte.MAX_VALUE) {
-            log.warn("putMessage message topic[{}] length too long {}, but it is not supported by broker",
-                    msg.getTopic(), topicLen);
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
-        }
-
-        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
-            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null));
-        }
-
-        if (this.isOSPageCacheBusy()) {
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null));
-        }
-
-        long beginTime = this.getSystemClock().now();
-        perfs.startTick("PUT_MESSAGE_TIME_MS");
-        CompletableFuture<PutMessageResult> result = this.commitLog.asyncPutMessage(msg);
-        perfs.endTick("PUT_MESSAGE_TIME_MS");
-
-        long eclipseTime = this.getSystemClock().now() - beginTime;
-        if (eclipseTime > 500) {
-            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
-        }
-
-        return result;
-    }
-
-    @Override
-    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
-        CompletableFuture<PutMessageResult> future = asyncPutMessage(msg);
-        try {
-            return future.get(3, TimeUnit.SECONDS);
-        } catch (Throwable t) {
-            log.error("Get async put result failed", t);
-            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
-        }
-    }
-
-    @Override
-    public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
-        CompletableFuture<PutMessageResult> future = asyncPutMessages(messageExtBatch);
-        try {
-            return future.get(3, TimeUnit.SECONDS);
-        } catch (Throwable t) {
-            log.error("Get async put result failed", t);
-            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
-        }
-    }
-
-    @Override
-    public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
-        if (this.shutdown) {
-            log.warn("StreamMessageStore has shutdown, so putMessages is forbidden");
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
-        }
-
-        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
-            long value = this.printTimes.getAndIncrement();
-            if ((value % 50000) == 0) {
-                log.warn("StreamMessageStore is in slave mode, so putMessages is forbidden ");
-            }
-
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
-        }
-
-        if (!this.runningFlags.isWriteable()) {
-            long value = this.printTimes.getAndIncrement();
-            if ((value % 50000) == 0) {
-                log.warn("StreamMessageStore is not writable, so putMessages is forbidden " + this.runningFlags.getFlagBits());
-            }
-
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
-        } else {
-            this.printTimes.set(0);
-        }
-
-        int topicLen = messageExtBatch.getTopic().length();
-        if (topicLen > this.messageStoreConfig.getMaxTopicLength()) {
-            log.warn("putMessage batch message topic[{}] length too long {}", messageExtBatch.getTopic(), topicLen);
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
-        }
-
-        if (topicLen > Byte.MAX_VALUE) {
-            log.warn("putMessage batch message topic[{}] length too long {}, but it is not supported by broker",
-                    messageExtBatch.getTopic(), topicLen);
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
-        }
-
-        if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {
-            log.warn("PutMessages body length too long " + messageExtBatch.getBody().length);
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
-        }
-
-        if (this.isOSPageCacheBusy()) {
-            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null));
-        }
-
-        long beginTime = this.getSystemClock().now();
-        CompletableFuture<PutMessageResult> result = this.commitLog.asyncPutMessages(messageExtBatch);
-
-        long eclipseTime = this.getSystemClock().now() - beginTime;
-        if (eclipseTime > 500) {
-            log.warn("not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, messageExtBatch.getBody().length);
-        }
-        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
-
-        return result;
-    }
-
-    @Override
-    public boolean isOSPageCacheBusy() {
-        long begin = this.getCommitLog().getBeginTimeInLock();
-        long diff = this.systemClock.now() - begin;
-
-        return diff < 10000000
-            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
-    }
-
-    @Override
-    public long lockTimeMills() {
-        return this.commitLog.lockTimeMills();
-    }
-
-    @Override
-    public SystemClock getSystemClock() {
-        return systemClock;
-    }
-
-    @Override
-    public CommitLog getCommitLog() {
-        return commitLog;
-    }
-
-    public boolean isDispatchFromSenderThread() {
-        return isDispatchFromSenderThread;
-    }
-
-    @Override
-    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
-        final int maxMsgNums,
-        final MessageFilter messageFilter) {
-        return getMessage(group, topic, queueId, offset, maxMsgNums, MAX_PULL_MSG_SIZE, messageFilter);
-    }
-
-    @Override
-    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
-        final int maxMsgNums,
-        final int maxTotalMsgSize,
-        final MessageFilter messageFilter) {
-        if (this.shutdown) {
-            log.warn("message store has shutdown, so getMessage is forbidden");
-            return null;
-        }
-
-        if (!this.runningFlags.isReadable()) {
-            log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
-            return null;
-        }
-
-        long beginTime = this.getSystemClock().now();
-
-        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
-        long nextBeginOffset = offset;
-        long minOffset = 0;
-        long maxOffset = 0;
-
-        GetMessageResult getResult = new GetMessageResult();
-
-        final long maxOffsetPy = this.commitLog.getMaxOffset();
-
-        ConsumeQueueInterface consumeQueue = getConsumeQueue(topic, queueId);
-        if (consumeQueue != null) {
-            minOffset = consumeQueue.getMinOffsetInQueue();
-            maxOffset = consumeQueue.getMaxOffsetInQueue();
-
-            if (maxOffset == 0) {
-                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
-                nextBeginOffset = nextOffsetCorrection(offset, 0);
-            } else if (offset < minOffset) {
-                status = GetMessageStatus.OFFSET_TOO_SMALL;
-                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
-            } else if (offset == maxOffset) {
-                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
-                nextBeginOffset = nextOffsetCorrection(offset, offset);
-            } else if (offset > maxOffset) {
-                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
-                if (0 == minOffset) {
-                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
-                } else {
-                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
-                }
-            } else {
-                final int maxFilterMessageCount = Math.max(messageStoreConfig.getPullBatchMaxMessageCount(), maxMsgNums);
-                final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
-
-                long maxPullSize = Math.max(maxTotalMsgSize, 100);
-                if (maxPullSize > MAX_PULL_MSG_SIZE) {
-                    log.warn("The max pull size is too large maxPullSize={} topic={} queueId={}", maxPullSize, topic, queueId);
-                    maxPullSize = MAX_PULL_MSG_SIZE;
-                }
-                status = GetMessageStatus.NO_MATCHED_MESSAGE;
-                long maxPhyOffsetPulling = 0;
-                int cqFileNum = 0;
-
-                while (getResult.getBufferTotalSize() <= 0
-                        && nextBeginOffset < maxOffset
-                        && cqFileNum++ < this.messageStoreConfig.getTravelCqFileNumWhenGetMessage()) {
-                    ReferredIterator<CqUnit> bufferConsumeQueue = consumeQueue.iterateFrom(nextBeginOffset);
-
-                    if (bufferConsumeQueue == null) {
-                        status = GetMessageStatus.OFFSET_FOUND_NULL;
-                        nextBeginOffset = nextOffsetCorrection(nextBeginOffset, this.consumeQueueStore.rollNextFile(consumeQueue, nextBeginOffset));
-                        log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
-                                + maxOffset + ", but access logic queue failed. Correct nextBeginOffset to " + nextBeginOffset);
-                        break;
-                    }
-
-                    try {
-                        long nextPhyFileStartOffset = Long.MIN_VALUE;
-                        while (bufferConsumeQueue.hasNext()
-                                && nextBeginOffset < maxOffset) {
-                            CqUnit cqUnit = bufferConsumeQueue.next();
-                            long offsetPy = cqUnit.getPos();
-                            int sizePy = cqUnit.getSize();
-
-                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
-
-                            if (cqUnit.getQueueOffset() - offset > maxFilterMessageCount) {
-                                break;
-                            }
-
-                            if (this.isTheBatchFull(sizePy, cqUnit.getBatchNum(), maxMsgNums, maxPullSize, getResult.getBufferTotalSize(), getResult.getMessageCount(),
-                                    isInDisk)) {
-                                break;
-                            }
-
-                            if (getResult.getBufferTotalSize() >= maxPullSize) {
-                                break;
-                            }
-
-                            maxPhyOffsetPulling = offsetPy;
-
-                            //Be careful, here should before the isTheBatchFull
-                            nextBeginOffset = cqUnit.getQueueOffset() + cqUnit.getBatchNum();
-
-                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
-                                if (offsetPy < nextPhyFileStartOffset) {
-                                    continue;
-                                }
-                            }
-
-                            if (messageFilter != null
-                                    && !messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), cqUnit.getCqExtUnit())) {
-                                if (getResult.getBufferTotalSize() == 0) {
-                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
-                                }
-
-                                continue;
-                            }
-
-                            SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
-                            if (null == selectResult) {
-                                if (getResult.getBufferTotalSize() == 0) {
-                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
-                                }
-
-                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
-                                continue;
-                            }
-
-                            if (messageFilter != null
-                                    && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
-                                if (getResult.getBufferTotalSize() == 0) {
-                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
-                                }
-                                // release...
-                                selectResult.release();
-                                continue;
-                            }
-
-                            this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
-                            getResult.addMessage(selectResult, cqUnit.getQueueOffset(), cqUnit.getBatchNum());
-                            status = GetMessageStatus.FOUND;
-                            nextPhyFileStartOffset = Long.MIN_VALUE;
-                        }
-                    } finally {
-                        bufferConsumeQueue.release();
-                    }
-                }
-
-                if (diskFallRecorded) {
-                    long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
-                    brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
-                }
-
-                long diff = maxOffsetPy - maxPhyOffsetPulling;
-                long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
-                        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
-                getResult.setSuggestPullingFromSlave(diff > memory);
-            }
-        } else {
-            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
-            nextBeginOffset = nextOffsetCorrection(offset, 0);
-        }
-
-        if (GetMessageStatus.FOUND == status) {
-            this.storeStatsService.getGetMessageTimesTotalFound().add(1);
-        } else {
-            this.storeStatsService.getGetMessageTimesTotalMiss().add(1);
-        }
-        long elapsedTime = this.getSystemClock().now() - beginTime;
-        this.storeStatsService.setGetMessageEntireTimeMax(elapsedTime);
-
-        getResult.setStatus(status);
-        getResult.setNextBeginOffset(nextBeginOffset);
-        getResult.setMaxOffset(maxOffset);
-        getResult.setMinOffset(minOffset);
-        return getResult;
-    }
-
-    /**
-     * Returns the max offset reported by the consume queue of the given topic/queue.
-     *
-     * @return the queue's max offset, or 0 when no consume queue is found
-     */
-    @Override
-    public long getMaxOffsetInQueue(String topic, int queueId) {
-        final ConsumeQueueInterface cq = this.getConsumeQueue(topic, queueId);
-        return cq == null ? 0 : cq.getMaxOffsetInQueue();
-    }
-
-    /**
-     * Returns the max offset of a queue from one of two sources.
-     *
-     * @param committed when true the value is read from the consume queue; when false it is
-     *                  read from {@code batchTopicQueueTable} under the key {@code topic-queueId}
-     * @return the offset, or 0 when the selected source has nothing for this queue
-     */
-    @Override
-    public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
-        if (!committed) {
-            final Long tableOffset = this.batchTopicQueueTable.get(topic + "-" + queueId);
-            return tableOffset == null ? 0 : tableOffset;
-        }
-
-        final ConsumeQueueInterface cq = this.getConsumeQueue(topic, queueId);
-        return cq == null ? 0 : cq.getMaxOffsetInQueue();
-    }
-
-    /**
-     * Returns the min offset reported by the consume queue of the given topic/queue.
-     *
-     * @return the queue's min offset, or -1 when no consume queue is found
-     */
-    @Override
-    public long getMinOffsetInQueue(String topic, int queueId) {
-        final ConsumeQueueInterface cq = this.getConsumeQueue(topic, queueId);
-        return cq == null ? -1 : cq.getMinOffsetInQueue();
-    }
-
-    /**
-     * Looks up the commit-log position of the first unit found when iterating the consume
-     * queue from {@code consumeQueueOffset}.
-     *
-     * @return the unit's position, or 0 when the queue, the iterator or the unit is missing
-     */
-    @Override
-    public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
-        final ConsumeQueueInterface cq = getConsumeQueue(topic, queueId);
-        if (cq == null) {
-            return 0;
-        }
-
-        final ReferredIterator<CqUnit> units = cq.iterateFrom(consumeQueueOffset);
-        if (units == null) {
-            return 0;
-        }
-
-        try {
-            // Only the first unit is of interest; the iterator is released in every case.
-            return units.hasNext() ? units.next().getPos() : 0;
-        } finally {
-            units.release();
-        }
-    }
-
-    /**
-     * Finds the queue offset for a timestamp and clamps it into the queue's [min, max] range.
-     *
-     * @return the clamped offset, or -1 when the consume queue is missing or the queue reports
-     *         -1 (no message found)
-     */
-    @Override
-    public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
-        final ConsumeQueueInterface cq = getConsumeQueue(topic, queueId);
-        if (cq == null) {
-            // No consume queue means there is no message in this queue.
-            return -1;
-        }
-
-        final long found = cq.getOffsetInQueueByTime(timestamp);
-        if (found == -1) {
-            // -1 means no msg found.
-            return -1;
-        }
-
-        // Keep the answer inside the valid range: raise to min first, then cap at max.
-        final long notBelowMin = Math.max(found, cq.getMinOffsetInQueue());
-        return Math.min(notBelowMin, cq.getMaxOffsetInQueue());
-    }
-
-    /**
-     * Decodes the message stored at {@code commitLogOffset}, reading its size from the
-     * first four bytes at that offset.
-     *
-     * @return the decoded message, or null when the commit log returns nothing for the offset
-     */
-    @Override
-    public MessageExt lookMessageByOffset(long commitLogOffset) {
-        final SelectMappedBufferResult sizeProbe = this.commitLog.getMessage(commitLogOffset, 4);
-        if (sizeProbe == null) {
-            return null;
-        }
-
-        try {
-            // The leading int is TOTALSIZE.
-            final int totalSize = sizeProbe.getByteBuffer().getInt();
-            return lookMessageByOffset(commitLogOffset, totalSize);
-        } finally {
-            sizeProbe.release();
-        }
-    }
-
-    /**
-     * Selects the raw buffer of the message stored at {@code commitLogOffset}, reading its
-     * size from the first four bytes at that offset.
-     *
-     * @return the buffer covering the whole message, or null when the size probe finds nothing
-     */
-    @Override
-    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
-        final SelectMappedBufferResult sizeProbe = this.commitLog.getMessage(commitLogOffset, 4);
-        if (sizeProbe == null) {
-            return null;
-        }
-
-        try {
-            // The leading int is TOTALSIZE.
-            final int totalSize = sizeProbe.getByteBuffer().getInt();
-            return this.commitLog.getMessage(commitLogOffset, totalSize);
-        } finally {
-            sizeProbe.release();
-        }
-    }
-
-    /**
-     * Selects {@code msgSize} bytes at {@code commitLogOffset} by delegating to the commit log.
-     */
-    @Override
-    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
-        return this.commitLog.getMessage(commitLogOffset, msgSize);
-    }
-
-    /**
-     * Returns the string form of the store statistics service.
-     */
-    @Override
-    public String getRunningDataInfo() {
-        return this.storeStatsService.toString();
-    }
-
-    /**
-     * Returns the commit-log store path taken from the message store configuration.
-     */
-    private String getStorePathPhysic() {
-        return StreamMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
-    }
-
-    /**
-     * Collects runtime statistics: the stats-service entries, disk usage ratios of the
-     * commit-log and consume-queue paths, schedule-service stats (when that service exists)
-     * and the commit log's min/max physical offsets.
-     *
-     * @return the map obtained from the stats service, with the extra entries added to it
-     */
-    @Override
-    public HashMap<String, String> getRuntimeInfo() {
-        final HashMap<String, String> info = this.storeStatsService.getRuntimeInfo();
-
-        // Disk usage of the partition holding the commit log.
-        final String commitLogPath = this.getMessageStoreConfig().getStorePathCommitLog();
-        final double commitLogUsage = UtilAll.getDiskPartitionSpaceUsedPercent(commitLogPath);
-        info.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(commitLogUsage));
-
-        // Disk usage of the partition holding the consume queues.
-        final String consumeQueuePath = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
-        final double consumeQueueUsage = UtilAll.getDiskPartitionSpaceUsedPercent(consumeQueuePath);
-        info.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(consumeQueueUsage));
-
-        if (this.scheduleMessageService != null) {
-            this.scheduleMessageService.buildRunningStats(info);
-        }
-
-        info.put(RunningStats.commitLogMinOffset.name(), String.valueOf(this.getMinPhyOffset()));
-        info.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(this.getMaxPhyOffset()));
-
-        return info;
-    }
-
-    /**
-     * Returns the max offset reported by the commit log.
-     */
-    @Override
-    public long getMaxPhyOffset() {
-        return this.commitLog.getMaxOffset();
-    }
-
-    /**
-     * Returns the min offset reported by the commit log.
-     */
-    @Override
-    public long getMinPhyOffset() {
-        return this.commitLog.getMinOffset();
-    }
-
-    /**
-     * Returns the store time of the earliest unit of the given topic/queue.
-     *
-     * @return the store time, or -1 when the consume queue is missing or the time
-     *         cannot be resolved
-     */
-    @Override
-    public long getEarliestMessageTime(String topic, int queueId) {
-        final ConsumeQueueInterface cq = this.getConsumeQueue(topic, queueId);
-        if (cq == null) {
-            return -1;
-        }
-        return getStoreTime(cq.getEarliestUnit());
-    }
-
-    /**
-     * Resolves the store timestamp of the message a consume-queue unit points to.
-     *
-     * @param result the consume-queue unit; may be null
-     * @return the timestamp picked up from the commit log, or -1 when {@code result} is null
-     *         or the lookup throws
-     */
-    private long getStoreTime(CqUnit result) {
-        if (result == null) {
-            return -1;
-        }
-
-        try {
-            final long phyOffset = result.getPos();
-            final int size = result.getSize();
-            return this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
-        } catch (Exception e) {
-            // Still best effort (callers get -1), but the failure is no longer hidden.
-            log.warn("getStoreTime failed to pick up store timestamp from commit log", e);
-        }
-        return -1;
-    }
-
-    /**
-     * Picks up the store timestamp at the commit log's min physical offset, using twice the
-     * configured max message size as the read size.
-     */
-    @Override
-    public long getEarliestMessageTime() {
-        final int readSize = this.messageStoreConfig.getMaxMessageSize() * 2;
-        return this.getCommitLog().pickupStoreTimestamp(this.getMinPhyOffset(), readSize);
-    }
-
-    /**
-     * Returns the store time of the unit at {@code consumeQueueOffset} in the given queue.
-     *
-     * @return the store time, or -1 when the consume queue is missing or the time
-     *         cannot be resolved
-     */
-    @Override
-    public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
-        final ConsumeQueueInterface cq = this.getConsumeQueue(topic, queueId);
-        if (cq == null) {
-            return -1;
-        }
-        return getStoreTime(cq.get(consumeQueueOffset));
-    }
-
-    /**
-     * Returns the message total reported by the consume queue of the given topic/queue.
-     *
-     * @return the total, or -1 when no consume queue is found
-     */
-    @Override
-    public long getMessageTotalInQueue(String topic, int queueId) {
-        final ConsumeQueueInterface cq = this.getConsumeQueue(topic, queueId);
-        return cq == null ? -1 : cq.getMessageTotalInQueue();
-    }
-
-    /**
-     * Reads commit-log data starting at {@code offset}.
-     *
-     * @return the data from the commit log, or null (with a warning logged) when the store
-     *         has been shut down
-     */
-    @Override
-    public SelectMappedBufferResult getCommitLogData(final long offset) {
-        if (!this.shutdown) {
-            return this.commitLog.getData(offset);
-        }
-
-        log.warn("message store has shutdown, so getPhyQueueData is forbidden");
-        return null;
-    }
-
-    /**
-     * Appends {@code dataLength} bytes of {@code data}, starting at {@code dataStart}, to the
-     * commit log at {@code startOffset}, and wakes up the reput service on success.
-     *
-     * @return true when the commit log accepted the data; false when the store is shut down
-     *         or the append failed
-     */
-    @Override
-    public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
-        if (this.shutdown) {
-            log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
-            return false;
-        }
-
-        boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength);
-        if (result) {
-            this.reputMessageService.wakeup();
-        } else {
-            // Report the number of bytes that was actually being appended, not the
-            // capacity of the backing array.
-            log.error("appendToPhyQueue failed " + startOffset + " " + dataLength);
-        }
-
-        return result;
-    }
-
-    /**
-     * Forwards a manual delete request to the commit-log clean service.
-     */
-    @Override
-    public void executeDeleteFilesManually() {
-        this.cleanCommitLogService.excuteDeleteFilesManualy();
-    }
-
-    /**
-     * Queries messages by topic and key through the index service.
-     *
-     * <p>Up to three index look-ups are made. After each round the loop stops when at least
-     * one message buffer was collected or when the store time of the first message of the
-     * round is before {@code begin}; otherwise that store time becomes the upper time bound
-     * of the next look-up. The key is not re-checked against the decoded message.
-     *
-     * @param topic  topic to search
-     * @param key    message key to search
-     * @param maxNum max number of offsets requested from the index
-     * @param begin  lower time bound passed to the index
-     * @param end    upper time bound passed to the first index look-up
-     * @return the collected message buffers together with the index's last-update markers
-     */
-    @Override
-    public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
-        QueryMessageResult queryMessageResult = new QueryMessageResult();
-
-        long lastQueryMsgTime = end;
-
-        for (int i = 0; i < 3; i++) {
-            QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
-            if (queryOffsetResult.getPhyOffsets().isEmpty()) {
-                break;
-            }
-
-            Collections.sort(queryOffsetResult.getPhyOffsets());
-
-            queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
-            queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
-
-            for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
-                long offset = queryOffsetResult.getPhyOffsets().get(m);
-
-                try {
-                    MessageExt msg = this.lookMessageByOffset(offset);
-                    if (msg == null) {
-                        // The indexed offset no longer resolves to a message; skip it
-                        // explicitly instead of relying on a NullPointerException.
-                        log.warn("queryMessage: no message found at commitlog offset {}, topic={} key={}", offset, topic, key);
-                        continue;
-                    }
-
-                    if (0 == m) {
-                        lastQueryMsgTime = msg.getStoreTimestamp();
-                    }
-
-                    SelectMappedBufferResult result = this.commitLog.getData(offset, false);
-                    if (result != null) {
-                        // Trim the returned buffer to exactly one message (leading int is its size).
-                        int size = result.getByteBuffer().getInt(0);
-                        result.getByteBuffer().limit(size);
-                        result.setSize(size);
-                        queryMessageResult.addMessage(result);
-                    }
-                } catch (Exception e) {
-                    log.error("queryMessage exception", e);
-                }
-            }
-
-            if (queryMessageResult.getBufferTotalSize() > 0) {
-                break;
-            }
-
-            if (lastQueryMsgTime < begin) {
-                break;
-            }
-        }
-
-        return queryMessageResult;
-    }
-
-    /**
-     * Passes the new master address on to the HA service.
-     */
-    @Override
-    public void updateHaMasterAddress(String newAddr) {
-        this.haService.updateMasterAddress(newAddr);
-    }
-
-    /**
-     * Returns the commit log's max offset minus the value of the HA service's
-     * {@code getPush2SlaveMaxOffset()} counter.
-     */
-    @Override
-    public long slaveFallBehindMuch() {
-        return this.commitLog.getMaxOffset() - this.haService.getPush2SlaveMaxOffset().get();
-    }
-
-    /**
-     * Returns the current time as reported by the store's system clock.
-     */
-    @Override
-    public long now() {
-        return this.systemClock.now();
-    }
-
-    /**
-     * Destroys the consume queues of every topic that is not contained in {@code topics},
-     * except for the system schedule topic, and removes those topics from the table.
-     *
-     * @param topics the topics to keep
-     * @return always 0
-     */
-    @Override
-    public int cleanUnusedTopic(Set<String> topics) {
-        final Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> topicIt = consumeQueueTable.entrySet().iterator();
-        while (topicIt.hasNext()) {
-            final Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry = topicIt.next();
-            final String topic = topicEntry.getKey();
-
-            // Topics still in use and the schedule topic are left untouched.
-            if (topics.contains(topic) || topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
-                continue;
-            }
-
-            for (ConsumeQueueInterface cq : topicEntry.getValue().values()) {
-                this.consumeQueueStore.destroy(cq);
-                log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
-                        cq.getTopic(),
-                        cq.getQueueId()
-                );
-
-                this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
-            }
-            topicIt.remove();
-            log.info("cleanUnusedTopic: {},topic consumeQueue destroyed", topic);
-        }
-        return 0;
-    }
-
-    /**
-     * Destroys every consume queue whose max physical offset lies below the commit log's
-     * current min offset, and drops topics whose queue table becomes empty. The system
-     * schedule topic is skipped. A queue reporting -1 as max physical offset is only logged.
-     */
-    @Override
-    public void cleanExpiredConsumerQueue() {
-        final long minCommitLogOffset = this.commitLog.getMinOffset();
-
-        final Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> topicIt = this.consumeQueueTable.entrySet().iterator();
-        while (topicIt.hasNext()) {
-            final Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry = topicIt.next();
-            final String topic = topicEntry.getKey();
-            if (topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
-                continue;
-            }
-
-            final ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = topicEntry.getValue();
-            final Iterator<Map.Entry<Integer, ConsumeQueueInterface>> queueIt = queueTable.entrySet().iterator();
-            while (queueIt.hasNext()) {
-                final Map.Entry<Integer, ConsumeQueueInterface> queueEntry = queueIt.next();
-                final ConsumeQueueInterface cq = queueEntry.getValue();
-                final long maxCLOffsetInConsumeQueue = cq.getMaxPhysicOffset();
-
-                if (maxCLOffsetInConsumeQueue == -1) {
-                    log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
-                            cq.getTopic(),
-                            cq.getQueueId(),
-                            cq.getMaxPhysicOffset(),
-                            cq.getMinLogicOffset());
-                    continue;
-                }
-
-                if (maxCLOffsetInConsumeQueue >= minCommitLogOffset) {
-                    // Still references data that exists in the commit log.
-                    continue;
-                }
-
-                log.info(
-                        "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
-                        topic,
-                        queueEntry.getKey(),
-                        minCommitLogOffset,
-                        maxCLOffsetInConsumeQueue);
-
-                this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
-
-                this.consumeQueueStore.destroy(cq);
-                queueIt.remove();
-            }
-
-            if (queueTable.isEmpty()) {
-                log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
-                topicIt.remove();
-            }
-        }
-    }
-
-    /**
-     * Returns the disk-space warning level ratio held by the commit-log clean service.
-     */
-    public double getDiskSpaceWarningLevelRatio() {
-        return cleanCommitLogService.getDiskSpaceWarningLevelRatio();
-    }
-
-    /**
-     * Tells whether the message referenced by {@code consumeOffset} in the given queue is
-     * classified as "in disk" by {@code checkInDiskByCommitOffset}.
-     *
-     * @return the classification, or false when the consume queue or the unit is missing
-     */
-    @Override
-    public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) {
-        final long commitLogMaxOffset = this.commitLog.getMaxOffset();
-
-        final ConsumeQueueInterface cq = getConsumeQueue(topic, queueId);
-        if (cq == null) {
-            return false;
-        }
-
-        final CqUnit unit = cq.get(consumeOffset);
-        if (unit == null) {
-            return false;
-        }
-
-        return checkInDiskByCommitOffset(unit.getPos(), commitLogMaxOffset);
-    }
-
-    /**
-     * Returns the value of {@code behind()} from the reput message service.
-     */
-    @Override
-    public long dispatchBehindBytes() {
-        return this.reputMessageService.behind();
-    }
-
-    /**
-     * Flushes the commit log and returns the value its {@code flush()} reports.
-     */
-    @Override
-    public long flush() {
-        return this.commitLog.flush();
-    }
-
-    /**
-     * Asks the commit log to reset its offset to {@code phyOffset} and returns its answer.
-     */
-    @Override
-    public boolean resetWriteOffset(long phyOffset) {
-        return this.commitLog.resetOffset(phyOffset);
-    }
-
-    /**
-     * Returns the confirm offset held by the commit log.
-     */
-    @Override
-    public long getConfirmOffset() {
-        return this.commitLog.getConfirmOffset();
-    }
-
-    /**
-     * Stores {@code phyOffset} as the commit log's confirm offset.
-     */
-    @Override
-    public void setConfirmOffset(long phyOffset) {
-        this.commitLog.setConfirmOffset(phyOffset);
-    }
-
-    /**
-     * Decodes the message of {@code size} bytes stored at {@code commitLogOffset}.
-     *
-     * @return the decoded message, or null when the commit log returns nothing for the range
-     */
-    @Override
-    public MessageExt lookMessageByOffset(long commitLogOffset, int size) {
-        final SelectMappedBufferResult selected = this.commitLog.getMessage(commitLogOffset, size);
-        if (selected == null) {
-            return null;
-        }
-
-        try {
-            return MessageDecoder.decode(selected.getByteBuffer(), true, false);
-        } finally {
-            // The buffer reference is released whether or not decoding succeeds.
-            selected.release();
-        }
-    }
-
-    private long nextOffsetCorrection(long oldOffset, long newOffset) {
-        long nextOffset = oldOffset;
-        if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) {
-            nextOffset = newOffset;
-        }
-        return nextOffset;
-    }
-
-    private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
-        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
-        return (maxOffsetPy - offsetPy) > memory;
-    }
-
-    private boolean isTheBatchFull(int sizePy, int unitBatchNum, int maxMsgNums, long maxMsgSize, int bufferTotal,
-                                   int messageTotal, boolean isInDisk) {
-
-        //At least has one message(batch)
-        if (0 == bufferTotal || 0 == messageTotal) {
-            return false;
-        }
-
-        if (messageTotal + unitBatchNum > maxMsgNums) {
-            return true;
-        }
-
-        if (bufferTotal + sizePy > maxMsgSize) {
-            return true;
-        }
-
-        if (isInDisk) {
-            if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
-                return true;
-            }
-            if (messageTotal + unitBatchNum > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
-                return true;
-            }
-        } else {
-            if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
-                return true;
-            }
-
-            if (messageTotal + unitBatchNum > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    private void deleteFile(final String fileName) {
-        File file = new File(fileName);
-        boolean result = file.delete();
-        log.info(fileName + (result ? " delete OK" : " delete Failed"));
-    }
-
-    /**
-     * @throws IOException
-     */
-    private void createTempFile() throws IOException {
-        String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
-        File file = new File(fileName);
-        DefaultMappedFile.ensureDirOK(file.getParent());
-        boolean result = file.createNewFile();
-        log.info(fileName + (result ? " create OK" : " already exists"));
-    }
-
-    /**
-     * Registers the store's periodic background tasks on {@code scheduledExecutorService}:
-     * <ul>
-     *   <li>file cleaning: first run after 60 s, then every {@code getCleanResourceInterval()} ms;</li>
-     *   <li>self check: first run after 1 min, then every 10 min;</li>
-     *   <li>mapped-file swap (only when swap is enabled): after 1 min, then every 5 min;</li>
-     *   <li>cleaning of swapped maps: after 1 min, then every 5 min;</li>
-     *   <li>lock debugging (only when debug lock is enabled): every second.</li>
-     * </ul>
-     */
-    private void addScheduleTask() {
-
-        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                StreamMessageStore.this.cleanFilesPeriodically();
-            }
-        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
-
-        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                StreamMessageStore.this.checkSelf();
-            }
-        }, 1, 10, TimeUnit.MINUTES);
-
-        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    if (!getMessageStoreConfig().isMappedFileSwapEnable()) {
-                        log.warn("Swap is not enabled.");
-                        return;
-                    }
-                    StreamMessageStore.this.commitLog.swapMap(getMessageStoreConfig().getCommitLogSwapMapReserveFileNum(),
-                            getMessageStoreConfig().getCommitLogForceSwapMapInterval(), getMessageStoreConfig().getCommitLogSwapMapInterval());
-                    for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : StreamMessageStore.this.consumeQueueTable.values()) {
-                        for (ConsumeQueueInterface logic : maps.values()) {
-                            StreamMessageStore.this.consumeQueueStore.swapMap(logic, getMessageStoreConfig().getLogicQueueSwapMapReserveFileNum(),
-                                    getMessageStoreConfig().getLogicQueueForceSwapMapInterval(), getMessageStoreConfig().getLogicQueueSwapMapInterval());
-                        }
-                    }
-                } catch (Exception e) {
-                    // Caught so that a failure does not cancel the periodic task.
-                    log.error("swap map exception", e);
-                }
-            }
-        }, 1, 5, TimeUnit.MINUTES);
-
-        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    StreamMessageStore.this.commitLog.cleanSwappedMap(getMessageStoreConfig().getCleanSwapedMapInterval());
-                    for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : StreamMessageStore.this.consumeQueueTable.values()) {
-                        for (ConsumeQueueInterface logic : maps.values()) {
-                            StreamMessageStore.this.consumeQueueStore.cleanSwappedMap(logic, getMessageStoreConfig().getCleanSwapedMapInterval());
-                        }
-                    }
-                } catch (Exception e) {
-                    log.error("clean swap map exception", e);
-                }
-            }
-        }, 1, 5, TimeUnit.MINUTES);
-
-        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                if (StreamMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
-                    try {
-                        if (StreamMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
-                            long lockTime = System.currentTimeMillis() - StreamMessageStore.this.commitLog.getBeginTimeInLock();
-                            // Dump thread stacks when the commit-log lock has been held for more than a second.
-                            if (lockTime > 1000 && lockTime < 10000000) {
-
-                                String stack = UtilAll.jstack();
-                                final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
-                                        + StreamMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
-                                MixAll.string2FileNotSafe(stack, fileName);
-                            }
-                        }
-                    } catch (Exception e) {
-                        // Keep the task alive, but do not hide why the stack dump failed.
-                        log.warn("debug lock stack dump exception", e);
-                    }
-                }
-            }
-        }, 1, 1, TimeUnit.SECONDS);
-
-        // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-        // @Override
-        // public void run() {
-        // StreamMessageStore.this.cleanExpiredConsumerQueue();
-        // }
-        // }, 1, 1, TimeUnit.HOURS);
-    }
-
-    /**
-     * Runs, in this order, the commit-log clean service, the consume-queue clean service and
-     * the logic-offset correction service on the calling thread.
-     */
-    private void cleanFilesPeriodically() {
-        this.cleanCommitLogService.run();
-        this.cleanConsumeQueueService.run();
-        this.correctLogicOffsetService.run();
-    }
-
-    /**
-     * Runs the self check of the commit log and then of every registered consume queue.
-     */
-    private void checkSelf() {
-        this.commitLog.checkSelf();
-
-        for (ConcurrentMap<Integer, ConsumeQueueInterface> queueTable : this.consumeQueueTable.values()) {
-            for (ConsumeQueueInterface cq : queueTable.values()) {
-                this.consumeQueueStore.checkSelf(cq);
-            }
-        }
-    }
-
-    private boolean isTempFileExist() {
-        String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
-        File file = new File(fileName);
-        return file.exists();
-    }
-
-    protected boolean loadBatchConsumeQueue() {
-        checkOtherConsumeQueue();
-
-        File dirLogic = new File(StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
-        File[] fileTopicList = dirLogic.listFiles();
-        if (fileTopicList != null) {
-
-            for (File fileTopic : fileTopicList) {
-                String topic = fileTopic.getName();
-
-                File[] fileQueueIdList = fileTopic.listFiles();
-                if (fileQueueIdList != null) {
-                    for (File fileQueueId : fileQueueIdList) {
-                        int queueId;
-                        try {
-                            queueId = Integer.parseInt(fileQueueId.getName());
-                        } catch (NumberFormatException e) {
-                            continue;
-                        }
-                        ConsumeQueueInterface logic = new BatchConsumeQueue(
-                                topic,
-                                queueId,
-                                StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
-                                this.getMessageStoreConfig().getMapperFileSizeBatchConsumeQueue(),
-                                this);
-                        this.putConsumeQueue(topic, queueId, logic);
-                        if (!this.consumeQueueStore.load(logic)) {
-                            return false;
-                        }
-                    }
-                }
-            }
-        }
-
-        log.info("load logics queue all over, OK");
-
-        return true;
-    }
-
-    private void checkOtherConsumeQueue() {
-        File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
-        if (dirLogic.exists()) {
-            throw new RuntimeException(format("Consume queue directory: [%s] exist. Can not load batch consume queue while consume queue exists.",
-                    StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir())));
-        }
-    }
-
-    private void recover(final boolean lastExitOK) {
-        long recoverCqStart = System.currentTimeMillis();
-        long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
-        long recoverCqEnd = System.currentTimeMillis();
-
-        if (lastExitOK) {
-            this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
-        } else {
-            this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
-        }
-        long recoverClogEnd = System.currentTimeMillis();
-        this.recoverTopicQueueTable();
-        long recoverOffsetEnd = System.currentTimeMillis();
-
-        log.info("Recover end total:{} recoverCq:{} recoverClog:{} recoverOffset:{}",
-                recoverOffsetEnd - recoverCqStart, recoverCqEnd - recoverCqStart, recoverClogEnd - recoverCqEnd, recoverOffsetEnd - recoverClogEnd);
-    }
-
-    /**
-     * Returns the configuration object this store was set up with.
-     */
-    @Override
-    public MessageStoreConfig getMessageStoreConfig() {
-        return messageStoreConfig;
-    }
-
-    /**
-     * Returns the transient store pool held by this store.
-     */
-    @Override
-    public TransientStorePool getTransientStorePool() {
-        return transientStorePool;
-    }
-
-    /**
-     * Registers a consume queue under its topic and queue id, creating the per-topic map on
-     * first use.
-     *
-     * <p>The per-topic map is installed with {@code putIfAbsent}, so when two threads register
-     * queues of the same new topic at the same time neither registration is lost.
-     */
-    private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {
-        ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> map = this.consumeQueueTable.get(topic);
-        if (null == map) {
-            // Fill the fresh map before publishing it, so readers never see it empty.
-            ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface> newMap = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueueInterface>();
-            newMap.put(queueId, consumeQueue);
-            map = this.consumeQueueTable.putIfAbsent(topic, newMap);
-            if (null == map) {
-                // Our map won the race and already contains the queue.
-                return;
-            }
-        }
-        map.put(queueId, consumeQueue);
-    }
-
-    /**
-     * Recovers every registered consume queue.
-     *
-     * @return the largest max physical offset seen across all queues, or -1 when there are
-     *         no queues or none reports a larger value
-     */
-    private long recoverConsumeQueue() {
-        long largestPhysicOffset = -1;
-        for (ConcurrentMap<Integer, ConsumeQueueInterface> queueTable : this.consumeQueueTable.values()) {
-            for (ConsumeQueueInterface cq : queueTable.values()) {
-                this.consumeQueueStore.recover(cq);
-                largestPhysicOffset = Math.max(largestPhysicOffset, cq.getMaxPhysicOffset());
-            }
-        }
-
-        return largestPhysicOffset;
-    }
-
-    /**
-     * Rebuilds the batch topic-queue offset table from the consume queues: each queue contributes
-     * its {@code getMaxOffsetInQueue()} under the key {@code topic-queueId}. While iterating, the
-     * minimum offset of every queue is also corrected against the commit log's minimum offset.
-     */
-    public void recoverTopicQueueTable() {
-        final long commitLogMinOffset = this.commitLog.getMinOffset();
-        final HashMap<String/* topic-queueid */, Long/* offset */> rebuilt = new HashMap<String, Long>(1024);
-
-        for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : this.consumeQueueTable.values()) {
-            for (ConsumeQueueInterface cq : queuesOfTopic.values()) {
-                rebuilt.put(cq.getTopic() + "-" + cq.getQueueId(), cq.getMaxOffsetInQueue());
-                this.consumeQueueStore.correctMinOffset(cq, commitLogMinOffset);
-            }
-        }
-
-        this.batchTopicQueueTable = rebuilt;
-    }
-
-    /** Returns the {@code allocateMappedFileService} field of this store. */
-    @Override
-    public AllocateMappedFileService getAllocateMappedFileService() {
-        return this.allocateMappedFileService;
-    }
-
-    /**
-     * Asks the consume queue store to truncate dirty logic files of every known consume queue.
-     *
-     * @param phyOffset physical offset handed unchanged to {@code consumeQueueStore.truncateDirtyLogicFiles}
-     */
-    @Override
-    public void truncateDirtyLogicFiles(long phyOffset) {
-        for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : StreamMessageStore.this.consumeQueueTable.values()) {
-            for (ConsumeQueueInterface cq : queuesOfTopic.values()) {
-                this.consumeQueueStore.truncateDirtyLogicFiles(cq, phyOffset);
-            }
-        }
-    }
-
-    /** Returns the {@code storeStatsService} field of this store. */
-    @Override
-    public StoreStatsService getStoreStatsService() {
-        return this.storeStatsService;
-    }
-
-    /** Returns the live topic -&gt; (queueId -&gt; consume queue) table; no copy is made. */
-    public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> getConsumeQueueTable() {
-        return this.consumeQueueTable;
-    }
-
-    /** Returns the {@code storeCheckpoint} field of this store. */
-    @Override
-    public StoreCheckpoint getStoreCheckpoint() {
-        return this.storeCheckpoint;
-    }
-
-    /** Returns the {@code haService} field of this store. */
-    @Override
-    public HAService getHaService() {
-        return this.haService;
-    }
-
-    /**
-     * No-op in this store: the supplied hook is neither stored nor invoked here.
-     *
-     * @param logicalQueueCleanHook ignored
-     */
-    @Override
-    public void registerCleanFileHook(CleanFilesHook logicalQueueCleanHook) {
-    }
-
-    /** Returns the {@code scheduleMessageService} field of this store. */
-    @Override
-    public ScheduleMessageService getScheduleMessageService() {
-        return this.scheduleMessageService;
-    }
-
-    /** Returns the {@code runningFlags} field of this store. */
-    @Override
-    public RunningFlags getRunningFlags() {
-        return this.runningFlags;
-    }
-
-    /**
-     * Creates the dispatch executors and their bounded work queues.
-     *
-     * @param tsNum    number of executors to create; values of 0 or less are treated as 1
-     * @param cacheNum capacity of each executor's queue; values below 512 are raised to 512
-     * @throws RuntimeException if any created executor is not configured with exactly one thread
-     */
-    public void initAsyncReputThreads(int tsNum, int cacheNum) {
-        final int executorCount = tsNum <= 0 ? 1 : tsNum;
-        final int queueCapacity = Math.max(cacheNum, 512);
-
-        reputExecutors = new ThreadPoolExecutor[executorCount];
-        reputQueues = new BlockingQueue[executorCount];
-
-        for (int idx = 0; idx < executorCount; idx++) {
-            final String threadName = "MQDispatchThread-" + idx;
-            final ThreadFactory namedFactory = r -> new Thread(r, threadName);
-            reputQueues[idx] = new LinkedBlockingDeque<>(queueCapacity);
-            //Each executor can only have one thread, otherwise the cq index will get wrong
-            reputExecutors[idx] = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
-                    reputQueues[idx], namedFactory);
-        }
-
-        // Sanity check of the single-thread constraint stated above.
-        for (ThreadPoolExecutor executor : reputExecutors) {
-            final boolean singleThreaded = executor.getMaximumPoolSize() == 1 && executor.getCorePoolSize() == 1;
-            if (!singleThreaded) {
-                throw new RuntimeException("The MQDispatchThreadPoll can only have one thread");
-            }
-        }
-    }
-
-    /**
-     * Dispatches {@code request} synchronously on the calling thread.
-     *
-     * @param request the request to hand to every registered dispatcher
-     * @return the result of {@code doDispatch(request, false)}
-     */
-    public Future doDispatch(final DispatchRequest request) {
-        final boolean async = false;
-        return doDispatch(request, async);
-    }
-
-    /**
-     * Runs every registered {@link CommitLogDispatcher} on {@code request}, either inline or on
-     * one of the single-threaded dispatch executors.
-     *
-     * <p>In async mode the executor is chosen from the hash of {@code topic + queueId}, so all
-     * requests of one queue go to the same executor. If the executor rejects the task, submission
-     * is retried with a 1 ms pause between attempts.
-     *
-     * @param request the request to dispatch
-     * @param async   {@code false} to run on the calling thread, {@code true} to submit to an executor
-     * @return {@code EMPTY_FUTURE} for synchronous dispatch, otherwise the future of the submitted task
-     */
-    public Future doDispatch(final DispatchRequest request, boolean async) {
-        Runnable task = new Runnable() {
-            @Override
-            public void run() {
-                for (CommitLogDispatcher dispatcher : StreamMessageStore.this.dispatcherList) {
-                    dispatcher.dispatch(request);
-                }
-            }
-        };
-        if (!async) {
-            task.run();
-            return EMPTY_FUTURE;
-        }
-        // Math.abs(Integer.MIN_VALUE) is still negative, which would produce a negative slot and an
-        // ArrayIndexOutOfBoundsException; map that single value to 0 and keep every other mapping as before.
-        int rawHash = (request.getTopic() + request.getQueueId()).hashCode();
-        int hash = rawHash == Integer.MIN_VALUE ? 0 : Math.abs(rawHash);
-        int slot = hash % reputExecutors.length;
-        try {
-            return reputExecutors[slot].submit(task);
-        } catch (RejectedExecutionException ignored) {
-            int tryNum = 0;
-            while (tryNum++ < Integer.MAX_VALUE) {
-                try {
-                    Thread.sleep(1);
-                } catch (Throwable ignored2) {
-                    // Deliberately swallowed: the retry loop keeps going regardless of interruption.
-                }
-                try {
-                    return reputExecutors[slot].submit(task);
-                } catch (RejectedExecutionException e) {
-                    log.warn("DispatchReject topic:{} queue:{} pyOffset:{} tryNum:{}", request.getTopic(), request.getQueueId(), request.getCommitLogOffset(), tryNum,  e);
-                }
-            }
-        }
-        return EMPTY_FUTURE;
-    }
-
-    /**
-     * Processes {@code request} through the {@link SyncReputMessageService}. When
-     * {@code isDispatchFromSenderThread} is false nothing is processed and an error is logged.
-     *
-     * @param request   the request to process
-     * @param isRecover {@code true} to use the recover variant of the processing call
-     * @throws InterruptedException declared for callers; propagated from the reput service if raised
-     */
-    public void syncProcessDispatchRequest(DispatchRequest request, boolean isRecover) throws InterruptedException {
-        if (!isDispatchFromSenderThread) {
-            log.error("addDispatchRequestQueue operation not supported while isCreateDispatchRequestAsync is true");
-            return;
-        }
-
-        final SyncReputMessageService syncReput = (SyncReputMessageService) this.reputMessageService;
-        if (isRecover) {
-            syncReput.processDispatchRequestForRecover(request);
-        } else {
-            syncReput.processDispatchRequest(request);
-        }
-    }
-
-    /**
-     * Delegates to {@code reputMessageService.dispatched}.
-     *
-     * @param physicalOffset offset passed through to the reput service
-     * @return whatever the reput service reports for that offset
-     */
-    public boolean dispatched(long physicalOffset) {
-        return this.reputMessageService.dispatched(physicalOffset);
-    }
-
-    /**
-     * Looks up (or creates) the consume queue addressed by the request's topic and queue id and
-     * hands the request to {@code consumeQueueStore.putMessagePositionInfoWrapper}.
-     *
-     * @param dispatchRequest request carrying the topic, queue id and position data
-     */
-    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
-        final String topic = dispatchRequest.getTopic();
-        final int queueId = dispatchRequest.getQueueId();
-        final ConsumeQueueInterface target = this.findOrCreateConsumeQueue(topic, queueId);
-        this.consumeQueueStore.putMessagePositionInfoWrapper(target, dispatchRequest);
-    }
-
-    /** Delegates to {@code consumeQueueStore.findOrCreateConsumeQueue} with the same arguments. */
-    private ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId) {
-        final ConsumeQueueInterface found = this.consumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
-        return found;
-    }
-
-    /** Returns the {@code brokerStatsManager} field of this store. */
-    @Override
-    public BrokerStatsManager getBrokerStatsManager() {
-        return this.brokerStatsManager;
-    }
-
-    /**
-     * Starts or stops the schedule message service according to the broker role: it is shut down
-     * for {@code SLAVE} and started for any other role. Does nothing when the service is null.
-     *
-     * @param brokerRole the role to apply
-     */
-    @Override
-    public void handleScheduleMessageService(final BrokerRole brokerRole) {
-        if (this.scheduleMessageService == null) {
-            return;
-        }
-        if (brokerRole != BrokerRole.SLAVE) {
-            this.scheduleMessageService.start();
-        } else {
-            this.scheduleMessageService.shutdown();
-        }
-    }
-
-    /** Returns {@code transientStorePool.availableBufferNums()}. */
-    public int remainTransientStoreBufferNumbs() {
-        final int available = this.transientStorePool.availableBufferNums();
-        return available;
-    }
-
-    /** Returns {@code true} when {@link #remainTransientStoreBufferNumbs()} reports zero buffers. */
-    @Override
-    public boolean isTransientStorePoolDeficient() {
-        return 0 == this.remainTransientStoreBufferNumbs();
-    }
-
-    /** Returns the live {@code dispatcherList}; no copy is made. */
-    @Override
-    public LinkedList<CommitLogDispatcher> getDispatcherList() {
-        final LinkedList<CommitLogDispatcher> dispatchers = this.dispatcherList;
-        return dispatchers;
-    }
-
-    /**
-     * Looks up an existing consume queue without creating one.
-     *
-     * @param topic   topic to look up
-     * @param queueId queue id within the topic
-     * @return the registered queue, or {@code null} when the topic or the queue id is unknown
-     */
-    @Override
-    public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) {
-        final ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic = this.consumeQueueTable.get(topic);
-        return queuesOfTopic == null ? null : queuesOfTopic.get(queueId);
-    }
-
-    /**
-     * Schedules {@code mappedFile.munlock()} to run on the scheduled executor after 6 seconds.
-     *
-     * @param mappedFile the file to unlock later
-     */
-    @Override
-    public void unlockMappedFile(final MappedFile mappedFile) {
-        final Runnable unlockTask = () -> mappedFile.munlock();
-        this.scheduledExecutorService.schedule(unlockTask, 6, TimeUnit.SECONDS);
-    }
-
-    /** Returns the {@code perfs} field of this store. */
-    @Override
-    public PerfCounter.Ticks getPerfCounter() {
-        return this.perfs;
-    }
-
-    /** Returns the {@code consumeQueueStore} field of this store. */
-    @Override
-    public ConsumeQueueStore getQueueStore() {
-        return this.consumeQueueStore;
-    }
-
-    /** Returns {@code true} when the configured flush disk type is {@code SYNC_FLUSH}. */
-    @Override
-    public boolean isSyncDiskFlush() {
-        final FlushDiskType configured = this.getMessageStoreConfig().getFlushDiskType();
-        return FlushDiskType.SYNC_FLUSH == configured;
-    }
-
-    /** Returns {@code true} when the configured broker role is {@code SYNC_MASTER}. */
-    @Override
-    public boolean isSyncMaster() {
-        final BrokerRole configured = this.getMessageStoreConfig().getBrokerRole();
-        return BrokerRole.SYNC_MASTER == configured;
-    }
-
-    /**
-     * Assigns queue offsets to {@code msg} when its transaction type is "not a transaction" or
-     * "commit"; messages of any other transaction type are left untouched.
-     *
-     * @param topicQueueKey key of the offset table entry
-     * @param msg           message receiving the offset
-     * @param batchNum      passed through to the batch consume queue offset assignment
-     */
-    @Override
-    public void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short batchNum) {
-        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
-        final boolean needsOffset = tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
-                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE;
-        if (!needsOffset) {
-            return;
-        }
-
-        assignOffsetForCq(topicQueueKey, msg);
-        assignOffsetForBcq(topicQueueKey, msg, batchNum);
-    }
-
-    /**
-     * Placeholder for offset assignment of plain consume queues; currently does nothing.
-     *
-     * @param topicQueueKey unused
-     * @param msg           unused
-     */
-    private void assignOffsetForCq(String topicQueueKey, MessageExtBrokerInner msg) {
-        // not supported yet
-    }
-
-    /**
-     * Assigns the next batch-table offset for {@code topicQueueKey} to {@code msg} and advances
-     * the table entry by {@code batchNum}. A missing entry starts at 0.
-     *
-     * <p>When the message carries the inner-batch flag, or the queue type resolved for this store
-     * is {@code BatchCQ}, the base offset is also written into the message properties and the
-     * properties string is regenerated.
-     *
-     * @param topicQueueKey key of the offset table entry
-     * @param msg           message receiving the offset
-     * @param batchNum      amount by which the stored offset is advanced
-     */
-    private void assignOffsetForBcq(String topicQueueKey, MessageExtBrokerInner msg, short batchNum) {
-        final Long baseOffset = this.batchTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
-        final CQType cqType = QueueTypeUtils.getCQType(this);
-        final boolean innerBatch = MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG);
-
-        if (innerBatch || CQType.BatchCQ.equals(cqType)) {
-            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, String.valueOf(baseOffset));
-            final String encodedProps = MessageDecoder.messageProperties2String(msg.getProperties());
-            msg.setPropertiesString(encodedProps);
-        }
-
-        msg.setQueueOffset(baseOffset);
-        this.batchTopicQueueTable.put(topicQueueKey, baseOffset + batchNum);
-    }
-
-    /**
-     * Removes the entry for {@code topicQueueKey} from both offset tables.
-     *
-     * @param topicQueueKey key to remove
-     */
-    @Override
-    public void removeOffsetTable(String topicQueueKey) {
-        final String key = topicQueueKey;
-        this.topicQueueTable.remove(key);
-        this.batchTopicQueueTable.remove(key);
-    }
-
-    /**
-     * Reacts to a commit log append by building and dispatching the matching request.
-     *
-     * <p>{@code PUT_OK} dispatches a request built from the message and the append result;
-     * {@code END_OF_FILE} dispatches a file-end request. Any other status is rejected.
-     *
-     * @param msg           the appended message
-     * @param result        result of the append
-     * @param commitLogFile mapped file the append was made to
-     * @throws RuntimeException if the append status is neither {@code PUT_OK} nor {@code END_OF_FILE};
-     *                          the message now names the offending status instead of being empty
-     */
-    @Override
-    public void onCommitLogAppend(MessageExtBrokerInner msg, AppendMessageResult result, MappedFile commitLogFile) {
-        DispatchRequest dispatchRequest;
-        switch (result.getStatus()) {
-            case PUT_OK:
-                dispatchRequest = constructDispatchRequest(msg, result);
-                onCommitLogDispatch(dispatchRequest, this.isDispatchFromSenderThread(), commitLogFile, false, false);
-                break;
-            case END_OF_FILE:
-                dispatchRequest = new DispatchRequest(0, true);
-                onCommitLogDispatch(dispatchRequest, this.isDispatchFromSenderThread(), commitLogFile, false, true);
-                break;
-            default:
-                throw new RuntimeException("Unexpected append status on commit log append: " + result.getStatus());
-        }
-    }
-
-    /**
-     * Sets the next reput offset on {@code dispatchRequest} and dispatches it synchronously.
-     * Nothing happens when {@code doDispatch} is false.
-     *
-     * @param dispatchRequest request to complete and dispatch
-     * @param doDispatch      whether to dispatch at all
-     * @param commitLogFile   file whose start offset is used to roll to the next file at file end
-     * @param isRecover       forwarded to the synchronous dispatch
-     * @param isFileEnd       {@code true} when the next reput offset is the start of the next
-     *                        file; otherwise it is commit log offset plus message size
-     */
-    @Override
-    public void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, MappedFile commitLogFile, boolean isRecover, boolean isFileEnd) {
-        if (!doDispatch) {
-            return;
-        }
-
-        final long nextReputFromOffset = isFileEnd
-                ? this.getCommitLog().rollNextFile(commitLogFile.getFileFromOffset())
-                : dispatchRequest.getCommitLogOffset() + dispatchRequest.getMsgSize();
-        dispatchRequest.setNextReputFromOffset(nextReputFromOffset);
-        syncDispatch(dispatchRequest, isRecover);
-    }
-
-    /**
-     * Builds the {@link DispatchRequest} describing a message that was just appended.
-     *
-     * <p>Keys, unique key and tags code are taken from the message properties when present. For
-     * messages on the system schedule topic that carry a delay level, the tags code is replaced by
-     * the deliver timestamp computed by the schedule message service. When both the inner-num and
-     * inner-base properties exist, they are copied into the request as batch size and base offset.
-     *
-     * @param msg          the appended message
-     * @param appendResult the append result supplying wrote offset, wrote bytes and logics offset
-     * @return the populated dispatch request
-     */
-    private DispatchRequest constructDispatchRequest(MessageExtBrokerInner msg, AppendMessageResult appendResult) {
-        final int sysFlag = msg.getSysFlag();
-        final String topic = msg.getTopic();
-        final long storeTimestamp = msg.getStoreTimestamp();
-        final int queueId = msg.getQueueId();
-        final long preparedTransactionOffset = msg.getPreparedTransactionOffset();
-        final Map<String, String> props = msg.getProperties();
-
-        long tagsCode = 0;
-        String keys = "";
-        String uniqKey = null;
-
-        if (props != null && props.size() > 0) {
-            keys = props.get(MessageConst.PROPERTY_KEYS);
-            uniqKey = props.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
-
-            final String tags = props.get(MessageConst.PROPERTY_TAGS);
-            if (tags != null && tags.length() > 0) {
-                tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
-            }
-
-            // Timing message processing
-            final String delayLevelText = props.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
-            if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && delayLevelText != null) {
-                final int requestedLevel = Integer.parseInt(delayLevelText);
-                // Cap the requested level at the maximum the schedule service supports.
-                final int delayLevel = Math.min(requestedLevel, this.getScheduleMessageService().getMaxDelayLevel());
-                if (delayLevel > 0) {
-                    tagsCode = this.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
-                            storeTimestamp);
-                }
-            }
-        }
-
-        final DispatchRequest built = new DispatchRequest(
-                topic,
-                queueId,
-                appendResult.getWroteOffset(),
-                appendResult.getWroteBytes(),
-                tagsCode,
-                storeTimestamp,
-                appendResult.getLogicsOffset(),
-                keys,
-                uniqKey,
-                sysFlag,
-                preparedTransactionOffset,
-                props
-        );
-
-        final boolean hasBatchInfo = null != props
-                && props.containsKey(MessageConst.PROPERTY_INNER_NUM)
-                && props.containsKey(MessageConst.PROPERTY_INNER_BASE);
-        if (hasBatchInfo) {
-            built.setMsgBaseOffset(Long.parseLong(props.get(MessageConst.PROPERTY_INNER_BASE)));
-            built.setBatchSize(Short.parseShort(props.get(MessageConst.PROPERTY_INNER_NUM)));
-        }
-        return built;
-    }
-
-    /**
-     * Processes {@code dispatchRequest} synchronously and logs, rather than propagates, an
-     * interruption.
-     *
-     * <p>The caught exception is passed to the logger as the trailing argument so its stack
-     * trace is recorded. The interrupt flag is not re-asserted here.
-     * NOTE(review): presumably the calling thread goes on to do file I/O, which a pending
-     * interrupt could disturb; confirm before adding {@code Thread.currentThread().interrupt()}.
-     *
-     * @param dispatchRequest request to process
-     * @param isRecover       forwarded to {@code syncProcessDispatchRequest}
-     */
-    private void syncDispatch(DispatchRequest dispatchRequest, boolean isRecover) {
-        try {
-            this.syncProcessDispatchRequest(dispatchRequest, isRecover);
-        } catch (InterruptedException e) {
-            log.error("OnCommitlogAppend sync dispatch failed, addDispatchRequestQueue interrupted. DispatchRequest:{}", dispatchRequest, e);
-        }
-    }
-
-    /**
-     * Dispatcher that records message positions in the consume queue for non-transactional and
-     * committed messages; prepared and rolled-back messages are skipped.
-     */
-    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
-
-        /**
-         * Writes the position info, then notifies the arriving listener (non-slave brokers with
-         * long polling enabled) and updates the per-topic put statistics (slave brokers).
-         *
-         * @param dispatchRequest the request to dispatch
-         */
-        @Override
-        public void dispatch(DispatchRequest dispatchRequest) {
-            final int tranType = MessageSysFlag.getTransactionValue(dispatchRequest.getSysFlag());
-            if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE
-                    && tranType != MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
-                // Covers TRANSACTION_PREPARED_TYPE, TRANSACTION_ROLLBACK_TYPE and anything else.
-                return;
-            }
-
-            final StreamMessageStore store = StreamMessageStore.this;
-            store.putMessagePositionInfo(dispatchRequest);
-
-            if (BrokerRole.SLAVE != store.getMessageStoreConfig().getBrokerRole()
-                    && store.brokerConfig.isLongPollingEnable()) {
-                store.messageArrivingListener.arriving(dispatchRequest.getTopic(),
-                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
-                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
-                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
-            }
-
-            if (store.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
-                store.storeStatsService
-                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
-                store.storeStatsService
-                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
-                        .add(dispatchRequest.getMsgSize());
-            }
-        }
-    }
-
-    /** Dispatcher that forwards requests to the index service when message indexing is enabled. */
-    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
-
-        /**
-         * Builds the index entry for {@code request}; a no-op when indexing is disabled in the config.
-         *
-         * @param request the request to index
-         */
-        @Override
-        public void dispatch(DispatchRequest request) {
-            final boolean indexEnabled = StreamMessageStore.this.messageStoreConfig.isMessageIndexEnable();
-            if (!indexEnabled) {
-                return;
-            }
-            StreamMessageStore.this.indexService.buildIndex(request);
-        }
-    }
-
-    /**
-     * Decides when commit log files should be deleted (configured time of day, disk usage, or a
-     * manual request) and performs the deletion through the enclosing store's commit log.
-     */
-    class CleanCommitLogService {
-
-        private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
-
-        // System-property overrides; an empty string means the MessageStoreConfig value is used instead.
-        private final String diskSpaceWarningLevelRatio =
-                System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "");
-        private final String diskSpaceCleanForciblyRatio =
-                System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "");
-
-        private long lastRedeleteTimestamp = 0;
-        private volatile int manualDeleteFileSeveralTimes = 0;
-        private volatile boolean cleanImmediately = false;
-
-        /** Limits {@code value} to at most {@code ceiling} and at least {@code floor}. */
-        private double boundRatio(double value, double floor, double ceiling) {
-            if (value > ceiling) {
-                return ceiling;
-            }
-            if (value < floor) {
-                return floor;
-            }
-            return value;
-        }
-
-        /**
-         * Returns the disk usage ratio above which the disk is marked full: the system property if
-         * set, otherwise the configured percentage divided by 100, bounded to [0.35, 0.90].
-         */
-        double getDiskSpaceWarningLevelRatio() {
-            final double raw = "".equals(diskSpaceWarningLevelRatio)
-                    ? StreamMessageStore.this.getMessageStoreConfig().getDiskSpaceWarningLevelRatio() / 100.0
-                    : Double.parseDouble(diskSpaceWarningLevelRatio);
-            return boundRatio(raw, 0.35, 0.90);
-        }
-
-        /**
-         * Returns the disk usage ratio above which immediate cleaning is requested: the system
-         * property if set, otherwise the configured percentage divided by 100, bounded to [0.30, 0.85].
-         */
-        double getDiskSpaceCleanForciblyRatio() {
-            final double raw = "".equals(diskSpaceCleanForciblyRatio)
-                    ? StreamMessageStore.this.getMessageStoreConfig().getDiskSpaceCleanForciblyRatio() / 100.0
-                    : Double.parseDouble(diskSpaceCleanForciblyRatio);
-            return boundRatio(raw, 0.30, 0.85);
-        }
-
-        /** Arms the manual-delete counter so the next {@code MAX_MANUAL_DELETE_FILE_TIMES} runs delete files. */
-        public void excuteDeleteFilesManualy() {
-            this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
-            StreamMessageStore.log.info("executeDeleteFilesManually was invoked");
-        }
-
-        /**
-         * Deletes expired files and retries deletion of a hanged first file; any throwable is logged.
-         *
-         * @return the number of files deleted by the expired-file pass (0 if it failed)
-         */
-        public long run() {
-            int deleted = 0;
-            try {
-                deleted = this.deleteExpiredFiles();
-                this.redeleteHangedFile();
-            } catch (Throwable e) {
-                StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
-            }
-            return deleted;
-        }
-
-        /**
-         * Deletes expired commit log files when it is the configured time, disk space is short, or
-         * a manual delete is pending.
-         *
-         * @return the number of files deleted, or 0 when no deletion was needed
-         */
-        private int deleteExpiredFiles() {
-            final MessageStoreConfig config = StreamMessageStore.this.getMessageStoreConfig();
-            final long fileReservedHours = config.getFileReservedTime();
-            final int deletePhysicFilesInterval = config.getDeleteCommitLogFilesInterval();
-            final int destroyMapedFileIntervalForcibly = config.getDestroyMapedFileIntervalForcibly();
-            final int maxBatchDeleteFilesNum = Math.max(config.getMaxBatchDeleteFilesNum(), 10);
-
-            // Both checks always run: they log and update flags as a side effect.
-            final boolean timeup = this.isTimeToDelete();
-            final boolean spacefull = this.isSpaceToDelete();
-            final boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
-
-            if (!(timeup || spacefull || manualDelete)) {
-                return 0;
-            }
-
-            if (manualDelete) {
-                this.manualDeleteFileSeveralTimes--;
-            }
-
-            boolean cleanAtOnce = StreamMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
-
-            final String storePathPhysic = StreamMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
-            final double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
-            final long totalSpace = UtilAll.getTotalSpace(storePathPhysic);
-            final double realRatio = (StreamMessageStore.this.commitLog.getMaxOffset() - StreamMessageStore.this.commitLog.getMinOffset()) / (totalSpace + 0.001);
-
-            // Forced cleaning only applies when the commit log itself occupies more than 30% of the partition.
-            cleanAtOnce = cleanAtOnce && (realRatio > 0.3);
-
-            log.info("begin to delete before {} hours file. timeup:{} spacefull:{} manualDeleteFileSeveralTimes:{} cleanAtOnce:{} maxBatchDeleteFilesNum:{} physicRatio:{}",
-                    fileReservedHours,
-                    timeup,
-                    spacefull,
-                    manualDeleteFileSeveralTimes,
-                    cleanAtOnce,
-                    maxBatchDeleteFilesNum,
-                    physicRatio);
-
-            final long fileReservedMillis = fileReservedHours * (60 * 60 * 1000);
-
-            final int deleteCount = StreamMessageStore.this.commitLog.deleteExpiredFile(fileReservedMillis, deletePhysicFilesInterval,
-                    destroyMapedFileIntervalForcibly, cleanAtOnce);
-            if (deleteCount <= 0 && spacefull) {
-                log.warn("disk space will be full soon, but delete file failed. timeup:{} manualDelete:{} cleanAtOnce:{} physicRatio:{}",
-                        timeup,
-                        manualDelete,
-                        cleanAtOnce,
-                        physicRatio);
-            }
-            return deleteCount;
-        }
-
-        /** Retries deletion of the first commit log file once the configured interval has elapsed. */
-        private void redeleteHangedFile() {
-            final int interval = StreamMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
-            final long now = System.currentTimeMillis();
-            if ((now - this.lastRedeleteTimestamp) <= interval) {
-                return;
-            }
-            this.lastRedeleteTimestamp = now;
-            final int destroyMapedFileIntervalForcibly =
-                    StreamMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
-            // The boolean outcome is not used.
-            StreamMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly);
-        }
-
-        /** Returns the simple class name of this service, used in log messages. */
-        public String getServiceName() {
-            return CleanCommitLogService.class.getSimpleName();
-        }
-
-        /** Returns whether the configured delete time has been reached, logging when it has. */
-        private boolean isTimeToDelete() {
-            final String when = StreamMessageStore.this.getMessageStoreConfig().getDeleteWhen();
-            final boolean due = UtilAll.isItTimeToDo(when);
-            if (due) {
-                StreamMessageStore.log.info("it's time to reclaim disk space, " + when);
-            }
-            return due;
-        }
-
-        /**
-         * Checks the commit log partition and then the consume queue partition. Updates the
-         * disk-full running flag and {@code cleanImmediately} along the way.
-         *
-         * @return {@code true} when either partition's usage is negative or exceeds the configured
-         *         maximum used-space ratio
-         */
-        private boolean isSpaceToDelete() {
-            final double maxUsedRatio = StreamMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
-
-            cleanImmediately = false;
-
-            // Commit log partition.
-            final double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
-            if (physicRatio > getDiskSpaceWarningLevelRatio()) {
-                final boolean wasOk = StreamMessageStore.this.runningFlags.getAndMakeDiskFull();
-                if (wasOk) {
-                    StreamMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
-                }
-                cleanImmediately = true;
-            } else if (physicRatio > getDiskSpaceCleanForciblyRatio()) {
-                cleanImmediately = true;
-            } else {
-                final boolean wasOk = StreamMessageStore.this.runningFlags.getAndMakeDiskOK();
-                if (!wasOk) {
-                    StreamMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
-                }
-            }
-
-            if (physicRatio < 0 || physicRatio > maxUsedRatio) {
-                StreamMessageStore.log.info("physic disk maybe full soon, so reclaim space, {}, cleanImmediately {}", physicRatio, cleanImmediately);
-                return true;
-            }
-
-            // Consume queue partition.
-            final String storePathLogics = StorePathConfigHelper
-                    .getStorePathConsumeQueue(StreamMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
-            final double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
-            if (logicsRatio > getDiskSpaceWarningLevelRatio()) {
-                final boolean wasOk = StreamMessageStore.this.runningFlags.getAndMakeDiskFull();
-                if (wasOk) {
-                    StreamMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
-                }
-                cleanImmediately = true;
-            } else if (logicsRatio > getDiskSpaceCleanForciblyRatio()) {
-                cleanImmediately = true;
-            } else {
-                final boolean wasOk = StreamMessageStore.this.runningFlags.getAndMakeDiskOK();
-                if (!wasOk) {
-                    StreamMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
-                }
-            }
-
-            if (logicsRatio < 0 || logicsRatio > maxUsedRatio) {
-                StreamMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
-                return true;
-            }
-
-            return false;
-        }
-
-        /** Returns the number of pending manual-delete runs. */
-        public int getManualDeleteFileSeveralTimes() {
-            return this.manualDeleteFileSeveralTimes;
-        }
-
-        /** Sets the number of pending manual-delete runs. */
-        public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
-            this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
-        }
-    }
-
-    /**
-     * Removes consume queue and index data that falls below the commit log's minimum offset.
-     */
-    class CleanConsumeQueueService {
-        // Commit log minimum offset seen by the last pass that actually cleaned.
-        private long lastPhysicalMinOffset = 0;
-
-        /** Runs one cleaning pass; any throwable is logged instead of propagated. */
-        public void run() {
-            try {
-                this.deleteExpiredFiles();
-            } catch (Throwable e) {
-                StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
-            }
-        }
-
-        /**
-         * When the commit log's minimum offset has advanced since the last pass, deletes expired
-         * files from every consume queue and then from the index service. After a queue reports
-         * deleted files, the thread pauses for the configured interval (if positive).
-         */
-        private void deleteExpiredFiles() {
-            final int pauseMillis = StreamMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
-            final long minOffset = StreamMessageStore.this.commitLog.getMinOffset();
-
-            if (minOffset <= this.lastPhysicalMinOffset) {
-                return;
-            }
-            this.lastPhysicalMinOffset = minOffset;
-
-            for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : StreamMessageStore.this.consumeQueueTable.values()) {
-                for (ConsumeQueueInterface cq : queuesOfTopic.values()) {
-                    final int deleted = StreamMessageStore.this.consumeQueueStore.deleteExpiredFile(cq, minOffset);
-                    if (deleted <= 0 || pauseMillis <= 0) {
-                        continue;
-                    }
-                    try {
-                        Thread.sleep(pauseMillis);
-                    } catch (InterruptedException ignored) {
-                    }
-                }
-            }
-
-            StreamMessageStore.this.indexService.deleteExpiredFile(minOffset);
-        }
-
-        /** Returns the simple class name of this service, used in log messages. */
-        public String getServiceName() {
-            return CleanConsumeQueueService.class.getSimpleName();
-        }
-    }
-
-    class CorrectLogicOffsetService {
-
-        private long lastForceCorrectTime = -1L;
-
-        /** Runs one min-offset correction pass; any throwable is logged instead of propagated. */
-        public void run() {
-            try {
-                correctLogicMinOffset();
-            } catch (Throwable failure) {
-                StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", failure);
-            }
-        }
-
-        /**
-         * Decides whether the minimum offset of {@code logic} has to be corrected.
-         *
-         * <p>Checks, in order: an existing but unavailable first file; an empty queue or commit log
-         * (offset -1); a queue whose max physical offset lies below the commit log minimum; and,
-         * once the force-correct interval has elapsed, whether the earliest unit points below the
-         * commit log minimum. The last check also updates {@code lastForceCorrectTime}.
-         *
-         * @param logic                     queue to inspect; {@code null} yields {@code false}
-         * @param minPhyOffset              minimum physical offset of the commit log
-         * @param lastForeCorrectTimeCurRun force-correct timestamp captured at the start of the current run
-         * @return {@code true} when a correction should be triggered
-         */
-        private boolean needCorrect(ConsumeQueueInterface logic, long minPhyOffset, long lastForeCorrectTimeCurRun) {
-            if (logic == null) {
-                return false;
-            }
-
-            // If first exist and not available, it means first file may destroy failed, delete it.
-            final boolean firstFileBroken = StreamMessageStore.this.consumeQueueStore.isFirstFileExist(logic)
-                    && !StreamMessageStore.this.consumeQueueStore.isFirstFileAvailable(logic);
-            if (firstFileBroken) {
-                log.error("CorrectLogicOffsetService.needCorrect. first file not available, trigger correct." +
-                                " topic:{}, queue:{}, maxPhyOffset in queue:{}, minPhyOffset " +
-                                "in commit log:{}, minOffset in queue:{}, maxOffset in queue:{}, cqType:{}",
-                        logic.getTopic(), logic.getQueueId(), logic.getMaxPhysicOffset(),
-                        minPhyOffset, logic.getMinOffsetInQueue(), logic.getMaxOffsetInQueue(), logic.getCQType());
-                return true;
-            }
-
-            // logic.getMaxPhysicOffset() or minPhyOffset = -1
-            // means there is no message in current queue, so no need to correct.
-            if (logic.getMaxPhysicOffset() == -1 || minPhyOffset == -1) {
-                return false;
-            }
-
-            if (logic.getMaxPhysicOffset() < minPhyOffset) {
-                if (logic.getMinOffsetInQueue() < logic.getMaxOffsetInQueue()) {
-                    log.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is less than min phy offset: {}, " +
-                                    "but min offset: {} is less than max offset: {}. topic:{}, queue:{}, cqType:{}.",
-                            logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue(),
-                            logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
-                    return true;
-                }
-                if (logic.getMinOffsetInQueue() == logic.getMaxOffsetInQueue()) {
-                    return false;
-                }
-                log.error("CorrectLogicOffsetService.needCorrect. It should not happen, logic max phy offset: {} is less than min phy offset: {}," +
-                                " but min offset: {} is larger than max offset: {}. topic:{}, queue:{}, cqType:{}",
-                        logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue(),
-                        logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
-                return false;
-            }
-
-            //the logic.getMaxPhysicOffset() >= minPhyOffset
-            final int forceCorrectInterval = StreamMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetForceInterval();
-            final boolean forceCheckDue = (System.currentTimeMillis() - lastForeCorrectTimeCurRun) > forceCorrectInterval;
-            if (!forceCheckDue) {
-                return false;
-            }
-
-            lastForceCorrectTime = System.currentTimeMillis();
-            final CqUnit earliest = logic.getEarliestUnit();
-            if (earliest == null) {
-                if (logic.getMinOffsetInQueue() == logic.getMaxOffsetInQueue()) {
-                    return false;
-                }
-                log.error("CorrectLogicOffsetService.needCorrect. cqUnit is null, logic max phy offset: {} is greater than min phy offset: {}, " +
-                                "but min offset: {} is not equal to max offset: {}. topic:{}, queue:{}, cqType:{}.",
-                        logic.getMaxPhysicOffset(), minPhyOffset, logic.getMinOffsetInQueue(),
-                        logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
-                return true;
-            }
-
-            if (earliest.getPos() < minPhyOffset) {
-                log.error("CorrectLogicOffsetService.needCorrect. logic max phy offset: {} is greater than min phy offset: {}, " +
-                                "but minPhyPos in cq is: {}. min offset in queue: {}, max offset in queue: {}, topic:{}, queue:{}, cqType:{}.",
-                        logic.getMaxPhysicOffset(), minPhyOffset, earliest.getPos(), logic.getMinOffsetInQueue(),
-                        logic.getMaxOffsetInQueue(), logic.getTopic(), logic.getQueueId(), logic.getCQType());
-                return true;
-            }
-
-            // Normal case, do not need correct.
-            return false;
-        }
-
-        private void correctLogicMinOffset() {
-
-            long lastForeCorrectTimeCurRun = lastForceCorrectTime;
-            long minPhyOffset = getMinPhyOffset();
-            ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = StreamMessageStore.this.consumeQueueTable;
-            for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
-                for (ConsumeQueueInterface logic : maps.values()) {
-                    if (needCorrect(logic, minPhyOffset, lastForeCorrectTimeCurRun)) {
-                        doCorrect(logic, minPhyOffset);
-                    }
-                }
-            }
-        }
-
-        private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset) {
-            StreamMessageStore.this.consumeQueueStore.deleteExpiredFile(logic, minPhyOffset);
-            int sleepIntervalWhenCorrectMinOffset = StreamMessageStore.this.getMessageStoreConfig().getCorrectLogicMinOffsetSleepInterval();
-            if (sleepIntervalWhenCorrectMinOffset > 0) {
-                try {
-                    Thread.sleep(sleepIntervalWhenCorrectMinOffset);
-                } catch (InterruptedException ignored) {
-                }
-            }
-        }
-
-        public String getServiceName() {
-            return CorrectLogicOffsetService.class.getSimpleName();
-        }
-    }
-
-    class FlushConsumeQueueService extends ServiceThread {
-        private static final int RETRY_TIMES_OVER = 3;
-        private long lastFlushTimestamp = 0;
-
-        private void doFlush(int retryTimes) {
-            int flushConsumeQueueLeastPages = StreamMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
-
-            if (retryTimes == RETRY_TIMES_OVER) {
-                flushConsumeQueueLeastPages = 0;
-            }
-
-            long logicsMsgTimestamp = 0;
-
-            int flushConsumeQueueThoroughInterval = StreamMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
-            long currentTimeMillis = System.currentTimeMillis();
-            if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
-                this.lastFlushTimestamp = currentTimeMillis;
-                flushConsumeQueueLeastPages = 0;
-                logicsMsgTimestamp = StreamMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
-            }
-
-            ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> tables = StreamMessageStore.this.consumeQueueTable;
-
-            for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : tables.values()) {
-                for (ConsumeQueueInterface cq : maps.values()) {
-                    boolean result = false;
-                    for (int i = 0; i < retryTimes && !result; i++) {
-                        result = StreamMessageStore.this.consumeQueueStore.flush(cq, flushConsumeQueueLeastPages);
-                    }
-                }
-            }
-
-            if (0 == flushConsumeQueueLeastPages) {
-                if (logicsMsgTimestamp > 0) {
-                    StreamMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
-                }
-                StreamMessageStore.this.getStoreCheckpoint().flush();
-            }
-        }
-
-        @Override
-        public void run() {
-            StreamMessageStore.log.info(this.getServiceName() + " service started");
-
-            while (!this.isStopped()) {
-                try {
-                    int interval = StreamMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
-                    this.waitForRunning(interval);
-                    this.doFlush(1);
-                } catch (Exception e) {
-                    StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
-                }
-            }
-
-            this.doFlush(RETRY_TIMES_OVER);
-
-            StreamMessageStore.log.info(this.getServiceName() + " service end");
-        }
-
-        @Override
-        public String getServiceName() {
-            return FlushConsumeQueueService.class.getSimpleName();
-        }
-
-        @Override
-        public long getJointime() {
-            return 1000 * 60;
-        }
-    }
-
-    private class SyncReputMessageServiceFutureItem {
-        private Future future;
-        private long nextReputFromOffset;
-
-        public SyncReputMessageServiceFutureItem(Future future, long nextReputFromOffset) {
-            this.future = future;
-            this.nextReputFromOffset = nextReputFromOffset;
-        }
-
-        public Future getFuture() {
-            return future;
-        }
-
-        public void setFuture(Future future) {
-            this.future = future;
-        }
-
-        public long getNextReputFromOffset() {
-            return nextReputFromOffset;
-        }
-
-        public void setNextReputFromOffset(long nextReputFromOffset) {
-            this.nextReputFromOffset = nextReputFromOffset;
-        }
-    }
-
-    class SyncReputMessageService extends ReputMessageService {
-
-        private BlockingQueue<SyncReputMessageServiceFutureItem> reputResultQueue;
-
-        SyncReputMessageService() {
-            reputResultQueue = new LinkedBlockingDeque<>(messageStoreConfig.getDispatchCqThreads() * messageStoreConfig.getDispatchCqCacheNum());
-        }
-
-        void processDispatchRequestForRecover(DispatchRequest dispatchRequest) {
-            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
-            if (dispatchRequest.isSuccess() && size > 0) {
-                StreamMessageStore.this.doDispatch(dispatchRequest);
-                this.reputFromOffset = Math.max(dispatchRequest.getNextReputFromOffset(), this.reputFromOffset);
-            }
-        }
-
-        void processDispatchRequest(DispatchRequest dispatchRequest) throws InterruptedException {
-            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
-            if (dispatchRequest.isSuccess() && size > 0) {
-                Future future = StreamMessageStore.this.doDispatch(dispatchRequest, messageStoreConfig.isEnableAsyncReput());
-                reputResultQueue.put(new SyncReputMessageServiceFutureItem(future, dispatchRequest.getNextReputFromOffset()));
-            }
-        }
-
-        @Override
-        public void shutdown() {
-            for (int i = 0; i < 30 && this.reputResultQueue.size() > 0; i++) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignored) {
-                }
-            }
-
-            if (this.reputResultQueue.size() > 0) {
-                log.warn("shutdown SyncReputMessageService, but reputResultQueue not all processed, CLMaxOffset: {} reputFromOffset: {}",
-                        StreamMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
-            }
-
-            super.shutdown();
-        }
-        @Override
-        public void run() {
-            StreamMessageStore.log.info(this.getServiceName() + " service started");
-            while (!this.isStopped()) {
-                try {
-                    SyncReputMessageServiceFutureItem item = reputResultQueue.poll(1, TimeUnit.SECONDS);
-                    if (null == item) {
-                        continue;
-                    }
-                    item.getFuture().get();
-                    this.reputFromOffset = Math.max(item.getNextReputFromOffset(), this.reputFromOffset);
-
-                } catch (Exception e) {
-                    StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
-                    waitForRunning(100);
-                }
-            }
-
-            StreamMessageStore.log.info(this.getServiceName() + " service end");
-        }
-
-        @Override
-        public String getServiceName() {
-            return SyncReputMessageService.class.getSimpleName();
-        }
-    }
-
-    class ReputMessageService extends ServiceThread {
-
-        protected volatile long reputFromOffset = 0;
-
-        public long getReputFromOffset() {
-            return reputFromOffset;
-        }
-
-        public void setReputFromOffset(long reputFromOffset) {
-            this.reputFromOffset = reputFromOffset;
-        }
-
-        public boolean dispatched(long physicalOffset) {
-            return this.reputFromOffset > physicalOffset;
-        }
-
-        @Override
-        public void shutdown() {
-            for (int i = 0; i < 30 && this.isCommitLogAvailable(); i++) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException ignored) {
-                }
-            }
-
-            if (this.isCommitLogAvailable()) {
-                log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
-                        StreamMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
-            }
-
-            super.shutdown();
-        }
-
-        public long behind() {
-            return StreamMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
-        }
-
-        protected boolean isCommitLogAvailable() {
-            return this.reputFromOffset < StreamMessageStore.this.commitLog.getMaxOffset();
-        }
-
-        private void doReput() throws Exception {
-            if (this.reputFromOffset < StreamMessageStore.this.commitLog.getMinOffset()) {
-                log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
-                        this.reputFromOffset, StreamMessageStore.this.commitLog.getMinOffset());
-                this.reputFromOffset = StreamMessageStore.this.commitLog.getMinOffset();
-            }
-            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
-
-                if (StreamMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
-                        && this.reputFromOffset >= StreamMessageStore.this.getConfirmOffset()) {
-                    break;
-                }
-
-                SelectMappedBufferResult result = StreamMessageStore.this.commitLog.getData(reputFromOffset);
-                boolean reputAsync = messageStoreConfig.isEnableAsyncReput();
-                if (result != null) {
-                    long cacheReputFromOffset = this.reputFromOffset;
-                    List<Future> futures = new LinkedList<>();
-                    try {
-                        this.reputFromOffset = result.getStartOffset();
-                        cacheReputFromOffset = this.reputFromOffset;
-
-                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
-                            DispatchRequest dispatchRequest =
-                                    StreamMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
-                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
-
-                            if (dispatchRequest.isSuccess()) {
-                                if (size > 0) {
-                                    futures.add(StreamMessageStore.this.doDispatch(dispatchRequest, reputAsync));
-
-                                    if (BrokerRole.SLAVE != StreamMessageStore.this.getMessageStoreConfig().getBrokerRole()
-                                            && StreamMessageStore.this.brokerConfig.isLongPollingEnable()) {
-                                        StreamMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
-                                                dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
-                                                dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
-                                                dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
-                                    }
-
-                                    cacheReputFromOffset += size;
-                                    readSize += size;
-                                    if (StreamMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
-                                        StreamMessageStore.this.storeStatsService
-                                                .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1);
-                                        StreamMessageStore.this.storeStatsService
-                                                .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
-                                                .add(dispatchRequest.getMsgSize());
-                                    }
-                                } else if (size == 0) {
-                                    cacheReputFromOffset = StreamMessageStore.this.commitLog.rollNextFile(cacheReputFromOffset);
-                                    readSize = result.getSize();
-                                }
-                            } else if (!dispatchRequest.isSuccess()) {
-
-                                if (size > 0) {
-                                    log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", cacheReputFromOffset);
-                                    cacheReputFromOffset += size;
-                                } else {
-                                    doNext = false;
-                                    if (StreamMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
-                                        log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
-                                                cacheReputFromOffset);
-                                        cacheReputFromOffset += result.getSize() - readSize;
-                                    }
-                                }
-                            }
-                        }
-                        for (Future future: futures) {
-                            future.get();
-                        }
-                        futures.clear();
-                        this.reputFromOffset = cacheReputFromOffset;
-                    } finally {
-                        result.release();
-                    }
-                } else {
-                    doNext = false;
-                }
-            }
-        }
-
-        @Override
-        public void run() {
-            StreamMessageStore.log.info(this.getServiceName() + " service started");
-
-            while (!this.isStopped()) {
-                try {
-                    Thread.sleep(1);
-                    this.doReput();
-                } catch (Exception e) {
-                    StreamMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
-                }
-            }
-
-            StreamMessageStore.log.info(this.getServiceName() + " service end");
-        }
-
-        @Override
-        public String getServiceName() {
-            return ReputMessageService.class.getSimpleName();
-        }
-
-    }
-}
diff --git a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
index 97c50b9..a78eeed 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
@@ -8,12 +8,13 @@
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
+
 package org.apache.rocketmq.store;
 
 import java.util.ArrayList;
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 7a41c27..e58e695 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.store.config;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.queue.BatchConsumeQueue;
-import org.apache.rocketmq.store.queue.CQType;
 
 import java.io.File;
 
@@ -206,9 +205,6 @@ public class MessageStoreConfig {
     @ImportantField
     private boolean enableCleanExpiredOffset = false;
 
-    @ImportantField
-    private String defaultCQType = CQType.SimpleCQ.toString();
-
     private int maxAsyncPutMessageRequests = 5000;
 
     private int pullBatchMaxMessageCount = 160;
@@ -799,13 +795,6 @@ public class MessageStoreConfig {
         this.enableCleanExpiredOffset = enableCleanExpiredOffset;
     }
 
-    public String getDefaultCQType() {
-        return defaultCQType;
-    }
-
-    public void setDefaultCQType(String defaultCQType) {
-        this.defaultCQType = defaultCQType;
-    }
     public String getReadOnlyCommitLogStorePaths() {
         return readOnlyCommitLogStorePaths;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 50078c9..cbe794e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -33,7 +33,6 @@ import io.openmessaging.storage.dledger.utils.DLedgerUtils;
 import java.net.Inet6Address;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -431,7 +430,7 @@ public class DLedgerCommitLog extends CommitLog {
         String topicQueueKey = msg.getTopic() + "-" + msg.getQueueId();
         topicQueueLock.lock(topicQueueKey);
         try {
-            defaultMessageStore.assignOffset(topicQueueKey, msg, getBatchNum(msg));
+            defaultMessageStore.assignOffset(topicQueueKey, msg, getMessageNum(msg));
 
             encodeResult = this.messageSerializer.serialize(msg);
             if (encodeResult.status != AppendMessageStatus.PUT_OK) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
index 0b6a9dd..7d1feba 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.store.logfile;
 
 import org.apache.rocketmq.store.AppendMessageCallback;
 import org.apache.rocketmq.store.AppendMessageResult;
-import org.apache.rocketmq.store.CommitLog;
 import org.apache.rocketmq.store.MessageExtBatch;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.PutMessageContext;
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index de0e636..648a472 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -17,11 +17,17 @@
 
 package org.apache.rocketmq.store.queue;
 
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.MappedFileQueue;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.BrokerRole;
@@ -29,6 +35,7 @@ import org.apache.rocketmq.store.logfile.MappedFile;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -386,9 +393,9 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
     }
 
     @Override
-    public int deleteExpiredFile(long offset) {
-        int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
-        this.correctMinOffset(offset);
+    public int deleteExpiredFile(long minCommitLogPos) {
+        int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(minCommitLogPos, CQ_STORE_UNIT_SIZE);
+        this.correctMinOffset(minCommitLogPos);
         return cnt;
     }
 
@@ -473,6 +480,21 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
         this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
     }
 
+    @Override
+    public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum) {
+        // Offsets are tracked per "<topic>-<queueId>" key; a key seen for the first time is inserted with 0.
+        final HashMap<String, Long> offsets = queueOffsetAssigner.getBatchTopicQueueTable();
+        final String key = getTopic() + "-" + getQueueId();
+        final Long baseOffset = offsets.computeIfAbsent(key, k -> 0L);
+        final boolean innerBatch = MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG);
+        if (innerBatch) { // store the base offset as a property, then re-serialize the property string
+            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_BASE, String.valueOf(baseOffset));
+            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+        }
+        msg.setQueueOffset(baseOffset);
+        offsets.put(key, baseOffset + messageNum); // advance the stored offset by messageNum
+    }
+
     boolean putBatchMessagePositionInfo(final long offset, final int size, final long tagsCode, final long storeTime,
         final long msgBaseOffset, final short batchSize) {
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index 48f717d..5232a74 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.store.queue;
 
+import org.apache.rocketmq.common.attribute.CQType;
+
 public interface ConsumeQueueInterface {
     /**
      * Get the topic name
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index bf42e74..e8146ff 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -16,27 +16,49 @@
  */
 package org.apache.rocketmq.store.queue;
 
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.ConsumeQueue;
-import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StoreUtil;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 
+import java.io.File;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 public class ConsumeQueueStore {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
     protected final MessageStore messageStore;
     protected final MessageStoreConfig messageStoreConfig;
+    protected final QueueOffsetAssigner queueOffsetAssigner = new QueueOffsetAssigner();
     protected final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueueInterface>> consumeQueueTable;
 
-    public ConsumeQueueStore(MessageStore messageStore, MessageStoreConfig messageStoreConfig, ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> consumeQueueTable) {
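+    // Read-only here: this map is supplied from outside via setTopicConfigTable, so never modify topic configs through it.
+    // TODO: TopicConfigManager would be a more suitable dependency than the raw table.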
+    private ConcurrentMap<String, TopicConfig> topicConfigTable;
+
+    public ConsumeQueueStore(MessageStore messageStore, MessageStoreConfig messageStoreConfig) {
         this.messageStore = messageStore;
         this.messageStoreConfig = messageStoreConfig;
-        this.consumeQueueTable = consumeQueueTable;
+        this.consumeQueueTable = new ConcurrentHashMap<>(32);
+    }
+
+    public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
+        this.topicConfigTable = topicConfigTable; // keeps the caller's map instance itself; no copy is made
+    }
 
     private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
@@ -64,21 +86,143 @@ public class ConsumeQueueStore {
         fileQueueLifeCycle.putMessagePositionInfoWrapper(request);
     }
 
+    public void putMessagePositionInfoWrapper(DispatchRequest dispatchRequest) {
+        // Look up (or create) the queue addressed by the request's topic and queue id, then delegate to the per-queue overload.
+        this.putMessagePositionInfoWrapper(this.findOrCreateConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId()), dispatchRequest);
+    }
+
     public boolean load(ConsumeQueueInterface consumeQueue) {
         FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
         return fileQueueLifeCycle.load();
     }
 
+    public boolean load() {
+        return loadConsumeQueues() && loadBatchConsumeQueues(); // && short-circuits: batch queues are not loaded if loadConsumeQueues() returns false
+    }
+
+    private boolean loadBatchConsumeQueues() {
+        File dirLogic = new File(StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
+        File[] fileTopicList = dirLogic.listFiles();
+        if (fileTopicList != null) {
+            // Each entry under the batch consume queue directory is taken as a topic, each entry below it as a queue id.
+            for (File fileTopic : fileTopicList) {
+                String topic = fileTopic.getName();
+
+                File[] fileQueueIdList = fileTopic.listFiles();
+                if (fileQueueIdList != null) {
+                    for (File fileQueueId : fileQueueIdList) {
+                        int queueId;
+                        try {
+                            queueId = Integer.parseInt(fileQueueId.getName());
+                        } catch (NumberFormatException e) {
+                            continue; // name is not an integer queue id: skip this entry
+                        }
+
+                        TopicConfig topicConfig = this.topicConfigTable == null ? null : this.topicConfigTable.get(topic);
+
+                        // For batch consume queue, the topic config must exist; an entry without one is skipped, not loaded.
+                        if (topicConfig == null) {
+                            log.warn("topic: {} has no topic config.", topic);
+                            continue;
+                        }
+                        // A queue-type mismatch is only logged: the BatchConsumeQueue below is still created and loaded.
+                        if (!Objects.equals(CQType.BatchCQ, QueueTypeUtils.getCQType(Optional.of(topicConfig)))) {
+                            log.error("[BUG]topic: {} should be BCQ.", topic);
+                        }
+
+                        ConsumeQueueInterface logic = new BatchConsumeQueue(
+                                topic,
+                                queueId,
+                                StorePathConfigHelper.getStorePathBatchConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+                                this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
+                                this.messageStore);
+                        this.putConsumeQueue(topic, queueId, logic); // note: put happens before the load attempt
+                        if (!this.load(logic)) {
+                            return false; // the first queue that fails to load aborts the whole pass
+                        }
+                    }
+                }
+            }
+        }
+
+        log.info("load batch consume queue all over, OK");
+
+        return true;
+    }
+
+    private boolean loadConsumeQueues() {
+        File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
+        File[] fileTopicList = dirLogic.listFiles();
+        if (fileTopicList != null) {
+            // Each entry under the consume queue directory is taken as a topic, each entry below it as a queue id.
+            for (File fileTopic : fileTopicList) {
+                String topic = fileTopic.getName();
+
+                File[] fileQueueIdList = fileTopic.listFiles();
+                if (fileQueueIdList != null) {
+                    for (File fileQueueId : fileQueueIdList) {
+                        int queueId;
+                        try {
+                            queueId = Integer.parseInt(fileQueueId.getName());
+                        } catch (NumberFormatException e) {
+                            continue; // name is not an integer queue id: skip this entry
+                        }
+                        ConsumeQueueInterface logic = new ConsumeQueue(
+                                topic,
+                                queueId,
+                                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+                                this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
+                                this.messageStore);
+                        this.putConsumeQueue(topic, queueId, logic); // note: put happens before the load attempt
+                        if (!this.load(logic)) {
+                            return false; // the first queue that fails to load aborts the whole pass
+                        }
+                    }
+                }
+            }
+        }
+
+        log.info("load logics queue all over, OK");
+
+        return true;
+    }
+
     public void recover(ConsumeQueueInterface consumeQueue) {
         FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
         fileQueueLifeCycle.recover();
     }
 
+    public long recover() {
+        // Stays -1 unless some queue reports a larger max physical offset.
+        long highest = -1;
+        for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : this.consumeQueueTable.values()) {
+            for (ConsumeQueueInterface cq : queuesOfTopic.values()) {
+                // Recover the queue first; its max physical offset is read only afterwards.
+                this.recover(cq);
+                final long cqMaxPhysicOffset = cq.getMaxPhysicOffset();
+                highest = cqMaxPhysicOffset > highest ? cqMaxPhysicOffset : highest;
+            }
+        }
+        return highest;
+    }
+
     public void checkSelf(ConsumeQueueInterface consumeQueue) {
         FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
         fileQueueLifeCycle.checkSelf();
     }
 
+    public void checkSelf() {
+        Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> it = this.consumeQueueTable.entrySet().iterator(); // one entry per topic
+        while (it.hasNext()) {
+            Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> next = it.next();
+            Iterator<Map.Entry<Integer, ConsumeQueueInterface>> itNext = next.getValue().entrySet().iterator(); // the queues of that topic
+            while (itNext.hasNext()) {
+                Map.Entry<Integer, ConsumeQueueInterface> cq = itNext.next();
+                this.checkSelf(cq.getValue()); // per-queue check; nothing is caught here, so an exception ends the walk
+            }
+        }
+    }
+
     public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) {
         FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
         return fileQueueLifeCycle.flush(flushLeastPages);
@@ -89,9 +233,9 @@ public class ConsumeQueueStore {
         fileQueueLifeCycle.destroy();
     }
 
-    public int deleteExpiredFile(ConsumeQueueInterface consumeQueue, long minOffset) {
+    public int deleteExpiredFile(ConsumeQueueInterface consumeQueue, long minCommitLogPos) {
         FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        return fileQueueLifeCycle.deleteExpiredFile(minOffset);
+        return fileQueueLifeCycle.deleteExpiredFile(minCommitLogPos);
     }
 
     public void truncateDirtyLogicFiles(ConsumeQueueInterface consumeQueue, long phyOffset) {
@@ -124,10 +268,9 @@ public class ConsumeQueueStore {
     }
 
     private ConsumeQueueInterface doFindOrCreateConsumeQueue(String topic, int queueId) {
-
         ConcurrentMap<Integer, ConsumeQueueInterface> map = consumeQueueTable.get(topic);
         if (null == map) {
-            ConcurrentMap<Integer, ConsumeQueueInterface> newMap = new ConcurrentHashMap<Integer, ConsumeQueueInterface>(128);
+            ConcurrentMap<Integer, ConsumeQueueInterface> newMap = new ConcurrentHashMap<>(128);
             ConcurrentMap<Integer, ConsumeQueueInterface> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
             if (oldMap != null) {
                 map = oldMap;
@@ -143,7 +286,9 @@ public class ConsumeQueueStore {
 
         ConsumeQueueInterface newLogic;
 
-        if (StoreUtil.isStreamMode(this.messageStore)) {
+        Optional<TopicConfig> topicConfig = this.getTopicConfig(topic);
+        // TODO maybe the topic has been deleted.
+        if (Objects.equals(CQType.BatchCQ, QueueTypeUtils.getCQType(topicConfig))) {
             newLogic = new BatchConsumeQueue(
                     topic,
                     queueId,
@@ -162,7 +307,7 @@ public class ConsumeQueueStore {
                     queueId,
                     StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                     this.messageStoreConfig.getMappedFileSizeConsumeQueue(),
-                    (DefaultMessageStore) this.messageStore);
+                    this.messageStore);
             ConsumeQueueInterface oldLogic = map.putIfAbsent(queueId, newLogic);
             if (oldLogic != null) {
                 logic = oldLogic;
@@ -173,4 +318,129 @@ public class ConsumeQueueStore {
 
         return logic;
     }
+
+    public Long getMaxOffset(String topic, int queueId) {
+        return this.queueOffsetAssigner.getTopicQueueTable().get(topic + "-" + queueId);
+    }
+
+    public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
+        this.queueOffsetAssigner.setTopicQueueTable(topicQueueTable);
+    }
+
+    public void setBatchTopicQueueTable(HashMap<String, Long> batchTopicQueueTable) {
+        this.queueOffsetAssigner.setBatchTopicQueueTable(batchTopicQueueTable);
+    }
+
+    public void assignQueueOffset(MessageExtBrokerInner msg, short messageNum) {
+        FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(msg.getTopic(), msg.getQueueId());
+        fileQueueLifeCycle.assignQueueOffset(this.queueOffsetAssigner, msg, messageNum);
+    }
+
+    public void removeTopicQueueTable(String topic, Integer queueId) {
+        this.queueOffsetAssigner.remove(topic, queueId);
+    }
+
+    public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> getConsumeQueueTable() {
+        return consumeQueueTable;
+    }
+
+    /**
+     * Registers {@code consumeQueue} under {@code topic}/{@code queueId}, replacing any previous entry.
+     * The per-topic map is created atomically with computeIfAbsent, so a map installed concurrently
+     * for the same topic is reused instead of being overwritten by a second freshly created one.
+     */
+    private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueueInterface consumeQueue) {
+        this.consumeQueueTable
+                .computeIfAbsent(topic, key -> new ConcurrentHashMap<>())
+                .put(queueId, consumeQueue);
+    }
+
+    /**
+     * Rebuilds both queue-offset tables from the consume queues in the table: each queue's max offset
+     * is stored under "topic-queueId", in the batch table for BatchCQ queues and the plain table otherwise.
+     * Also calls correctMinOffset on every queue with {@code minPhyOffset}.
+     */
+    public void recoverOffsetTable(long minPhyOffset) {
+        HashMap<String, Long> simpleOffsets = new HashMap<>(1024);
+        HashMap<String, Long> batchOffsets = new HashMap<>(1024);
+
+        for (ConcurrentMap<Integer, ConsumeQueueInterface> queuesOfTopic : this.consumeQueueTable.values()) {
+            for (ConsumeQueueInterface queue : queuesOfTopic.values()) {
+                String key = queue.getTopic() + "-" + queue.getQueueId();
+                long maxOffsetInQueue = queue.getMaxOffsetInQueue();
+                HashMap<String, Long> target = Objects.equals(CQType.BatchCQ, queue.getCQType()) ? batchOffsets : simpleOffsets;
+                target.put(key, maxOffsetInQueue);
+                this.correctMinOffset(queue, minPhyOffset);
+            }
+        }
+
+        this.setTopicQueueTable(simpleOffsets);
+        this.setBatchTopicQueueTable(batchOffsets);
+    }
+
+    /**
+     * Destroys every consume queue of every topic currently in the table.
+     */
+    public void destroy() {
+        this.consumeQueueTable.values().forEach(
+                queuesOfTopic -> queuesOfTopic.values().forEach(queue -> this.destroy(queue)));
+    }
+
+    /**
+     * Destroys consume queues whose getLastOffset() is below {@code minCommitLogOffset} and removes topics
+     * left with no queues. Queues reporting -1 are only logged; RMQ_SYS_SCHEDULE_TOPIC is skipped entirely.
+     */
+    public void cleanExpired(long minCommitLogOffset) {
+        Iterator<Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>> topicIt = this.consumeQueueTable.entrySet().iterator();
+        while (topicIt.hasNext()) {
+            Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> topicEntry = topicIt.next();
+            String topic = topicEntry.getKey();
+            if (topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)) {
+                continue;
+            }
+
+            ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = topicEntry.getValue();
+            Iterator<Map.Entry<Integer, ConsumeQueueInterface>> queueIt = queueTable.entrySet().iterator();
+            while (queueIt.hasNext()) {
+                Map.Entry<Integer, ConsumeQueueInterface> queueEntry = queueIt.next();
+                ConsumeQueueInterface queue = queueEntry.getValue();
+                long maxCLOffsetInConsumeQueue = queue.getLastOffset();
+
+                if (maxCLOffsetInConsumeQueue == -1) {
+                    log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
+                            queue.getTopic(), queue.getQueueId(), queue.getMaxPhysicOffset(), queue.getMinLogicOffset());
+                } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
+                    log.info(
+                            "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
+                            topic, queueEntry.getKey(), minCommitLogOffset, maxCLOffsetInConsumeQueue);
+
+                    // Drop the queue's offset-table entry first, then destroy it and unlink it from the table.
+                    removeTopicQueueTable(queue.getTopic(), queue.getQueueId());
+                    this.destroy(queue);
+                    queueIt.remove();
+                }
+            }
+
+            if (queueTable.isEmpty()) {
+                log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
+                topicIt.remove();
+            }
+        }
+    }
+
+    /**
+     * Calls truncateDirtyLogicFiles with {@code phyOffset} on every consume queue in the table.
+     */
+    public void truncateDirty(long phyOffset) {
+        this.consumeQueueTable.values().forEach(
+                queuesOfTopic -> queuesOfTopic.values().forEach(queue -> this.truncateDirtyLogicFiles(queue, phyOffset)));
+    }
+
+    /**
+     * Returns the config for {@code topic}, or empty when the config table is null or has no mapping for it.
+     */
+    public Optional<TopicConfig> getTopicConfig(String topic) {
+        return this.topicConfigTable == null
+                ? Optional.empty() : Optional.ofNullable(this.topicConfigTable.get(topic));
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
index dd8c86d..05c217f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.store.queue;
 
 import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.Swappable;
 
 public interface FileQueueLifeCycle extends Swappable {
@@ -32,4 +33,5 @@ public interface FileQueueLifeCycle extends Swappable {
     boolean isFirstFileExist();
     void correctMinOffset(long minCommitLogOffset);
     void putMessagePositionInfoWrapper(DispatchRequest request);
+    void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum);
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
new file mode 100644
index 0000000..09e18ec
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+import java.util.HashMap;
+
+/**
+ * QueueOffsetAssigner is a component for assigning queue offsets. It holds two offset tables, a plain one
+ * and a batch one, both keyed by "topic-queueId". Only {@link #remove} is synchronized.
+ */
+public class QueueOffsetAssigner {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    private HashMap<String, Long> topicQueueTable = new HashMap<>(1024);
+    private HashMap<String, Long> batchTopicQueueTable = new HashMap<>(1024);
+
+    public HashMap<String, Long> getTopicQueueTable() {
+        return topicQueueTable;
+    }
+
+    public void setTopicQueueTable(HashMap<String, Long> topicQueueTable) {
+        this.topicQueueTable = topicQueueTable;
+    }
+
+    public HashMap<String, Long> getBatchTopicQueueTable() {
+        return batchTopicQueueTable;
+    }
+
+    public void setBatchTopicQueueTable(HashMap<String, Long> batchTopicQueueTable) {
+        this.batchTopicQueueTable = batchTopicQueueTable;
+    }
+
+    public synchronized void remove(String topic, Integer queueId) {
+        String topicQueueKey = topic + "-" + queueId;
+        // Beware of thread-safety: both tables are plain HashMaps and their getters/setters are unsynchronized.
+        this.topicQueueTable.remove(topicQueueKey);
+        this.batchTopicQueueTable.remove(topicQueueKey);
+
+        log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
+    }
+}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java b/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
deleted file mode 100644
index 1067337..0000000
--- a/store/src/main/java/org/apache/rocketmq/store/util/QueueTypeUtils.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.store.util;
-
-import org.apache.rocketmq.common.TopicAttributes;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
-
-import java.util.Map;
-
-public class QueueTypeUtils {
-
-    @Deprecated
-    public static CQType getCQType(MessageStore messageStore) {
-        if (messageStore instanceof DefaultMessageStore) {
-            return CQType.SimpleCQ;
-        } else if (messageStore instanceof StreamMessageStore) {
-            return CQType.BatchCQ;
-        } else {
-            throw new RuntimeException("new cq type is not supported now.");
-        }
-    }
-
-    public static CQType getCQType(TopicConfig topicConfig) {
-        String attributeName = TopicAttributes.QUEUE_TYPE.getName();
-
-        Map<String, String> attributes = topicConfig.getAttributes();
-        if (attributes == null || attributes.size() == 0) {
-            return CQType.valueOf(TopicAttributes.QUEUE_TYPE.getDefaultValue());
-        }
-
-        if (attributes.containsKey(attributeName)) {
-            return CQType.valueOf(attributes.get(attributeName));
-        } else {
-            return CQType.valueOf(TopicAttributes.QUEUE_TYPE.getDefaultValue());
-        }
-    }
-}
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
index 1ccb974..e27483e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.After;
 import org.junit.Before;
@@ -72,7 +72,6 @@ public class DefaultMessageStoreShutDownTest {
         messageStoreConfig.setMaxIndexNum(100 * 100);
         messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
         messageStoreConfig.setHaListenPort(StoreTestBase.nextPort());
-        messageStoreConfig.setDefaultCQType(CQType.SimpleCQ.name());
         return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig());
     }
 
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 41cf92f..932fc2f 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -115,7 +115,6 @@ public class DefaultMessageStoreTest {
         messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
         messageStoreConfig.setFlushIntervalConsumeQueue(1);
         messageStoreConfig.setHaListenPort(StoreTestBase.nextPort());
-        messageStoreConfig.setDefaultCQType(CQType.SimpleCQ.name());
         return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig());
     }
 
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
index 3ebe148..500cd81 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest.java
@@ -17,9 +17,17 @@
 
 package org.apache.rocketmq.store.queue;
 
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.utils.QueueTypeUtils;
+import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
@@ -27,21 +35,168 @@ import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
 
 public class BatchConsumeMessageTest extends QueueTestBase {
+    private MessageStore messageStore;
 
-    @Test
-    public void testGetOffsetInQueueByTime() throws Exception {
-        MessageStore messageStore = createMessageStore(null, true, CQType.BatchCQ);
+    @Before
+    public void init() throws Exception {
+        messageStore = createMessageStore(null, true);
         messageStore.load();
         messageStore.start();
+    }
+
+    @After
+    public void destroy() {
+        messageStore.shutdown();
+        messageStore.destroy();
+
+        File file = new File(messageStore.getMessageStoreConfig().getStorePathRootDir());
+        UtilAll.deleteFile(file);
+    }
+
+    @Test
+    public void testSendMessagesToCqTopic() {
+        String topic = UUID.randomUUID().toString();
+        createTopic(topic, CQType.SimpleCQ, messageStore);
+
+        int batchNum = 10;
+
+        // case 1 has PROPERTY_INNER_NUM but has no INNER_BATCH_FLAG
+        MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, batchNum);
+        messageExtBrokerInner.setSysFlag(0);
+        PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+        Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
+
+        // case 2 has PROPERTY_INNER_NUM and has INNER_BATCH_FLAG, but is not a batchCq
+        messageExtBrokerInner = buildMessage(topic, 1);
+        putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+        Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
+
+        // case 3 has neither PROPERTY_INNER_NUM nor INNER_BATCH_FLAG.
+        messageExtBrokerInner = buildMessage(topic, -1);
+        putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+        Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+    }
+
+    @Test
+    public void testSendMessagesToBcqTopic() {
+        String topic = UUID.randomUUID().toString();
+        createTopic(topic, CQType.BatchCQ, messageStore);
+
+        // case 1 has PROPERTY_INNER_NUM but has no INNER_BATCH_FLAG
+        MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, 1);
+        PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+        Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
+
+        // case 2 has neither PROPERTY_INNER_NUM nor INNER_BATCH_FLAG.
+        messageExtBrokerInner = buildMessage(topic, -1);
+        putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+        Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+
+        // case 3 has INNER_BATCH_FLAG but has no PROPERTY_INNER_NUM.
+        messageExtBrokerInner = buildMessage(topic, 1);
+        MessageAccessor.clearProperty(messageExtBrokerInner, MessageConst.PROPERTY_INNER_NUM);
+        messageExtBrokerInner.setSysFlag(MessageSysFlag.INNER_BATCH_FLAG);
+        putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+        Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+    }
+
+    @Test
+    public void testConsumeBatchMessage() {
+        String topic = UUID.randomUUID().toString();
+        createTopic(topic, CQType.BatchCQ, messageStore);
+        int batchNum = 10;
+
+        MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, batchNum);
+        PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+        Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
+        List<GetMessageResult> results = new ArrayList<>();
+        for (int i = 0; i < batchNum; i++) {
+            GetMessageResult result = messageStore.getMessage("whatever", topic, 0, i, Integer.MAX_VALUE, Integer.MAX_VALUE, null);
+            try {
+                Assert.assertEquals(GetMessageStatus.FOUND, result.getStatus());
+                results.add(result);
+            } finally {
+                result.release();
+            }
+        }
+
+        for (GetMessageResult result : results) {
+            Assert.assertEquals(0, result.getMinOffset());
+            Assert.assertEquals(batchNum, result.getMaxOffset());
+        }
+
+    }
+
+    @Test
+    public void testNextBeginOffsetConsumeBatchMessage() {
+        String topic = UUID.randomUUID().toString();
+        createTopic(topic, CQType.BatchCQ, messageStore);
+        Random random = new Random();
+        int putMessageCount = 1000;
+
+        Queue<Integer> queue = new ArrayDeque<>();
+        for (int i = 0; i < putMessageCount; i++) {
+            int batchNum = random.nextInt(1000) + 2;
+            MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, batchNum);
+            PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
+            Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
+            queue.add(batchNum);
+        }
+
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
+        long pullOffset = 0L;
+        int getMessageCount = 0;
+        while (true) {
+            GetMessageResult getMessageResult = messageStore.getMessage("group", topic, 0, pullOffset, 1, null);
+            if (Objects.equals(getMessageResult.getStatus(), GetMessageStatus.OFFSET_OVERFLOW_ONE)) {
+                break;
+            }
+            Assert.assertEquals(1, getMessageResult.getMessageQueueOffset().size());
+            Long baseOffset = getMessageResult.getMessageQueueOffset().get(0);
+            Integer batchNum = queue.poll();
+            Assert.assertNotNull(batchNum);
+            Assert.assertEquals(baseOffset + batchNum, getMessageResult.getNextBeginOffset());
+            pullOffset = getMessageResult.getNextBeginOffset();
+            getMessageCount++;
+        }
+        Assert.assertEquals(putMessageCount, getMessageCount);
+    }
+
+    @Test
+    public void testGetOffsetInQueueByTime() throws Exception {
         String topic = "testGetOffsetInQueueByTime";
 
-        //The initial min max offset, before and after the creation of consume queue
+        createTopic(topic, CQType.BatchCQ, messageStore);
+        Assert.assertTrue(QueueTypeUtils.isBatchCq(messageStore.getTopicConfig(topic)));
+
+        // The initial min max offset, before and after the creation of consume queue
         Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, 0));
         Assert.assertEquals(-1, messageStore.getMinOffsetInQueue(topic, 0));
 
@@ -54,34 +209,30 @@ public class BatchConsumeMessageTest extends QueueTestBase {
             if (i == 7)
                 timeMid = System.currentTimeMillis();
         }
+
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
         Assert.assertEquals(80, messageStore.getOffsetInQueueByTime(topic, 0, timeMid));
+        Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
+        Assert.assertEquals(190, messageStore.getMaxOffsetInQueue(topic, 0));
+
         Thread.sleep(5 * 1000);
         int maxBatchDeleteFilesNum = messageStore.getMessageStoreConfig().getMaxBatchDeleteFilesNum();
         messageStore.getCommitLog().deleteExpiredFile(1L, 100, 12000, true, maxBatchDeleteFilesNum);
+        Assert.assertEquals(80, messageStore.getOffsetInQueueByTime(topic, 0, timeMid));
         Thread.sleep(70 * 1000);
         Assert.assertEquals(180, messageStore.getOffsetInQueueByTime(topic, 0, timeMid));
-
     }
 
     @Test
     public void testDispatchNormalConsumeQueue() throws Exception {
-        MessageStore messageStore = createMessageStore(null, true, CQType.SimpleCQ);
-        messageStore.load();
-        messageStore.start();
         String topic = "TestDispatchBuildConsumeQueue";
-        int batchNum = 10;
+        createTopic(topic, CQType.SimpleCQ, messageStore);
+
         long timeStart = System.currentTimeMillis();
         long timeMid = -1;
-        for (int i = 0; i < 100; i++) {
-//            MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, batchNum);
-//            messageExtBrokerInner.setSysFlag(0);
-//            PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
-//            Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
-//
-//            messageExtBrokerInner = buildMessage(topic, 1);
-//            putMessageResult = messageStore.putMessage(messageExtBrokerInner);
-//            Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
 
+        for (int i = 0; i < 100; i++) {
             MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, -1);
             PutMessageResult putMessageResult = messageStore.putMessage(messageExtBrokerInner);
             Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
@@ -90,7 +241,9 @@ public class BatchConsumeMessageTest extends QueueTestBase {
             if (i == 49)
                 timeMid = System.currentTimeMillis();
         }
-        Thread.sleep(500);
+
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
         ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, 0);
         Assert.assertEquals(CQType.SimpleCQ, consumeQueue.getCQType());
         //check the consume queue
@@ -106,43 +259,41 @@ public class BatchConsumeMessageTest extends QueueTestBase {
         for (int i = -100; i < 100; i += 20) {
             Assert.assertEquals(consumeQueue.getOffsetInQueueByTime(timeMid + i), messageStore.getOffsetInQueueByTime(topic, 0, timeMid + i));
         }
+
         //check the message time
+        long latencyAllowed = 20;
         long earlistMessageTime = messageStore.getEarliestMessageTime(topic, 0);
-        Assert.assertTrue(earlistMessageTime > timeStart - 10);
-        Assert.assertTrue(earlistMessageTime < timeStart + 10);
+        Assert.assertTrue(earlistMessageTime > timeStart - latencyAllowed);
+        Assert.assertTrue(earlistMessageTime < timeStart + latencyAllowed);
         long messageStoreTime = messageStore.getMessageStoreTimeStamp(topic, 0, 50);
-        Assert.assertTrue(messageStoreTime > timeMid - 10);
-        Assert.assertTrue(messageStoreTime < timeMid + 10);
+        Assert.assertTrue(messageStoreTime > timeMid - latencyAllowed);
+        Assert.assertTrue(messageStoreTime < timeMid + latencyAllowed);
         long commitLogOffset = messageStore.getCommitLogOffsetInQueue(topic, 0, 50);
         Assert.assertTrue(commitLogOffset >= messageStore.getMinPhyOffset());
         Assert.assertTrue(commitLogOffset <= messageStore.getMaxPhyOffset());
 
         Assert.assertFalse(messageStore.checkInDiskByConsumeOffset(topic, 0, 50));
-        messageStore.shutdown();
-        messageStore.destroy();
     }
 
     @Test
     public void testDispatchBuildBatchConsumeQueue() throws Exception {
-        MessageStore messageStore = createMessageStore(null, true, CQType.BatchCQ);
-        messageStore.load();
-        messageStore.start();
         String topic = "testDispatchBuildBatchConsumeQueue";
         int batchNum = 10;
         long timeStart = System.currentTimeMillis();
         long timeMid = -1;
+
+        createTopic(topic, CQType.BatchCQ, messageStore);
+
         for (int i = 0; i < 100; i++) {
             PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum));
             Assert.assertEquals(PutMessageStatus.PUT_OK, putMessageResult.getPutMessageStatus());
             Thread.sleep(2);
             if (i == 29)
                 timeMid = System.currentTimeMillis();
-
-            MessageExtBrokerInner messageExtBrokerInner = buildMessage(topic, 1);
-            putMessageResult = messageStore.putMessage(messageExtBrokerInner);
-            Assert.assertEquals(PutMessageStatus.MESSAGE_ILLEGAL, putMessageResult.getPutMessageStatus());
         }
-        Thread.sleep(500);
+
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
         ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, 0);
         Assert.assertEquals(CQType.BatchCQ, consumeQueue.getCQType());
 
@@ -176,21 +327,18 @@ public class BatchConsumeMessageTest extends QueueTestBase {
         for (int i = 0; i < 10; i++) {
             SelectMappedBufferResult sbr = getMessageResult.getMessageMapedList().get(i);
             MessageExt messageExt = MessageDecoder.decode(sbr.getByteBuffer());
-            short tmpBatchNum = Short.valueOf(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
+            short tmpBatchNum = Short.parseShort(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
             Assert.assertEquals(i * batchNum, Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_BASE)));
             Assert.assertEquals(batchNum, tmpBatchNum);
         }
-
-        messageStore.destroy();
-        messageStore.shutdown();
     }
 
     @Test
-    public void testGetBatchMessageWithinNumber() throws Exception {
-        MessageStore messageStore = createMessageStore(null, true, CQType.BatchCQ);
-        messageStore.load();
-        messageStore.start();
+    public void testGetBatchMessageWithinNumber() {
         String topic = UUID.randomUUID().toString();
+
+        createTopic(topic, CQType.BatchCQ, messageStore);
+
         int batchNum = 20;
         for (int i = 0; i < 200; i++) {
             PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum));
@@ -198,7 +346,9 @@ public class BatchConsumeMessageTest extends QueueTestBase {
             Assert.assertEquals(i * batchNum, putMessageResult.getAppendMessageResult().getLogicsOffset());
             Assert.assertEquals(batchNum, putMessageResult.getAppendMessageResult().getMsgNum());
         }
-        Thread.sleep(500);
+
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
         ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, 0);
         Assert.assertEquals(CQType.BatchCQ, consumeQueue.getCQType());
         Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
@@ -212,7 +362,7 @@ public class BatchConsumeMessageTest extends QueueTestBase {
             Assert.assertEquals(batchNum, getMessageResult.getMessageCount());
             SelectMappedBufferResult sbr = getMessageResult.getMessageMapedList().get(0);
             MessageExt messageExt = MessageDecoder.decode(sbr.getByteBuffer());
-            short tmpBatchNum = Short.valueOf(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
+            short tmpBatchNum = Short.parseShort(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
             Assert.assertEquals(0, messageExt.getQueueOffset());
             Assert.assertEquals(batchNum, tmpBatchNum);
         }
@@ -236,22 +386,19 @@ public class BatchConsumeMessageTest extends QueueTestBase {
                 SelectMappedBufferResult sbr = getMessageResult.getMessageMapedList().get(i);
                 MessageExt messageExt = MessageDecoder.decode(sbr.getByteBuffer());
                 Assert.assertNotNull(messageExt);
-                short innerBatchNum = Short.valueOf(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
+                short innerBatchNum = Short.parseShort(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
                 Assert.assertEquals(i * batchNum, Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_BASE)));
                 Assert.assertEquals(batchNum, innerBatchNum);
 
             }
         }
-        messageStore.destroy();
-        messageStore.shutdown();
     }
 
     @Test
-    public void testGetBatchMessageWithinSize() throws Exception {
-        MessageStore messageStore = createMessageStore(null, true, CQType.BatchCQ);
-        messageStore.load();
-        messageStore.start();
+    public void testGetBatchMessageWithinSize() {
         String topic = UUID.randomUUID().toString();
+        createTopic(topic, CQType.BatchCQ, messageStore);
+
         int batchNum = 10;
         for (int i = 0; i < 100; i++) {
             PutMessageResult putMessageResult = messageStore.putMessage(buildMessage(topic, batchNum));
@@ -259,7 +406,9 @@ public class BatchConsumeMessageTest extends QueueTestBase {
             Assert.assertEquals(i * 10, putMessageResult.getAppendMessageResult().getLogicsOffset());
             Assert.assertEquals(batchNum, putMessageResult.getAppendMessageResult().getMsgNum());
         }
-        Thread.sleep(500);
+
+        await().atMost(5, SECONDS).until(fullyDispatched(messageStore));
+
         ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(topic, 0);
         Assert.assertEquals(CQType.BatchCQ, consumeQueue.getCQType());
         Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
@@ -272,7 +421,7 @@ public class BatchConsumeMessageTest extends QueueTestBase {
             Assert.assertEquals(1, getMessageResult.getMessageMapedList().size());
             SelectMappedBufferResult sbr = getMessageResult.getMessageMapedList().get(0);
             MessageExt messageExt = MessageDecoder.decode(sbr.getByteBuffer());
-            short tmpBatchNum = Short.valueOf(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
+            short tmpBatchNum = Short.parseShort(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
             Assert.assertEquals(0, messageExt.getQueueOffset());
             Assert.assertEquals(batchNum, tmpBatchNum);
         }
@@ -293,14 +442,29 @@ public class BatchConsumeMessageTest extends QueueTestBase {
                 Assert.assertFalse(getMessageResult.getMessageMapedList().get(i).hasReleased());
                 SelectMappedBufferResult sbr = getMessageResult.getMessageMapedList().get(i);
                 MessageExt messageExt = MessageDecoder.decode(sbr.getByteBuffer());
-                short tmpBatchNum = Short.valueOf(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
+                short tmpBatchNum = Short.parseShort(messageExt.getProperty(MessageConst.PROPERTY_INNER_NUM));
                 Assert.assertEquals(i * batchNum, Long.parseLong(messageExt.getProperty(MessageConst.PROPERTY_INNER_BASE)));
                 Assert.assertEquals(batchNum, tmpBatchNum);
 
             }
         }
-        messageStore.destroy();
-        messageStore.shutdown();
+    }
+
+    private void createTopic(String topic, CQType cqType, MessageStore messageStore) {
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
+        TopicConfig topicConfigToBeAdded = new TopicConfig();
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), cqType.toString());
+        topicConfigToBeAdded.setTopicName(topic);
+        topicConfigToBeAdded.setAttributes(attributes);
+
+        topicConfigTable.put(topic, topicConfigToBeAdded);
+        ((DefaultMessageStore)messageStore).setTopicConfigTable(topicConfigTable);
+    }
+
+    private Callable<Boolean> fullyDispatched(MessageStore messageStore) {
+        return () -> messageStore.dispatchBehindBytes() == 0;
     }
 
 }
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
index 8395d7d..1d9addc 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java
@@ -19,8 +19,8 @@ package org.apache.rocketmq.store.queue;
 
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.store.ConsumeQueue;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.MessageArrivingListener;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.StoreTestBase;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -32,7 +32,6 @@ import org.junit.Test;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import static java.lang.String.format;
@@ -45,13 +44,13 @@ public class BatchConsumeQueueTest extends StoreTestBase {
             path = createBaseDir();
         }
         baseDirs.add(path);
-        StreamMessageStore bcqMessageStore = null;
+        MessageStore messageStore = null;
         try {
-            bcqMessageStore = createBcqMessageStore(null);
+            messageStore = createMessageStore(null);
         } catch (Exception e) {
             Assert.fail();
         }
-        BatchConsumeQueue batchConsumeQueue = new BatchConsumeQueue("topic", 0, path,fileSize, bcqMessageStore);
+        BatchConsumeQueue batchConsumeQueue = new BatchConsumeQueue("topic", 0, path,fileSize, messageStore);
         batchConsumeQueues.add(batchConsumeQueue);
         return batchConsumeQueue;
     }
@@ -280,7 +279,7 @@ public class BatchConsumeQueueTest extends StoreTestBase {
         }
     }
 
-    protected StreamMessageStore createBcqMessageStore(String baseDir) throws Exception {
+    protected MessageStore createMessageStore(String baseDir) throws Exception {
         if (baseDir == null) {
             baseDir = createBaseDir();
         }
@@ -299,17 +298,13 @@ public class BatchConsumeQueueTest extends StoreTestBase {
         messageStoreConfig.setMaxTransferBytesOnMessageInMemory(1024 * 1024);
         messageStoreConfig.setMaxTransferCountOnMessageInDisk(1024);
         messageStoreConfig.setMaxTransferCountOnMessageInMemory(1024);
-        messageStoreConfig.setDefaultCQType(CQType.BatchCQ.name());
         messageStoreConfig.setSearchBcqByCacheEnable(true);
 
-        StreamMessageStore defaultMessageStore =  new StreamMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MessageArrivingListener() {
-            @Override
-            public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
-                                 byte[] filterBitMap, Map<String, String> properties) {
-
-            }
-        }, new BrokerConfig());
-        return defaultMessageStore;
+        return new DefaultMessageStore(
+                messageStoreConfig,
+                new BrokerStatsManager("simpleTest"),
+                (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {},
+                new BrokerConfig());
     }
 
 }
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
index 4345f0e..f274201 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/ConsumeQueueTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.store.queue;
 
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.MessageStore;
@@ -30,7 +31,7 @@ public class ConsumeQueueTest extends QueueTestBase {
     public void testIterator() throws Exception {
         final int msgNum = 100;
         final int msgSize = 1000;
-        MessageStore messageStore =  createMessageStore(null, true, CQType.SimpleCQ);
+        MessageStore messageStore =  createMessageStore(null, true);
         messageStore.load();
         String topic = UUID.randomUUID().toString();
         //The initial min max offset, before and after the creation of consume queue
diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
index a8e9f94..c16d7cb 100644
--- a/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/queue/QueueTestBase.java
@@ -23,7 +23,6 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
 import org.apache.rocketmq.store.MessageArrivingListener;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageStore;
@@ -33,11 +32,10 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
 import java.io.File;
 import java.util.Map;
-import java.util.Objects;
 
 public class QueueTestBase extends StoreTestBase {
 
-    protected MessageStore createMessageStore(String baseDir, boolean extent, CQType cqType) throws Exception {
+    protected MessageStore createMessageStore(String baseDir, boolean extent) throws Exception {
         if (baseDir == null) {
             baseDir = createBaseDir();
         }
@@ -56,28 +54,18 @@ public class QueueTestBase extends StoreTestBase {
         messageStoreConfig.setMaxTransferBytesOnMessageInMemory(1024 * 1024);
         messageStoreConfig.setMaxTransferCountOnMessageInDisk(1024);
         messageStoreConfig.setMaxTransferCountOnMessageInMemory(1024);
-        messageStoreConfig.setDefaultCQType(cqType.name());
 
         messageStoreConfig.setFlushIntervalCommitLog(1);
         messageStoreConfig.setFlushCommitLogThoroughInterval(2);
 
-        if (Objects.equals(CQType.BatchCQ, cqType)) {
-            return new StreamMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MessageArrivingListener() {
-                @Override
-                public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
-                                     byte[] filterBitMap, Map<String, String> properties) {
+        DefaultMessageStore messageStore = new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MessageArrivingListener() {
+            @Override
+            public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
+                                 byte[] filterBitMap, Map<String, String> properties) {
 
-                }
-            }, new BrokerConfig());
-        } else {
-            return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MessageArrivingListener() {
-                @Override
-                public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
-                                     byte[] filterBitMap, Map<String, String> properties) {
-
-                }
-            }, new BrokerConfig());
-        }
+            }
+        }, new BrokerConfig());
+        return messageStore;
     }
 
     public MessageExtBrokerInner buildMessage(String topic, int batchNum) {
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 8287d81..20149d4 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -50,20 +50,20 @@ public class MQAdminTestUtils {
     private static Logger log = Logger.getLogger(MQAdminTestUtils.class);
 
     public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
-                                      int queueNum) {
+                                      int queueNum, Map<String, String> attributes) {
         int defaultWaitTime = 5;
-        return createTopic(nameSrvAddr, clusterName, topic, queueNum, defaultWaitTime);
+        return createTopic(nameSrvAddr, clusterName, topic, queueNum, attributes, defaultWaitTime);
     }
 
     public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
-                                      int queueNum, int waitTimeSec) {
+                                      int queueNum, Map<String, String> attributes, int waitTimeSec) {
         boolean createResult = false;
         DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
         mqAdminExt.setInstanceName(UUID.randomUUID().toString());
         mqAdminExt.setNamesrvAddr(nameSrvAddr);
         try {
             mqAdminExt.start();
-            mqAdminExt.createTopic(clusterName, topic, queueNum);
+            mqAdminExt.createTopic(clusterName, topic, queueNum, attributes);
         } catch (Exception e) {
         }
 
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index a4e07fb..32af3bd 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -37,7 +37,7 @@ import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
@@ -61,18 +61,14 @@ public class BaseConf {
     //the logic queue test need at least three brokers
     protected final static String broker3Name;
     protected final static String clusterName;
-    protected final static String steamClusterName = "steam-cluster";
     protected final static int brokerNum;
     protected final static int waitTime = 5;
     protected final static int consumeTime = 2 * 60 * 1000;
     protected final static int QUEUE_NUMBERS = 8;
     protected final static NamesrvController namesrvController;
-    protected static BrokerController brokerController1;
-    protected static BrokerController brokerController2;
-    protected static BrokerController brokerController3;
-    protected static BrokerController streamBrokerController1;
-    protected static BrokerController streamBrokerController2;
-    protected static BrokerController streamBrokerController3;
+    protected final static BrokerController brokerController1;
+    protected final static BrokerController brokerController2;
+    protected final static BrokerController brokerController3;
     protected final static List<BrokerController> brokerControllerList;
     protected final static Map<String, BrokerController> brokerControllerMap;
     protected final static List<Object> mqClients = new ArrayList<Object>();
@@ -80,7 +76,7 @@ public class BaseConf {
     private final static Logger log = Logger.getLogger(BaseConf.class);
 
     static {
-    	System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
         namesrvController = IntegrationTestBase.createAndStartNamesrv();
         nsAddr = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
         brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
@@ -91,9 +87,6 @@ public class BaseConf {
         broker2Name = brokerController2.getBrokerConfig().getBrokerName();
         broker3Name = brokerController3.getBrokerConfig().getBrokerName();
         brokerNum = 3;
-        streamBrokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr, CQType.BatchCQ.toString(), steamClusterName);
-        streamBrokerController2 = IntegrationTestBase.createAndStartBroker(nsAddr, CQType.BatchCQ.toString(), steamClusterName);
-        streamBrokerController3 = IntegrationTestBase.createAndStartBroker(nsAddr, CQType.BatchCQ.toString(), steamClusterName);
         brokerControllerList = ImmutableList.of(brokerController1, brokerController2, brokerController3);
         brokerControllerMap = brokerControllerList.stream().collect(Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(), Function.identity()));
     }
@@ -124,7 +117,7 @@ public class BaseConf {
 
     public static String initTopic() {
         String topic = "tt-" + MQRandomUtils.getRandomTopic();
-        IntegrationTestBase.initTopic(topic, nsAddr, clusterName);
+        IntegrationTestBase.initTopic(topic, nsAddr, clusterName, CQType.SimpleCQ);
 
         return topic;
     }
@@ -171,9 +164,9 @@ public class BaseConf {
     }
 
     public static RMQNormalProducer getProducer(String nsAddr, String topic, String producerGoup,
-        String instanceName) {
+                                                String instanceName) {
         RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup,
-            instanceName);
+                instanceName);
         if (debug) {
             producer.setDebug();
         }
@@ -191,31 +184,31 @@ public class BaseConf {
     }
 
     public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
-        AbstractListener listener) {
+                                                AbstractListener listener) {
         return getConsumer(nsAddr, topic, subExpression, listener, false);
     }
 
     public static RMQNormalConsumer getConsumer(String nsAddr, String topic, String subExpression,
-        AbstractListener listener, boolean useTLS) {
+                                                AbstractListener listener, boolean useTLS) {
         String consumerGroup = initConsumerGroup();
         return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, useTLS);
     }
 
     public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
-        String subExpression, AbstractListener listener) {
+                                                String subExpression, AbstractListener listener) {
         return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
     }
 
     public static RMQNormalConsumer getConsumer(String nsAddr, String consumerGroup, String topic,
-        String subExpression, AbstractListener listener, boolean useTLS) {
+                                                String subExpression, AbstractListener listener, boolean useTLS) {
         RMQNormalConsumer consumer = ConsumerFactory.getRMQNormalConsumer(nsAddr, consumerGroup,
-            topic, subExpression, listener, useTLS);
+                topic, subExpression, listener, useTLS);
         if (debug) {
             consumer.setDebug();
         }
         mqClients.add(consumer);
         log.info(String.format("consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup,
-            topic, subExpression));
+                topic, subExpression));
         return consumer;
     }
 
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index a74fd21..398bd19 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -20,13 +20,18 @@ package org.apache.rocketmq.test.base;
 import com.google.common.truth.Truth;
 import java.io.File;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.namesrv.NamesrvConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -34,10 +39,8 @@ import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.queue.CQType;
 import org.apache.rocketmq.test.util.MQAdminTestUtils;
 import org.apache.rocketmq.test.util.TestUtils;
-import org.assertj.core.util.Strings;
 
 public class IntegrationTestBase {
     public static InternalLogger logger = InternalLoggerFactory.getLogger(IntegrationTestBase.class);
@@ -142,34 +145,6 @@ public class IntegrationTestBase {
         storeConfig.setMaxIndexNum(INDEX_NUM);
         storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
         storeConfig.setDeleteWhen("01;02;03;04;05;06;07;08;09;10;11;12;13;14;15;16;17;18;19;20;21;22;23;00");
-        storeConfig.setDefaultCQType(CQType.SimpleCQ.toString());
-        storeConfig.setMaxTransferCountOnMessageInMemory(1024);
-        storeConfig.setMaxTransferCountOnMessageInDisk(1024);
-        return createAndStartBroker(storeConfig, brokerConfig);
-    }
-
-    public static BrokerController createAndStartBroker(String nsAddr, String cqType, String cluster) {
-        String baseDir = createBaseDir();
-        BrokerConfig brokerConfig = new BrokerConfig();
-        MessageStoreConfig storeConfig = new MessageStoreConfig();
-        brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.incrementAndGet());
-        brokerConfig.setBrokerIP1("127.0.0.1");
-        brokerConfig.setNamesrvAddr(nsAddr);
-        brokerConfig.setEnablePropertyFilter(true);
-        brokerConfig.setLoadBalancePollNameServerInterval(500);
-        brokerConfig.setBrokerClusterName(cluster);
-        storeConfig.setStorePathRootDir(baseDir);
-        storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
-        storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
-        storeConfig.setMaxIndexNum(INDEX_NUM);
-        storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
-        storeConfig.setDeleteWhen("01;02;03;04;05;06;07;08;09;10;11;12;13;14;15;16;17;18;19;20;21;22;23;00");
-
-        if (!Strings.isNullOrEmpty(cqType)) {
-            storeConfig.setDefaultCQType(cqType);
-        } else {
-            storeConfig.setDefaultCQType(CQType.SimpleCQ.toString());
-        }
         storeConfig.setMaxTransferCountOnMessageInMemory(1024);
         storeConfig.setMaxTransferCountOnMessageInDisk(1024);
         return createAndStartBroker(storeConfig, brokerConfig);
@@ -193,17 +168,21 @@ public class IntegrationTestBase {
         return brokerController;
     }
 
-    public static boolean initTopic(String topic, String nsAddr, String clusterName, int queueNumbers) {
+    public static boolean initTopic(String topic, String nsAddr, String clusterName, int queueNumbers, CQType cqType) {
         long startTime = System.currentTimeMillis();
         boolean createResult;
 
         while (true) {
-            createResult = MQAdminTestUtils.createTopic(nsAddr, clusterName, topic, queueNumbers);
+            Map<String, String> attributes = new HashMap<>();
+            if (!Objects.equals(CQType.SimpleCQ, cqType)) {
+                attributes.put("+" + TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName(), cqType.toString());
+            }
+            createResult = MQAdminTestUtils.createTopic(nsAddr, clusterName, topic, queueNumbers, attributes);
             if (createResult) {
                 break;
             } else if (System.currentTimeMillis() - startTime > topicCreateTime) {
                 Truth.assertWithMessage(String.format("topic[%s] is created failed after:%d ms", topic,
-                    System.currentTimeMillis() - startTime)).fail();
+                        System.currentTimeMillis() - startTime)).fail();
                 break;
             } else {
                 TestUtils.waitForMoment(500);
@@ -214,8 +193,8 @@ public class IntegrationTestBase {
         return createResult;
     }
 
-    public static boolean initTopic(String topic, String nsAddr, String clusterName) {
-        return initTopic(topic, nsAddr, clusterName, BaseConf.QUEUE_NUMBERS);
+    public static boolean initTopic(String topic, String nsAddr, String clusterName, CQType cqType) {
+        return initTopic(topic, nsAddr, clusterName, BaseConf.QUEUE_NUMBERS, cqType);
     }
 
     public static void deleteFile(File file) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java b/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
index dd71dd1..0e1bf26 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
@@ -30,7 +30,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.queue.CQType;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.test.base.BaseConf;
 import org.apache.rocketmq.test.base.IntegrationTestBase;
 import org.apache.rocketmq.test.factory.ConsumerFactory;
@@ -63,7 +63,6 @@ public class DLedgerProduceAndConsumeIT {
         storeConfig.setdLegerGroup(brokerName);
         storeConfig.setdLegerSelfId(selfId);
         storeConfig.setdLegerPeers(peers);
-        storeConfig.setDefaultCQType(CQType.SimpleCQ.toString());
         return storeConfig;
     }
 
@@ -83,7 +82,7 @@ public class DLedgerProduceAndConsumeIT {
 
         String topic = UUID.randomUUID().toString();
         String consumerGroup = UUID.randomUUID().toString();
-        IntegrationTestBase.initTopic(topic, BaseConf.nsAddr, cluster, 1);
+        IntegrationTestBase.initTopic(topic, BaseConf.nsAddr, cluster, 1, CQType.SimpleCQ);
         DefaultMQProducer producer = ProducerFactory.getRMQProducer(BaseConf.nsAddr);
         DefaultMQPullConsumer consumer = ConsumerFactory.getRMQPullConsumer(BaseConf.nsAddr, consumerGroup);
 
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
index bf0b9e5..3a649ed 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
@@ -23,7 +23,6 @@ import java.util.Random;
 import java.util.UUID;
 
 import org.apache.log4j.Logger;
-import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
@@ -32,14 +31,14 @@ import org.apache.rocketmq.client.hook.SendMessageHook;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.TopicAttributes;
+import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.StreamMessageStore;
-import org.apache.rocketmq.store.queue.CQType;
 import org.apache.rocketmq.test.base.BaseConf;
 import org.apache.rocketmq.test.base.IntegrationTestBase;
 import org.apache.rocketmq.test.client.consumer.tag.TagMessageWith1ConsumerIT;
@@ -101,23 +100,23 @@ public class BatchSendIT extends BaseConf {
 
     @Test
     public void testBatchSend_SysInnerBatch() throws Exception {
-        waitBrokerRegistered(nsAddr, steamClusterName, brokerNum);
-
-        Assert.assertTrue(streamBrokerController1.getMessageStore() instanceof StreamMessageStore);
-        Assert.assertTrue(streamBrokerController2.getMessageStore() instanceof StreamMessageStore);
-        Assert.assertTrue(streamBrokerController3.getMessageStore() instanceof StreamMessageStore);
+        waitBrokerRegistered(nsAddr, clusterName, brokerNum);
 
         String batchTopic = UUID.randomUUID().toString();
-        IntegrationTestBase.initTopic(batchTopic, nsAddr, steamClusterName);
-        Assert.assertEquals(8, streamBrokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
-        Assert.assertEquals(8, streamBrokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
-        Assert.assertEquals(8, streamBrokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
-        Assert.assertEquals(-1, streamBrokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
-        Assert.assertEquals(-1, streamBrokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
-        Assert.assertEquals(-1, streamBrokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
-        Assert.assertEquals(0, streamBrokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
-        Assert.assertEquals(0, streamBrokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
-        Assert.assertEquals(0, streamBrokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
+        IntegrationTestBase.initTopic(batchTopic, nsAddr, clusterName, CQType.BatchCQ);
+
+        Assert.assertEquals(CQType.BatchCQ.toString(), brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
+        Assert.assertEquals(CQType.BatchCQ.toString(), brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
+        Assert.assertEquals(CQType.BatchCQ.toString(), brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getAttributes().get(TopicAttributes.QUEUE_TYPE_ATTRIBUTE.getName()));
+        Assert.assertEquals(8, brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
+        Assert.assertEquals(8, brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
+        Assert.assertEquals(8, brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
+        Assert.assertEquals(-1, brokerController1.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
+        Assert.assertEquals(-1, brokerController2.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
+        Assert.assertEquals(-1, brokerController3.getMessageStore().getMinOffsetInQueue(batchTopic, 0));
+        Assert.assertEquals(0, brokerController1.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
+        Assert.assertEquals(0, brokerController2.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
+        Assert.assertEquals(0, brokerController3.getMessageStore().getMaxOffsetInQueue(batchTopic, 0));
 
         DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
         MessageQueue messageQueue = producer.fetchPublishMessageQueues(batchTopic).iterator().next();
@@ -168,7 +167,7 @@ public class BatchSendIT extends BaseConf {
         Assert.assertTrue(brokerController3.getMessageStore() instanceof DefaultMessageStore);
 
         String batchTopic = UUID.randomUUID().toString();
-        IntegrationTestBase.initTopic(batchTopic, nsAddr, clusterName);
+        IntegrationTestBase.initTopic(batchTopic, nsAddr, clusterName, CQType.SimpleCQ);
         Assert.assertEquals(8, brokerController1.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
         Assert.assertEquals(8, brokerController2.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
         Assert.assertEquals(8, brokerController3.getTopicConfigManager().getTopicConfigTable().get(batchTopic).getReadQueueNums());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 6f551d3..378f7db 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -95,13 +95,13 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
-    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        createTopic(key, newTopic, queueNum, 0);
+    public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+        createTopic(key, newTopic, queueNum, 0, attributes);
     }
 
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
-        defaultMQAdminExtImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
+        defaultMQAdminExtImpl.createTopic(key, newTopic, queueNum, topicSysFlag, attributes);
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index b7e4816..f9b4822 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -77,11 +77,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingOne;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -95,9 +92,6 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.apache.rocketmq.tools.admin.api.TrackType;
 
-import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.checkAndBuildMappingItems;
-import static org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils.getMappingDetailFromConfig;
-
 public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 
     private static final Set<String> SYSTEM_GROUP_SET = new HashSet<String>();
@@ -1063,13 +1057,13 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     }
 
     @Override
-    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        createTopic(key, newTopic, queueNum, 0);
+    public void createTopic(String key, String newTopic, int queueNum, Map<String, String> attributes) throws MQClientException {
+        createTopic(key, newTopic, queueNum, 0, attributes); // topicSysFlag = 0; forward the caller's attributes
     }
 
     @Override
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
-        this.mqClientInstance.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag);
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException {
+        this.mqClientInstance.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag, attributes);
     }
 
     @Override
@@ -1176,7 +1170,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
 
     @Override
     public void setMessageRequestMode(final String brokerAddr, final String topic, final String consumerGroup, final
-    MessageRequestMode mode, final int popShareQueueNum, final long timeoutMillis)
+        MessageRequestMode mode, final int popShareQueueNum, final long timeoutMillis)
         throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, MQClientException {
         this.mqClientInstance.getMQClientAPIImpl().setMessageRequestMode(brokerAddr, topic, consumerGroup, mode, popShareQueueNum, timeoutMillis);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
index 3caa477..284900b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
@@ -23,7 +23,6 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
-import org.apache.rocketmq.common.TopicAttributes;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.attribute.AttributeParser;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;