You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/03/04 08:50:01 UTC

[rocketmq] 03/05: Merge 5.0.0-alpha-merge with develop branch

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 0d15e6c2d5b436081c91fa6d8e86e2e2b6be18e9
Merge: de76e06 1393329
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Fri Feb 25 17:43:06 2022 +0800

    Merge 5.0.0-alpha-merge with develop branch

 .travis.yml                                        |  15 +-
 NOTICE                                             |   2 +-
 README.md                                          |  22 +-
 .../org/apache/rocketmq/acl/AccessValidator.java   |  13 +
 .../org/apache/rocketmq/acl/common/AclUtils.java   |   2 +-
 .../rocketmq/acl/plain/PlainAccessValidator.java   |  19 +-
 .../rocketmq/acl/plain/PlainPermissionManager.java | 427 +++++++++---
 .../acl/plain/RemoteAddressStrategyFactory.java    |   4 +-
 .../apache/rocketmq/acl/common/AclUtilsTest.java   |  13 +-
 .../acl/plain/PlainAccessValidatorTest.java        | 762 ++++++++++++++-------
 .../acl/plain/PlainPermissionManagerTest.java      |  59 +-
 .../plain_acl.yml}                                 |  32 +-
 acl/src/test/resources/conf/plain_acl_null.yml     |  18 -
 broker/pom.xml                                     |   4 +
 .../apache/rocketmq/broker/BrokerController.java   |  40 +-
 .../rocketmq/broker/BrokerPathConfigHelper.java    |   4 -
 .../org/apache/rocketmq/broker/BrokerStartup.java  |  16 +-
 .../broker/filter/ConsumerFilterManager.java       |  17 +-
 .../broker/filter/MessageEvaluationContext.java    |   5 +-
 .../longpolling/LmqPullRequestHoldService.java     |  62 ++
 .../broker/longpolling/ManyPullRequest.java        |   4 +
 .../broker/longpolling/PullRequestHoldService.java |  10 +-
 .../broker/offset/ConsumerOffsetManager.java       |   6 +-
 .../broker/offset/LmqConsumerOffsetManager.java    | 109 +++
 .../broker/plugin/AbstractPluginMessageStore.java  | 549 +++++++--------
 .../broker/processor/AdminBrokerProcessor.java     |   8 +
 .../broker/processor/PullMessageProcessor.java     |   2 +-
 .../broker/processor/SendMessageProcessor.java     |  18 +-
 .../subscription/LmqSubscriptionGroupManager.java  |  46 ++
 .../broker/topic/LmqTopicConfigManager.java        |  49 ++
 .../rocketmq/broker/topic/TopicConfigManager.java  |   6 +-
 .../queue/TransactionalMessageBridge.java          |   8 +-
 .../queue/TransactionalMessageServiceImpl.java     |   4 +-
 .../rocketmq/broker/BrokerControllerTest.java      |  33 +
 .../broker/BrokerPathConfigHelperTest.java         |  42 ++
 .../broker/filter/MessageStoreWithFilterTest.java  |   2 +-
 .../offset/LmqConsumerOffsetManagerTest.java       |  81 +++
 .../broker/processor/AdminBrokerProcessorTest.java | 262 ++++++-
 .../broker/processor/SendMessageProcessorTest.java |   4 +-
 client/pom.xml                                     |  15 -
 .../org/apache/rocketmq/client/ClientConfig.java   |  10 +-
 .../java/org/apache/rocketmq/client/MQHelper.java  |   1 +
 .../org/apache/rocketmq/client/Validators.java     |  52 +-
 .../client/consumer/DefaultMQPullConsumer.java     |   1 +
 .../client/consumer/DefaultMQPushConsumer.java     |   5 +-
 .../rebalance/AllocateMachineRoomNearby.java       |   7 +-
 .../AllocateMessageQueueByMachineRoom.java         |  11 +
 .../consumer/store/LocalFileOffsetStore.java       |  11 +-
 .../consumer/store/RemoteBrokerOffsetStore.java    |   4 +-
 .../client/impl/ClientRemotingProcessor.java       |   6 +-
 .../apache/rocketmq/client/impl/MQAdminImpl.java   |   5 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 135 ++--
 .../ConsumeMessageConcurrentlyService.java         |  14 +-
 .../consumer/ConsumeMessageOrderlyService.java     |  14 +-
 .../impl/consumer/DefaultLitePullConsumerImpl.java |   2 +
 .../client/impl/consumer/ProcessQueue.java         |  12 +-
 .../client/impl/factory/MQClientInstance.java      |  17 +-
 .../impl/producer/DefaultMQProducerImpl.java       |  87 ++-
 .../client/producer/DefaultMQProducer.java         |   1 -
 .../client/producer/LocalTransactionExecuter.java  |   2 +-
 ...stFutureTable.java => RequestFutureHolder.java} |  54 +-
 .../client/producer/TransactionCheckListener.java  |   3 +-
 .../rocketmq/client/trace/TraceDataEncoder.java    |   5 +-
 .../org/apache/rocketmq/client/ValidatorsTest.java |   2 +-
 .../consumer/DefaultLitePullConsumerTest.java      |  18 +-
 .../client/consumer/DefaultMQPushConsumerTest.java |  34 +-
 .../AllocateMessageQueueAveragelyByCircleTest.java |  68 ++
 .../AllocateMessageQueueAveragelyTest.java         |  57 ++
 .../store/RemoteBrokerOffsetStoreTest.java         |   3 +-
 .../rocketmq/client/impl/MQClientAPIImplTest.java  |  36 +-
 .../ConsumeMessageConcurrentlyServiceTest.java     |  57 +-
 .../consumer/ConsumeMessageOrderlyServiceTest.java | 177 +++++
 .../impl/consumer/RebalanceLitePullImplTest.java   | 100 +++
 .../impl/consumer/RebalancePushImplTest.java       |  93 ++-
 .../client/producer/DefaultMQProducerTest.java     |  11 +-
 .../DefaultMQConsumerWithOpenTracingTest.java      |  25 +-
 .../trace/DefaultMQConsumerWithTraceTest.java      |   5 +-
 .../DefaultMQLitePullConsumerWithTraceTest.java    |   1 -
 .../client/trace/TraceDataEncoderTest.java         |  33 +-
 .../trace/TransactionMQProducerWithTraceTest.java  |  36 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |  41 +-
 .../org/apache/rocketmq/common/Configuration.java  |  19 +-
 .../java/org/apache/rocketmq/common/MixAll.java    |   5 +
 .../java/org/apache/rocketmq/common/UtilAll.java   |  14 +
 .../common/message/MessageClientIDSetter.java      |  27 +-
 .../rocketmq/common/message/MessageConst.java      |   5 +-
 .../rocketmq/common/message/MessageDecoder.java    |   5 +-
 .../apache/rocketmq/common/message/MessageExt.java |   4 +
 .../protocol/body/ClusterAclVersionInfo.java       |  15 +-
 .../common/protocol/body/ConsumerRunningInfo.java  |   8 +-
 .../header/GetBrokerAclConfigResponseHeader.java   |  10 +
 ...va => DeleteTopicFromNamesrvRequestHeader.java} |   2 +-
 .../rocketmq/common/sysflag/TopicSysFlag.java      |   3 -
 .../rocketmq/common/topic/TopicValidator.java      |  47 +-
 .../org/apache/rocketmq/common/MixAllTest.java     |  12 +
 .../common/message/MessageClientIDSetterTest.java  |  22 +
 distribution/NOTICE-BIN                            |   2 +-
 distribution/benchmark/runclass.sh                 |   3 +-
 distribution/bin/runbroker.cmd                     |   3 +-
 distribution/bin/runbroker.sh                      |  25 +-
 distribution/bin/runserver.cmd                     |   3 +-
 distribution/bin/runserver.sh                      |   3 +-
 distribution/bin/tools.cmd                         |   3 +-
 distribution/bin/tools.sh                          |   3 +-
 distribution/conf/{ => acl}/plain_acl.yml          |  44 +-
 distribution/conf/logback_broker.xml               |  48 +-
 docs/cn/Example_LMQ.md                             |  85 +++
 ...Multiple_ACL_Files_\350\256\276\350\256\241.md" | 137 ++++
 docs/cn/architecture.md                            |   6 +-
 docs/cn/best_practice.md                           |   6 +-
 .../java/API_Reference_ DefaultPullConsumer.md     | 143 ++++
 .../client/java/API_Reference_DefaultMQProducer.md |  26 +-
 docs/cn/design.md                                  |   2 +-
 docs/cn/image/LMQ_1.png                            | Bin 0 -> 304040 bytes
 docs/en/CLITools.md                                |   6 +-
 docs/en/Configuration_Client.md                    |  16 +-
 docs/en/Design_Query.md                            |   6 +-
 docs/en/Example_Transaction.md                     |  10 +-
 docs/en/best_practice.md                           |  79 +++
 example/pom.xml                                    |   4 -
 .../rocketmq/example/benchmark/AclClient.java      |  12 +-
 .../rocketmq/example/benchmark/Consumer.java       |  13 +-
 .../rocketmq/example/benchmark/Producer.java       |  15 +-
 .../example/benchmark/TransactionProducer.java     |  17 +-
 .../rocketmq/example/ordermessage/Consumer.java    |   4 -
 .../rocketmq/example/quickstart/Consumer.java      |   4 +-
 .../example/schedule/ScheduledMessageConsumer.java |  51 ++
 .../example/schedule/ScheduledMessageProducer.java |  41 ++
 .../rocketmq/example/simple/OnewayProducer.java    |  45 ++
 .../rocketmq/example/simple/PullConsumer.java      |   2 +-
 example/src/main/resources/MessageFilterImpl.java  |  39 --
 logappender/pom.xml                                |  80 ---
 .../logappender/common/ProducerInstance.java       |  97 ---
 .../logappender/log4j/RocketmqLog4jAppender.java   | 189 -----
 .../logappender/log4j2/RocketmqLog4j2Appender.java | 226 ------
 .../logback/RocketmqLogbackAppender.java           | 179 -----
 .../rocketmq/logappender/AbstractTestCase.java     |  72 --
 .../rocketmq/logappender/Log4jPropertiesTest.java  |  32 -
 .../apache/rocketmq/logappender/Log4jXmlTest.java  |  32 -
 .../apache/rocketmq/logappender/LogbackTest.java   |  52 --
 .../apache/rocketmq/logappender/log4j2Test.java    |  44 --
 .../src/test/resources/log4j-example.properties    |  33 -
 logappender/src/test/resources/log4j-example.xml   |  56 --
 logappender/src/test/resources/log4j2-example.xml  |  41 --
 logappender/src/test/resources/logback-example.xml |  81 ---
 .../rocketmq/logging/inner/LoggingBuilder.java     |   2 +-
 .../rocketmq/logging/inner/LoggingEvent.java       |   3 +
 namesrv/pom.xml                                    |   4 +
 .../apache/rocketmq/namesrv/NamesrvStartup.java    |   2 +-
 .../namesrv/processor/DefaultRequestProcessor.java |   6 +-
 .../processor/DefaultRequestProcessorTest.java     | 149 +++-
 .../io/openmessaging/rocketmq/utils/BeanUtils.java |   9 +-
 pom.xml                                            |  23 +-
 .../rocketmq/remoting/common/RemotingHelper.java   |  36 +-
 .../rocketmq/remoting/common/RemotingUtil.java     |   9 +-
 .../rocketmq/remoting/netty/NettyClientConfig.java |  19 +
 .../remoting/netty/NettyRemotingAbstract.java      |   5 +-
 .../remoting/netty/NettyRemotingClient.java        |  21 +-
 .../remoting/netty/NettyRemotingServer.java        |  19 +-
 .../rocketmq/remoting/netty/NettyServerConfig.java |  27 +
 .../rocketmq/remoting/netty/NettySystemConfig.java |  17 +-
 .../remoting/protocol/RemotingCommand.java         |  12 +-
 .../remoting/protocol/RocketMQSerializable.java    |   9 +-
 .../remoting/netty/NettyServerConfigTest.java      |  35 +-
 .../remoting/protocol/RemotingCommandTest.java     |  59 +-
 .../protocol/RocketMQSerializableTest.java         |  74 +-
 .../rocketmq/srvutil/AclFileWatchService.java      | 162 +++++
 .../org/apache/rocketmq/srvutil/ServerUtil.java    |   2 +-
 store/pom.xml                                      |   2 +-
 .../java/org/apache/rocketmq/store/CommitLog.java  | 197 ++++--
 .../apache/rocketmq/store/CommitLogDispatcher.java |   4 +
 .../org/apache/rocketmq/store/ConsumeQueue.java    |  56 +-
 .../apache/rocketmq/store/DefaultMessageStore.java | 121 +++-
 .../apache/rocketmq/store/FlushDiskWatcher.java    |  78 +++
 .../org/apache/rocketmq/store/MappedFileQueue.java |   4 +-
 .../rocketmq/store/MessageArrivingListener.java    |  11 +
 .../org/apache/rocketmq/store/MessageStore.java    |  10 +
 .../org/apache/rocketmq/store/MultiDispatch.java   | 157 +++++
 .../apache/rocketmq/store/PutMessageStatus.java    |   1 +
 .../apache/rocketmq/store/StoreStatsService.java   |  70 +-
 .../rocketmq/store/config/MessageStoreConfig.java  |  65 ++
 .../rocketmq/store/dledger/DLedgerCommitLog.java   |   4 +-
 .../org/apache/rocketmq/store/ha/HAConnection.java |  15 +-
 .../org/apache/rocketmq/store/ha/HAService.java    |   9 +-
 .../rocketmq/store/logfile/DefaultMappedFile.java  |  12 +-
 .../store/schedule/ScheduleMessageService.java     | 587 ++++++++++++----
 .../rocketmq/store/stats/BrokerStatsManager.java   | 109 ++-
 .../store/stats/LmqBrokerStatsManager.java         | 120 ++++
 .../apache/rocketmq/store/BatchPutMessageTest.java |   4 +-
 .../apache/rocketmq/store/ConsumeQueueTest.java    | 141 +++-
 .../store/DefaultMessageStoreCleanFilesTest.java   |   2 +-
 .../store/DefaultMessageStoreShutDownTest.java     |   3 +-
 .../rocketmq/store/DefaultMessageStoreTest.java    |  18 +-
 .../rocketmq/store/FlushDiskWatcherTest.java       |  84 +++
 .../java/org/apache/rocketmq/store/HATest.java     |   2 +-
 .../apache/rocketmq/store/MultiDispatchTest.java   |  98 +++
 .../rocketmq/store/ScheduleMessageServiceTest.java | 118 +++-
 .../store/dledger/MessageStoreTestBase.java        |   4 +-
 .../store/schedule/ScheduleMessageServiceTest.java |   2 +-
 .../test/java/stats/BrokerStatsManagerTest.java    |   2 +-
 test/pom.xml                                       |  21 +
 .../rocketmq/test/lmq/benchmark/BenchLmqStore.java | 305 +++++++++
 .../rocketmq/test/message/MessageQueueMsg.java     |   8 +-
 .../org/apache/rocketmq/test/util/FileUtil.java    |   7 +-
 .../org/apache/rocketmq/test/util/MQAdmin.java     | 166 +++++
 .../org/apache/rocketmq/test/util/StatUtil.java    | 478 +++++++++++++
 .../org/apache/rocketmq/test/base/BaseConf.java    |   9 +-
 .../producer/exception/msg/MessageExceptionIT.java |   2 +-
 .../client/producer/querymsg/QueryMsgByKeyIT.java  |  57 ++
 .../rocketmq/test/delay/NormalMsgDelayIT.java      |   2 +-
 .../rocketmq/test/lmq/TestBenchLmqStore.java       | 100 +++
 tools/pom.xml                                      |   4 -
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   2 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |   4 +-
 .../apache/rocketmq/tools/command/CommandUtil.java |   7 +-
 .../rocketmq/tools/command/MQAdminStartup.java     |   2 +
 .../acl/ClusterAclConfigVersionListSubCommand.java |  32 +-
 .../broker/BrokerConsumeStatsSubCommad.java        |   6 +-
 .../command/broker/CleanUnusedTopicCommand.java    |   2 +-
 .../command/broker/GetBrokerConfigCommand.java     |  22 +-
 .../connection/ConsumerConnectionSubCommand.java   |  14 +-
 .../consumer/ConsumerProgressSubCommand.java       |  18 +-
 .../consumer/GetConsumerConfigSubCommand.java      |  10 +-
 .../tools/command/export/ExportConfigsCommand.java |   7 +-
 .../command/export/ExportMetadataCommand.java      |  73 +-
 .../command/namesrv/GetNamesrvConfigCommand.java   |  10 +-
 .../command/offset/CloneGroupOffsetCommand.java    |   2 +-
 .../command/offset/ResetOffsetByTimeCommand.java   |   2 +-
 ...ommand.java => SkipAccumulationSubCommand.java} |  61 +-
 .../command/queue/QueryConsumeQueueCommand.java    |   6 +-
 .../tools/command/stats/StatsAllSubCommand.java    |  12 +-
 .../tools/command/topic/TopicListSubCommand.java   |   6 +-
 .../command/broker/BrokerStatusSubCommandTest.java |  68 +-
 .../broker/CleanExpiredCQSubCommandTest.java       |  56 +-
 .../broker/CleanUnusedTopicCommandTest.java        |  56 +-
 .../command/broker/GetBrokerConfigCommandTest.java |  73 +-
 .../broker/UpdateBrokerConfigSubCommandTest.java   |  52 +-
 .../ConsumerConnectionSubCommandTest.java          |  87 +--
 .../ProducerConnectionSubCommandTest.java          |  80 +--
 .../consumer/ConsumerProgressSubCommandTest.java   | 103 +--
 .../consumer/ConsumerStatusSubCommandTest.java     | 113 +--
 .../consumer/GetConsumerConfigSubCommandTest.java  | 104 +--
 .../command/message/ConsumeMessageCommandTest.java |  39 +-
 .../message/QueryMsgByUniqueKeySubCommandTest.java |   6 +-
 .../message/QueryMsgTraceByIdSubCommandTest.java   | 126 ++--
 .../namesrv/AddWritePermSubCommandTest.java        |  38 +
 .../namesrv/GetNamesrvConfigCommandTest.java       |  77 +--
 .../command/namesrv/UpdateKvConfigCommandTest.java |  56 +-
 .../namesrv/WipeWritePermSubCommandTest.java       |  82 +--
 .../offset/GetConsumerStatusCommandTest.java       |  70 +-
 .../offset/ResetOffsetByTimeCommandTest.java       |  88 +--
 .../SkipAccumulationCommandTest.java}              |  16 +-
 .../tools/command/server/NameServerMocker.java     |  67 ++
 .../tools/command/server/ServerResponseMocker.java | 153 +++++
 254 files changed, 8322 insertions(+), 3927 deletions(-)

diff --cc .travis.yml
index 88058f3,5f08c85..837ae1f
--- a/.travis.yml
+++ b/.travis.yml
@@@ -4,9 -4,11 +4,11 @@@ notifications
    email:
      recipients:
        - dev@rocketmq.apache.org
+     if: branch = develop OR branch = master
    on_success: change
    on_failure: always
 -  
 +
+ 
  language: java
  
  matrix:
@@@ -43,11 -48,11 +48,11 @@@ before_script
    - ulimit -c unlimited
  
  script:
-   - mvn verify
 -  - mvn -B verify -DskipTests
++  - mvn verify -DskipTests
    - travis_retry mvn -B clean apache-rat:check
-   - travis_retry mvn -B package jacoco:report coveralls:report
+   - travis_retry mvn -B install jacoco:report coveralls:report
+   - travis_retry mvn -B clean install -pl test -Pit-test
  
  after_success:
-   - mvn clean install -Pit-test
    - mvn sonar:sonar -Psonar-apache
    - bash <(curl -s https://codecov.io/bash) || echo 'Codecov failed to upload'
diff --cc broker/pom.xml
index 2c5a407,fee6d9e..bb83991
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@@ -67,9 -67,9 +67,13 @@@
              <artifactId>slf4j-api</artifactId>
          </dependency>
          <dependency>
+             <groupId>org.bouncycastle</groupId>
+             <artifactId>bcpkix-jdk15on</artifactId>
+         </dependency>
++        <dependency>
 +            <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
 +            <artifactId>concurrentlinkedhashmap-lru</artifactId>
 +        </dependency>
      </dependencies>
  
      <build>
diff --cc broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7cf48c9,7bfc618..ef67ade
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@@ -31,13 -47,13 +31,15 @@@ import org.apache.rocketmq.broker.filte
  import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
  import org.apache.rocketmq.broker.latency.BrokerFastFailure;
  import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 +import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
+ import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
  import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
  import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
  import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
  import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
  import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 +import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
+ import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
  import org.apache.rocketmq.broker.out.BrokerOuterAPI;
  import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
  import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
@@@ -54,10 -66,10 +56,12 @@@ import org.apache.rocketmq.broker.proce
  import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
  import org.apache.rocketmq.broker.processor.SendMessageProcessor;
  import org.apache.rocketmq.broker.slave.SlaveSynchronize;
+ import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
  import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+ import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
  import org.apache.rocketmq.broker.topic.TopicConfigManager;
 +import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
 +import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
  import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
  import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
  import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
@@@ -101,30 -110,7 +105,31 @@@ import org.apache.rocketmq.store.config
  import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
  import org.apache.rocketmq.store.stats.BrokerStats;
  import org.apache.rocketmq.store.stats.BrokerStatsManager;
 +
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.util.AbstractMap;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.concurrent.BlockingQueue;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.LinkedBlockingQueue;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.ThreadPoolExecutor;
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
+ import org.apache.rocketmq.store.stats.LmqBrokerStatsManager;
  
  public class BrokerController {
      private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@@ -160,7 -137,7 +165,8 @@@
          "BrokerControllerScheduledThread"));
      private final SlaveSynchronize slaveSynchronize;
      private final BlockingQueue<Runnable> sendThreadPoolQueue;
+     private final BlockingQueue<Runnable> putThreadPoolQueue;
 +    private final BlockingQueue<Runnable> ackThreadPoolQueue;
      private final BlockingQueue<Runnable> pullThreadPoolQueue;
      private final BlockingQueue<Runnable> replyThreadPoolQueue;
      private final BlockingQueue<Runnable> queryThreadPoolQueue;
@@@ -177,10 -153,9 +183,11 @@@
      private RemotingServer remotingServer;
      private RemotingServer fastRemotingServer;
      private TopicConfigManager topicConfigManager;
 +    private TopicQueueMappingManager topicQueueMappingManager;
      private ExecutorService sendMessageExecutor;
+     private ExecutorService putMessageFutureExecutor;
      private ExecutorService pullMessageExecutor;
 +    private ExecutorService ackMessageExecutor;
      private ExecutorService replyMessageExecutor;
      private ExecutorService queryMessageExecutor;
      private ExecutorService adminBrokerExecutor;
@@@ -213,17 -185,11 +220,16 @@@
          this.nettyServerConfig = nettyServerConfig;
          this.nettyClientConfig = nettyClientConfig;
          this.messageStoreConfig = messageStoreConfig;
-         this.consumerOffsetManager = new ConsumerOffsetManager(this);
-         this.topicConfigManager = new TopicConfigManager(this);
-         this.topicQueueMappingManager = new TopicQueueMappingManager(this);
+         this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
+         this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
          this.pullMessageProcessor = new PullMessageProcessor(this);
-         this.pullRequestHoldService = new PullRequestHoldService(this);
+         this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
 -        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
 +        this.popMessageProcessor = new PopMessageProcessor(this);
 +        this.ackMessageProcessor = new AckMessageProcessor(this);
 +        this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);
 +        this.sendMessageProcessor = new SendMessageProcessor(this);
 +        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService,
 +            this.popMessageProcessor);
          this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
          this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
          this.consumerFilterManager = new ConsumerFilterManager(this);
@@@ -231,18 -196,15 +237,19 @@@
          this.producerManager = new ProducerManager();
          this.clientHousekeepingService = new ClientHousekeepingService(this);
          this.broker2Client = new Broker2Client(this);
-         this.subscriptionGroupManager = new SubscriptionGroupManager(this);
+         this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
 -        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
 +        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig, this);
          this.filterServerManager = new FilterServerManager(this);
  
 +        this.assignmentManager = new AssignmentManager(this);
 +        this.queryAssignmentProcessor = new QueryAssignmentProcessor(this);
 +        this.clientManageProcessor = new ClientManageProcessor(this);
          this.slaveSynchronize = new SlaveSynchronize(this);
  
          this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
+         this.putThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPutThreadPoolQueueCapacity());
          this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
 +        this.ackThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getAckThreadPoolQueueCapacity());
          this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
          this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
          this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
@@@ -1410,27 -1261,7 +1430,27 @@@
          }
      }
  
-     public ExecutorService getSendMessageExecutor() {
-         return sendMessageExecutor;
+     public ExecutorService getPutMessageFutureExecutor() {
+         return putMessageFutureExecutor;
      }
 +
 +    public long getShouldStartTime() {
 +        return shouldStartTime;
 +    }
 +
 +    public AssignmentManager getAssignmentManager() {
 +        return assignmentManager;
 +    }
 +
 +    public SendMessageProcessor getSendMessageProcessor() {
 +        return sendMessageProcessor;
 +    }
 +
 +    public QueryAssignmentProcessor getQueryAssignmentProcessor() {
 +        return queryAssignmentProcessor;
 +    }
 +
 +    public TopicQueueMappingCleanService getTopicQueueMappingCleanService() {
 +        return topicQueueMappingCleanService;
 +    }
  }
diff --cc broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index 6360d54,321c800..72739d8
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@@ -35,18 -35,14 +35,14 @@@ public class BrokerPathConfigHelper 
          return rootDir + File.separator + "config" + File.separator + "topics.json";
      }
  
 -    public static String getConsumerOffsetPath(final String rootDir) {
 -        return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
 +    public static String getTopicQueueMappingPath(final String rootDir) {
 +        return rootDir + File.separator + "config" + File.separator + "topicQueueMapping.json";
      }
  
 -    public static String getLmqConsumerOffsetPath(final String rootDir) {
 -        return rootDir + File.separator + "config" + File.separator + "lmqConsumerOffset.json";
 +    public static String getConsumerOffsetPath(final String rootDir) {
 +        return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
      }
  
-     public static String getConsumerOrderInfoPath(final String rootDir) {
-         return rootDir + File.separator + "config" + File.separator + "consumerOrderInfo.json";
-     }
- 
      public static String getSubscriptionGroupPath(final String rootDir) {
          return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
      }
diff --cc broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 44edfe0,62fd1c5..699e43c
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@@ -1,272 -1,272 +1,277 @@@
- /*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
- 
- package org.apache.rocketmq.broker.plugin;
- 
- import java.util.HashMap;
- import java.util.LinkedList;
- import java.util.Set;
- import java.util.concurrent.CompletableFuture;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.store.CommitLogDispatcher;
- import org.apache.rocketmq.store.GetMessageResult;
- import org.apache.rocketmq.store.MessageExtBatch;
- import org.apache.rocketmq.store.MessageExtBrokerInner;
- import org.apache.rocketmq.store.MessageFilter;
- import org.apache.rocketmq.store.MessageStore;
- import org.apache.rocketmq.store.PutMessageResult;
- import org.apache.rocketmq.store.QueryMessageResult;
- import org.apache.rocketmq.store.SelectMappedBufferResult;
- import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
- import org.apache.rocketmq.store.stats.BrokerStatsManager;
- 
- public abstract class AbstractPluginMessageStore implements MessageStore {
-     protected MessageStore next = null;
-     protected MessageStorePluginContext context;
- 
-     public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) {
-         this.next = next;
-         this.context = context;
-     }
- 
-     @Override
-     public long getEarliestMessageTime() {
-         return next.getEarliestMessageTime();
-     }
- 
-     @Override
-     public long lockTimeMills() {
-         return next.lockTimeMills();
-     }
- 
-     @Override
-     public boolean isOSPageCacheBusy() {
-         return next.isOSPageCacheBusy();
-     }
- 
-     @Override
-     public boolean isTransientStorePoolDeficient() {
-         return next.isTransientStorePoolDeficient();
-     }
- 
-     @Override
-     public boolean load() {
-         return next.load();
-     }
- 
-     @Override
-     public void start() throws Exception {
-         next.start();
-     }
- 
-     @Override
-     public void shutdown() {
-         next.shutdown();
-     }
- 
-     @Override
-     public void destroy() {
-         next.destroy();
-     }
- 
-     @Override
-     public PutMessageResult putMessage(MessageExtBrokerInner msg) {
-         return next.putMessage(msg);
-     }
- 
-     @Override
-     public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
-         return next.asyncPutMessage(msg);
-     }
- 
-     @Override
-     public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
-         return next.asyncPutMessages(messageExtBatch);
-     }
- 
-     @Override
-     public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
-         int maxMsgNums, final MessageFilter messageFilter) {
-         return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
-     }
- 
-     @Override
-     public long getMaxOffsetInQueue(String topic, int queueId) {
-         return next.getMaxOffsetInQueue(topic, queueId);
-     }
- 
-     @Override
-     public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
-         return next.getMaxOffsetInQueue(topic, queueId, committed);
-     }
- 
-     @Override
-     public long getMinOffsetInQueue(String topic, int queueId) {
-         return next.getMinOffsetInQueue(topic, queueId);
-     }
- 
-     @Override
-     public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
-         return next.getCommitLogOffsetInQueue(topic, queueId, consumeQueueOffset);
-     }
- 
-     @Override
-     public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
-         return next.getOffsetInQueueByTime(topic, queueId, timestamp);
-     }
- 
-     @Override
-     public MessageExt lookMessageByOffset(long commitLogOffset) {
-         return next.lookMessageByOffset(commitLogOffset);
-     }
- 
-     @Override
-     public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
-         return next.selectOneMessageByOffset(commitLogOffset);
-     }
- 
-     @Override
-     public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
-         return next.selectOneMessageByOffset(commitLogOffset, msgSize);
-     }
- 
-     @Override
-     public String getRunningDataInfo() {
-         return next.getRunningDataInfo();
-     }
- 
-     @Override
-     public HashMap<String, String> getRuntimeInfo() {
-         return next.getRuntimeInfo();
-     }
- 
-     @Override
-     public long getMaxPhyOffset() {
-         return next.getMaxPhyOffset();
-     }
- 
-     @Override
-     public long getMinPhyOffset() {
-         return next.getMinPhyOffset();
-     }
- 
-     @Override
-     public long getEarliestMessageTime(String topic, int queueId) {
-         return next.getEarliestMessageTime(topic, queueId);
-     }
- 
-     @Override
-     public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
-         return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset);
-     }
- 
-     @Override
-     public long getMessageTotalInQueue(String topic, int queueId) {
-         return next.getMessageTotalInQueue(topic, queueId);
-     }
- 
-     @Override
-     public SelectMappedBufferResult getCommitLogData(long offset) {
-         return next.getCommitLogData(offset);
-     }
- 
-     @Override
-     public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
-         return next.appendToCommitLog(startOffset, data, dataStart, dataLength);
-     }
- 
-     @Override
-     public void executeDeleteFilesManually() {
-         next.executeDeleteFilesManually();
-     }
- 
-     @Override
-     public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin,
-         long end) {
-         return next.queryMessage(topic, key, maxNum, begin, end);
-     }
- 
-     @Override
-     public void updateHaMasterAddress(String newAddr) {
-         next.updateHaMasterAddress(newAddr);
-     }
- 
-     @Override
-     public long slaveFallBehindMuch() {
-         return next.slaveFallBehindMuch();
-     }
- 
-     @Override
-     public long now() {
-         return next.now();
-     }
- 
-     @Override
-     public int cleanUnusedTopic(Set<String> topics) {
-         return next.cleanUnusedTopic(topics);
-     }
- 
-     @Override
-     public void cleanExpiredConsumerQueue() {
-         next.cleanExpiredConsumerQueue();
-     }
- 
-     @Override
-     public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) {
-         return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset);
-     }
- 
-     @Override
-     public long dispatchBehindBytes() {
-         return next.dispatchBehindBytes();
-     }
- 
-     @Override
-     public long flush() {
-         return next.flush();
-     }
- 
-     @Override
-     public boolean resetWriteOffset(long phyOffset) {
-         return next.resetWriteOffset(phyOffset);
-     }
- 
-     @Override
-     public long getConfirmOffset() {
-         return next.getConfirmOffset();
-     }
- 
-     @Override
-     public void setConfirmOffset(long phyOffset) {
-         next.setConfirmOffset(phyOffset);
-     }
- 
-     @Override
-     public LinkedList<CommitLogDispatcher> getDispatcherList() {
-         return next.getDispatcherList();
-     }
- 
-     @Override
-     public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) {
-         return next.getConsumeQueue(topic, queueId);
-     }
- 
-     @Override
-     public BrokerStatsManager getBrokerStatsManager() {
-         return next.getBrokerStatsManager();
-     };
- }
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.rocketmq.broker.plugin;
+ 
+ import java.util.HashMap;
+ import java.util.LinkedList;
+ import java.util.Set;
+ import java.util.concurrent.CompletableFuture;
+ import org.apache.rocketmq.common.message.MessageExt;
 -import org.apache.rocketmq.common.message.MessageExtBatch;
+ import org.apache.rocketmq.store.CommitLogDispatcher;
 -import org.apache.rocketmq.store.ConsumeQueue;
+ import org.apache.rocketmq.store.GetMessageResult;
++import org.apache.rocketmq.store.MessageExtBatch;
+ import org.apache.rocketmq.store.MessageExtBrokerInner;
+ import org.apache.rocketmq.store.MessageFilter;
+ import org.apache.rocketmq.store.MessageStore;
+ import org.apache.rocketmq.store.PutMessageResult;
+ import org.apache.rocketmq.store.QueryMessageResult;
+ import org.apache.rocketmq.store.SelectMappedBufferResult;
++import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
+ import org.apache.rocketmq.store.stats.BrokerStatsManager;
+ 
+ public abstract class AbstractPluginMessageStore implements MessageStore {
+     protected MessageStore next = null;
+     protected MessageStorePluginContext context;
+ 
+     public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) {
+         this.next = next;
+         this.context = context;
+     }
+ 
+     @Override
+     public long getEarliestMessageTime() {
+         return next.getEarliestMessageTime();
+     }
+ 
+     @Override
+     public long lockTimeMills() {
+         return next.lockTimeMills();
+     }
+ 
+     @Override
+     public boolean isOSPageCacheBusy() {
+         return next.isOSPageCacheBusy();
+     }
+ 
+     @Override
+     public boolean isTransientStorePoolDeficient() {
+         return next.isTransientStorePoolDeficient();
+     }
+ 
+     @Override
+     public boolean load() {
+         return next.load();
+     }
+ 
+     @Override
+     public void start() throws Exception {
+         next.start();
+     }
+ 
+     @Override
+     public void shutdown() {
+         next.shutdown();
+     }
+ 
+     @Override
+     public void destroy() {
+         next.destroy();
+     }
+ 
+     @Override
+     public PutMessageResult putMessage(MessageExtBrokerInner msg) {
+         return next.putMessage(msg);
+     }
+ 
+     @Override
+     public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
+         return next.asyncPutMessage(msg);
+     }
+ 
+     @Override
+     public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
+         return next.asyncPutMessages(messageExtBatch);
+     }
+ 
+     @Override
+     public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
+         int maxMsgNums, final MessageFilter messageFilter) {
+         return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
+     }
+ 
+     @Override
+     public long getMaxOffsetInQueue(String topic, int queueId) {
+         return next.getMaxOffsetInQueue(topic, queueId);
+     }
+ 
+     @Override
++    public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
++        return next.getMaxOffsetInQueue(topic, queueId, committed);
++    }
++
++    @Override
+     public long getMinOffsetInQueue(String topic, int queueId) {
+         return next.getMinOffsetInQueue(topic, queueId);
+     }
+ 
+     @Override
+     public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
+         return next.getCommitLogOffsetInQueue(topic, queueId, consumeQueueOffset);
+     }
+ 
+     @Override
+     public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
+         return next.getOffsetInQueueByTime(topic, queueId, timestamp);
+     }
+ 
+     @Override
+     public MessageExt lookMessageByOffset(long commitLogOffset) {
+         return next.lookMessageByOffset(commitLogOffset);
+     }
+ 
+     @Override
+     public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
+         return next.selectOneMessageByOffset(commitLogOffset);
+     }
+ 
+     @Override
+     public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
+         return next.selectOneMessageByOffset(commitLogOffset, msgSize);
+     }
+ 
+     @Override
+     public String getRunningDataInfo() {
+         return next.getRunningDataInfo();
+     }
+ 
+     @Override
+     public HashMap<String, String> getRuntimeInfo() {
+         return next.getRuntimeInfo();
+     }
+ 
+     @Override
+     public long getMaxPhyOffset() {
+         return next.getMaxPhyOffset();
+     }
+ 
+     @Override
+     public long getMinPhyOffset() {
+         return next.getMinPhyOffset();
+     }
+ 
+     @Override
+     public long getEarliestMessageTime(String topic, int queueId) {
+         return next.getEarliestMessageTime(topic, queueId);
+     }
+ 
+     @Override
+     public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
+         return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset);
+     }
+ 
+     @Override
+     public long getMessageTotalInQueue(String topic, int queueId) {
+         return next.getMessageTotalInQueue(topic, queueId);
+     }
+ 
+     @Override
+     public SelectMappedBufferResult getCommitLogData(long offset) {
+         return next.getCommitLogData(offset);
+     }
+ 
+     @Override
+     public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
+         return next.appendToCommitLog(startOffset, data, dataStart, dataLength);
+     }
+ 
+     @Override
+     public void executeDeleteFilesManually() {
+         next.executeDeleteFilesManually();
+     }
+ 
+     @Override
+     public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin,
+         long end) {
+         return next.queryMessage(topic, key, maxNum, begin, end);
+     }
+ 
+     @Override
+     public void updateHaMasterAddress(String newAddr) {
+         next.updateHaMasterAddress(newAddr);
+     }
+ 
+     @Override
+     public long slaveFallBehindMuch() {
+         return next.slaveFallBehindMuch();
+     }
+ 
+     @Override
+     public long now() {
+         return next.now();
+     }
+ 
+     @Override
+     public int cleanUnusedTopic(Set<String> topics) {
+         return next.cleanUnusedTopic(topics);
+     }
+ 
+     @Override
+     public void cleanExpiredConsumerQueue() {
+         next.cleanExpiredConsumerQueue();
+     }
+ 
+     @Override
+     public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) {
+         return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset);
+     }
+ 
+     @Override
+     public long dispatchBehindBytes() {
+         return next.dispatchBehindBytes();
+     }
+ 
+     @Override
+     public long flush() {
+         return next.flush();
+     }
+ 
+     @Override
+     public boolean resetWriteOffset(long phyOffset) {
+         return next.resetWriteOffset(phyOffset);
+     }
+ 
+     @Override
+     public long getConfirmOffset() {
+         return next.getConfirmOffset();
+     }
+ 
+     @Override
+     public void setConfirmOffset(long phyOffset) {
+         next.setConfirmOffset(phyOffset);
+     }
+ 
+     @Override
+     public LinkedList<CommitLogDispatcher> getDispatcherList() {
+         return next.getDispatcherList();
+     }
+ 
+     @Override
 -    public ConsumeQueue getConsumeQueue(String topic, int queueId) {
++    public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) {
+         return next.getConsumeQueue(topic, queueId);
+     }
+ 
+     @Override
+     public BrokerStatsManager getBrokerStatsManager() {
+         return next.getBrokerStatsManager();
+     }
+ 
+     @Override
+     public void cleanUnusedLmqTopic(String topic) {
+         next.cleanUnusedLmqTopic(topic);
+     }
+ }
diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index c1819a5,9d188ab..d79ae4a
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@@ -371,9 -292,14 +371,16 @@@ public class AdminBrokerProcessor exten
              return response;
          }
  
+         if (MixAll.isLmq(topic)) {
+             this.brokerController.getMessageStore().cleanUnusedLmqTopic(topic);
+             response.setCode(ResponseCode.SUCCESS);
+             response.setRemark(null);
+             return response;
+         }
+ 
          this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
 +        this.brokerController.getTopicQueueMappingManager().delete(topic);
 +
          this.brokerController.getMessageStore()
              .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
          if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 0f1ca23,20665a3..806dcd3
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@@ -100,202 -89,9 +100,203 @@@ public class PullMessageProcessor exten
          return false;
      }
  
 +
 +
 +    private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
 +        try {
 +            if (mappingContext.getMappingDetail() == null) {
 +                return null;
 +            }
 +            TopicQueueMappingDetail mappingDetail =  mappingContext.getMappingDetail();
 +            String topic = mappingContext.getTopic();
 +            Integer globalId = mappingContext.getGlobalId();
 +            // is this broker the leader? Consider the ordered consumer, which will lock the mq
 +            if (!mappingContext.isLeader()) {
 +                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname()));
 +            }
 +            Long globalOffset = requestHeader.getQueueOffset();
 +
 +            LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), globalOffset, true);
 +            mappingContext.setCurrentItem(mappingItem);
 +
 +            if (globalOffset < mappingItem.getLogicOffset()) {
 +                //handleOffsetMoved
 +                //If the physical queue is reused, we should handle the PULL_OFFSET_MOVED independently
 +                //Otherwise, we could just transfer it to the physical process
 +            }
 +            //below is the physical queue info
 +            String bname = mappingItem.getBname();
 +            Integer phyQueueId = mappingItem.getQueueId();
 +            Long phyQueueOffset = mappingItem.computePhysicalQueueOffset(globalOffset);
 +            requestHeader.setQueueId(phyQueueId);
 +            requestHeader.setQueueOffset(phyQueueOffset);
 +            if (mappingItem.checkIfEndOffsetDecided()
 +                    && requestHeader.getMaxMsgNums() != null) {
 +                requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
 +            }
 +
 +            if (mappingDetail.getBname().equals(bname)) {
 +                //just let it go, do the local pull process
 +                return null;
 +            }
 +
 +            int sysFlag = requestHeader.getSysFlag();
 +            requestHeader.setLo(false);
 +            requestHeader.setBname(bname);
 +            sysFlag = PullSysFlag.clearSuspendFlag(sysFlag);
 +            sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
 +            requestHeader.setSysFlag(sysFlag);
 +            RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null);
 +            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
 +            if (rpcResponse.getException() != null) {
 +                throw rpcResponse.getException();
 +            }
 +
 +            PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) rpcResponse.getHeader();
 +            {
 +                RemotingCommand rewriteResult =  rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, rpcResponse.getCode());
 +                if (rewriteResult != null) {
 +                    return rewriteResult;
 +                }
 +            }
 +            return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
 +        } catch (Throwable t) {
 +            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
 +        }
 +    }
 +
 +    private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader,
 +                                                          TopicQueueMappingContext mappingContext, final int code) {
 +        try {
 +            if (mappingContext.getMappingDetail() == null) {
 +                return null;
 +            }
 +            TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
 +            LogicQueueMappingItem leaderItem = mappingContext.getLeaderItem();
 +
 +            LogicQueueMappingItem currentItem = mappingContext.getCurrentItem();
 +
 +            LogicQueueMappingItem earlistItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
 +
 +            assert currentItem.getLogicOffset() >= 0;
 +
 +            long requestOffset = requestHeader.getQueueOffset();
 +            long nextBeginOffset = responseHeader.getNextBeginOffset();
 +            long minOffset = responseHeader.getMinOffset();
 +            long maxOffset = responseHeader.getMaxOffset();
 +            int responseCode = code;
 +
 +            //consider the following situations
 +            // 1. read from slave, currently not supported
 +            // 2. the middle queue is truncated because of deleting commitlog
 +            if (code != ResponseCode.SUCCESS) {
 +                //note: the currentItem may be both the leader and the earliest
 +                boolean isRevised = false;
 +                if (leaderItem.getGen() == currentItem.getGen()) {
 +                    //read the leader
 +                    if (requestOffset > maxOffset) {
 +                        //actually, we need to do nothing, but keep the code structure here
 +                        if (code == ResponseCode.PULL_OFFSET_MOVED) {
 +                            responseCode = ResponseCode.PULL_OFFSET_MOVED;
 +                            nextBeginOffset = maxOffset;
 +                        } else {
 +                            //maybe current broker is the slave
 +                            responseCode = code;
 +                        }
 +                    } else  if (requestOffset < minOffset) {
 +                        nextBeginOffset = minOffset;
 +                        responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
 +                    } else {
 +                        responseCode = code;
 +                    }
 +                }
 +                //note: the currentItem may be both the leader and the earliest
 +                if (earlistItem.getGen() == currentItem.getGen()) {
 +                    //read the earliest one
 +                    if (requestOffset < minOffset) {
 +                        if (code == ResponseCode.PULL_OFFSET_MOVED) {
 +                            responseCode = ResponseCode.PULL_OFFSET_MOVED;
 +                            nextBeginOffset = minOffset;
 +                        } else {
 +                            //maybe read from slave, but we still set it to moved
 +                            responseCode = ResponseCode.PULL_OFFSET_MOVED;
 +                            nextBeginOffset = minOffset;
 +                        }
 +                    } else if (requestOffset >= maxOffset) {
 +                        //just move to another item
 +                        LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
 +                        if (nextItem != null) {
 +                            isRevised = true;
 +                            currentItem = nextItem;
 +                            nextBeginOffset = currentItem.getStartOffset();
 +                            minOffset = currentItem.getStartOffset();
 +                            maxOffset = minOffset;
 +                            responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
 +                        } else {
 +                            //maybe the next one's logic offset is -1
 +                            responseCode = ResponseCode.PULL_NOT_FOUND;
 +                        }
 +                    } else {
 +                        //let it go
 +                        responseCode = code;
 +                    }
 +                }
 +
 +                //read from the middle item, ignore the PULL_OFFSET_MOVED
 +                if (!isRevised
 +                    && leaderItem.getGen() != currentItem.getGen()
 +                    && earlistItem.getGen() != currentItem.getGen()) {
 +                    if (requestOffset < minOffset) {
 +                        nextBeginOffset = minOffset;
 +                        responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
 +                    } else if (requestOffset >= maxOffset) {
 +                        //just move to another item
 +                        LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
 +                        if (nextItem != null) {
 +                            currentItem = nextItem;
 +                            nextBeginOffset = currentItem.getStartOffset();
 +                            minOffset = currentItem.getStartOffset();
 +                            maxOffset = minOffset;
 +                            responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
 +                        } else {
 +                            //maybe the next one's logic offset is -1
 +                            responseCode = ResponseCode.PULL_NOT_FOUND;
 +                        }
 +                    } else {
 +                        responseCode = code;
 +                    }
 +                }
 +            }
 +
 +            //handle nextBeginOffset
 +            //the next begin offset should be no more than the end offset
 +            if (currentItem.checkIfEndOffsetDecided()
 +                    && nextBeginOffset >= currentItem.getEndOffset()) {
 +                nextBeginOffset = currentItem.getEndOffset();
 +            }
 +            responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetStrictly(nextBeginOffset));
 +            //handle min offset
 +            responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetStrictly(Math.max(currentItem.getStartOffset(), minOffset)));
 +            //handle max offset
 +            responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetStrictly(maxOffset),
 +                    TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
 +            //set the offsetDelta
 +            responseHeader.setOffsetDelta(currentItem.computeOffsetDelta());
 +
 +            if (code != ResponseCode.SUCCESS) {
 +                return RemotingCommand.createResponseCommandWithHeader(responseCode, responseHeader);
 +            } else {
 +                return null;
 +            }
 +        } catch (Throwable t) {
 +            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
 +        }
 +    }
 +
 +
      private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
          throws RemotingCommandException {
+         final long beginTimeMills = this.brokerController.getMessageStore().now();
          RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
          final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
          final PullMessageRequestHeader requestHeader =
diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index a3f5c4e,c8ea4d3..f02358f
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@@ -60,8 -54,9 +60,10 @@@ import org.apache.rocketmq.remoting.exc
  import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
  import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
  import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+ import org.apache.rocketmq.store.DefaultMessageStore;
 +import org.apache.rocketmq.store.MessageExtBatch;
  import org.apache.rocketmq.store.MessageExtBrokerInner;
+ import org.apache.rocketmq.store.MessageStore;
  import org.apache.rocketmq.store.PutMessageResult;
  import org.apache.rocketmq.store.config.MessageStoreConfig;
  import org.apache.rocketmq.store.config.StorePathConfigHelper;
diff --cc broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index acf05fb,a2abdc0..e4814f0
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@@ -19,23 -20,35 +20,38 @@@ import com.alibaba.fastjson.JSON
  import com.google.common.collect.Sets;
  import io.netty.channel.Channel;
  import io.netty.channel.ChannelHandlerContext;
++import java.net.InetSocketAddress;
  import org.apache.rocketmq.broker.BrokerController;
+ import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+ import org.apache.rocketmq.broker.client.ConsumerManager;
+ import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
  import org.apache.rocketmq.broker.topic.TopicConfigManager;
  import org.apache.rocketmq.common.BrokerConfig;
  import org.apache.rocketmq.common.MixAll;
  import org.apache.rocketmq.common.TopicConfig;
  import org.apache.rocketmq.common.TopicFilterType;
 +import org.apache.rocketmq.common.TopicQueueId;
  import org.apache.rocketmq.common.constant.PermName;
+ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  import org.apache.rocketmq.common.message.MessageAccessor;
  import org.apache.rocketmq.common.message.MessageConst;
  import org.apache.rocketmq.common.message.MessageExt;
  import org.apache.rocketmq.common.protocol.RequestCode;
  import org.apache.rocketmq.common.protocol.ResponseCode;
+ import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
+ import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
  import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
  import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
+ import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader;
+ import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
+ import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
+ import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
 +import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
  import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
+ import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
  import org.apache.rocketmq.common.topic.TopicValidator;
  import org.apache.rocketmq.remoting.exception.RemotingCommandException;
  import org.apache.rocketmq.remoting.netty.NettyClientConfig;
@@@ -43,13 -56,16 +59,16 @@@ import org.apache.rocketmq.remoting.net
  import org.apache.rocketmq.remoting.protocol.RemotingCommand;
  import org.apache.rocketmq.store.AppendMessageResult;
  import org.apache.rocketmq.store.AppendMessageStatus;
+ import org.apache.rocketmq.store.DefaultMessageStore;
 -import org.apache.rocketmq.store.MappedFile;
  import org.apache.rocketmq.store.MessageExtBrokerInner;
  import org.apache.rocketmq.store.MessageStore;
  import org.apache.rocketmq.store.PutMessageResult;
  import org.apache.rocketmq.store.PutMessageStatus;
  import org.apache.rocketmq.store.SelectMappedBufferResult;
  import org.apache.rocketmq.store.config.MessageStoreConfig;
 +import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
+ import org.apache.rocketmq.store.stats.BrokerStats;
  import org.junit.Before;
  import org.junit.Test;
  import org.junit.runner.RunWith;
@@@ -57,12 -73,12 +76,14 @@@ import org.mockito.Mock
  import org.mockito.Spy;
  import org.mockito.junit.MockitoJUnitRunner;
  
- import java.net.InetSocketAddress;
+ import java.net.SocketAddress;
  import java.net.UnknownHostException;
  import java.nio.ByteBuffer;
+ import java.util.HashMap;
+ import java.util.Map;
  import java.util.Set;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.atomic.LongAdder;
  
  import static org.assertj.core.api.Assertions.assertThat;
  import static org.mockito.ArgumentMatchers.any;
@@@ -88,21 -104,28 +112,36 @@@ public class AdminBrokerProcessorTest 
      @Mock
      private MessageStore messageStore;
  
 -    private Set<String> systemTopicSet;
 +    @Mock
 +    private SendMessageProcessor sendMessageProcessor;
  
      @Mock
 -    private Channel channel;
 +    private ConcurrentMap<TopicQueueId, LongAdder> inFlyWritingCouterMap;
 +
 +    private Set<String> systemTopicSet;
 +    private String topic;
 +
+     @Mock
+     private SocketAddress socketAddress;
+     @Mock
+     private BrokerStats brokerStats;
+     @Mock
+     private TopicConfigManager topicConfigManager;
+     @Mock
+     private ConsumerManager consumerManager;
+     @Mock
+     private ConsumerOffsetManager consumerOffsetManager;
+     @Mock
+     private DefaultMessageStore defaultMessageStore;
+     @Mock
+     private ScheduleMessageService scheduleMessageService;
+ 
      @Before
 -    public void init() {
 +    public void init() throws Exception {
          brokerController.setMessageStore(messageStore);
 +
 +        //doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor();
 +
          adminBrokerProcessor = new AdminBrokerProcessor(brokerController);
  
          systemTopicSet = Sets.newHashSet(
@@@ -184,33 -201,224 +223,252 @@@
      }
  
      @Test
+     public void testGetAllTopicConfig() throws Exception {
+         GetAllTopicConfigResponseHeader getAllTopicConfigResponseHeader = new GetAllTopicConfigResponseHeader();
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, getAllTopicConfigResponseHeader);
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testUpdateBrokerConfig() throws Exception {
+         handlerContext = mock(ChannelHandlerContext.class);
+         channel = mock(Channel.class);
+         when(handlerContext.channel()).thenReturn(channel);
+         socketAddress = mock(SocketAddress.class);
+         when(channel.remoteAddress()).thenReturn(socketAddress);
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null);
+         Map<String, String> bodyMap = new HashMap<>();
+         bodyMap.put("key", "value");
+         request.setBody(bodyMap.toString().getBytes());
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testGetBrokerConfig() throws Exception {
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null);
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testSearchOffsetByTimestamp() throws Exception {
+         messageStore = mock(MessageStore.class);
+         when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong())).thenReturn(Long.MIN_VALUE);
+         when(brokerController.getMessageStore()).thenReturn(messageStore);
+         SearchOffsetRequestHeader searchOffsetRequestHeader = new SearchOffsetRequestHeader();
+         searchOffsetRequestHeader.setTopic("topic");
+         searchOffsetRequestHeader.setQueueId(0);
+         searchOffsetRequestHeader.setTimestamp(System.currentTimeMillis());
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, searchOffsetRequestHeader);
+         request.addExtField("topic", "topic");
+         request.addExtField("queueId", "0");
+         request.addExtField("timestamp", System.currentTimeMillis() + "");
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testGetMaxOffset() throws Exception {
+         messageStore = mock(MessageStore.class);
+         when(messageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(Long.MIN_VALUE);
+         when(brokerController.getMessageStore()).thenReturn(messageStore);
+         GetMaxOffsetRequestHeader getMaxOffsetRequestHeader = new GetMaxOffsetRequestHeader();
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, getMaxOffsetRequestHeader);
+         request.addExtField("topic", "topic");
+         request.addExtField("queueId", "0");
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testGetMinOffset() throws Exception {
+         messageStore = mock(MessageStore.class);
+         when(messageStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(Long.MIN_VALUE);
+         when(brokerController.getMessageStore()).thenReturn(messageStore);
+         GetMinOffsetRequestHeader getMinOffsetRequestHeader = new GetMinOffsetRequestHeader();
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, getMinOffsetRequestHeader);
+         request.addExtField("topic", "topic");
+         request.addExtField("queueId", "0");
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testGetEarliestMsgStoretime() throws Exception {
+         messageStore = mock(MessageStore.class);
+         when(brokerController.getMessageStore()).thenReturn(messageStore);
+         GetEarliestMsgStoretimeRequestHeader getEarliestMsgStoretimeRequestHeader = new GetEarliestMsgStoretimeRequestHeader();
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, getEarliestMsgStoretimeRequestHeader);
+         request.addExtField("topic", "topic");
+         request.addExtField("queueId", "0");
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testGetBrokerRuntimeInfo() throws Exception {
+         brokerStats = mock(BrokerStats.class);
+         when(brokerController.getBrokerStats()).thenReturn(brokerStats);
+         when(brokerStats.getMsgPutTotalYesterdayMorning()).thenReturn(Long.MIN_VALUE);
+         when(brokerStats.getMsgPutTotalTodayMorning()).thenReturn(Long.MIN_VALUE);
+         when(brokerStats.getMsgPutTotalTodayNow()).thenReturn(Long.MIN_VALUE);
+         when(brokerStats.getMsgGetTotalTodayMorning()).thenReturn(Long.MIN_VALUE);
+         when(brokerStats.getMsgGetTotalTodayNow()).thenReturn(Long.MIN_VALUE);
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testLockBatchMQ() throws Exception {
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
+         LockBatchRequestBody lockBatchRequestBody = new LockBatchRequestBody();
+         lockBatchRequestBody.setClientId("1111");
+         lockBatchRequestBody.setConsumerGroup("group");
+         request.setBody(JSON.toJSON(lockBatchRequestBody).toString().getBytes());
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testUnlockBatchMQ() throws Exception {
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
+         UnlockBatchRequestBody unlockBatchRequestBody = new UnlockBatchRequestBody();
+         unlockBatchRequestBody.setClientId("11111");
+         unlockBatchRequestBody.setConsumerGroup("group");
+         request.setBody(JSON.toJSON(unlockBatchRequestBody).toString().getBytes());
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testUpdateAndCreateSubscriptionGroup() throws RemotingCommandException {
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
+         SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+         subscriptionGroupConfig.setBrokerId(1);
+         subscriptionGroupConfig.setGroupName("groupId");
+         subscriptionGroupConfig.setConsumeEnable(Boolean.TRUE);
+         subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.TRUE);
+         subscriptionGroupConfig.setRetryMaxTimes(111);
+         subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.TRUE);
+         request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes());
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testGetAllSubscriptionGroup() throws RemotingCommandException {
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testDeleteSubscriptionGroup() throws RemotingCommandException {
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, null);
+         request.addExtField("groupName", "GID-Group-Name");
+         request.addExtField("removeOffset", "true");
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testGetTopicStatsInfo() throws RemotingCommandException {
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, null);
+         request.addExtField("topic", "topicTest");
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
+         topicConfigManager = mock(TopicConfigManager.class);
+         when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+         TopicConfig topicConfig = new TopicConfig();
+         topicConfig.setTopicName("topicTest");
+         when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig);
+         RemotingCommand responseSuccess = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(responseSuccess.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testGetConsumerConnectionList() throws RemotingCommandException {
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, null);
+         request.addExtField("consumerGroup", "GID-group-test");
+         consumerManager = mock(ConsumerManager.class);
+         when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+         ConsumerGroupInfo consumerGroupInfo = new ConsumerGroupInfo("GID-group-test", ConsumeType.CONSUME_ACTIVELY, MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+         when(consumerManager.getConsumerGroupInfo(anyString())).thenReturn(consumerGroupInfo);
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testGetProducerConnectionList() throws RemotingCommandException {
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, null);
+         request.addExtField("producerGroup", "ProducerGroupId");
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+     }
+ 
+     @Test
+     public void testGetConsumeStats() throws RemotingCommandException {
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, null);
+         request.addExtField("topic", "topicTest");
+         request.addExtField("consumerGroup", "GID-test");
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testGetAllConsumerOffset() throws RemotingCommandException {
+         consumerOffsetManager = mock(ConsumerOffsetManager.class);
+         when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+         ConsumerOffsetManager consumerOffset = new ConsumerOffsetManager();
+         when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset, false));
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
+     @Test
+     public void testGetAllDelayOffset() throws Exception {
+         defaultMessageStore = mock(DefaultMessageStore.class);
+         scheduleMessageService = mock(ScheduleMessageService.class);
+         when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
+         when(defaultMessageStore.getScheduleMessageService()).thenReturn(scheduleMessageService);
+         when(scheduleMessageService.encode()).thenReturn("content");
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
+         RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+     }
+ 
++    @Test
 +    public void testGetTopicConfig() throws Exception {
 +        String topic = "foobar";
 +        brokerController.getTopicConfigManager().updateTopicConfig(new TopicConfig(topic));
 +
 +        {
 +            GetTopicConfigRequestHeader requestHeader = new GetTopicConfigRequestHeader();
 +            requestHeader.setTopic(topic);
 +            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, requestHeader);
 +            request.makeCustomHeaderToNet();
 +            RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
 +            assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
 +            assertThat(response.getBody()).isNotEmpty();
 +        }
 +        {
 +            GetTopicConfigRequestHeader requestHeader = new GetTopicConfigRequestHeader();
 +            requestHeader.setTopic("aaaaaaa");
 +            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, requestHeader);
 +            request.makeCustomHeaderToNet();
 +            RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
 +            assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
 +            assertThat(response.getRemark()).contains("No topic in this broker.");
 +        }
 +    }
 +
 +
 +
 +
      private RemotingCommand buildCreateTopicRequest(String topic) {
          CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
          requestHeader.setTopic(topic);
diff --cc broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index b4ae34d,1f81bdb..0a35851
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@@ -90,10 -88,10 +90,12 @@@ public class SendMessageProcessorTest 
          Channel mockChannel = mock(Channel.class);
          when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
          when(handlerContext.channel()).thenReturn(mockChannel);
-         when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
+         MessageExt messageExt = new MessageExt();
+         messageExt.setTopic(topic);
+         when(messageStore.lookMessageByOffset(anyLong())).thenReturn(messageExt);
          sendMessageProcessor = new SendMessageProcessor(brokerController);
 +
 +        brokerController.getTopicConfigManager().updateTopicConfig(new TopicConfig(topic, 8, 8, PermName.PERM_WRITE|PermName.PERM_READ));
      }
  
      @Test
diff --cc client/pom.xml
index 80a58e6,51e5319..ca071ce
--- a/client/pom.xml
+++ b/client/pom.xml
@@@ -59,19 -59,5 +54,9 @@@
              <version>0.33.0</version>
              <scope>test</scope>
          </dependency>
 +        <dependency>
-             <groupId>org.apache.logging.log4j</groupId>
-             <artifactId>log4j-core</artifactId>
-             <scope>test</scope>
-         </dependency>
-         <dependency>
-             <groupId>org.apache.logging.log4j</groupId>
-             <artifactId>log4j-slf4j-impl</artifactId>
-             <scope>test</scope>
-         </dependency>
-         <dependency>
 +            <groupId>com.google.guava</groupId>
 +            <artifactId>guava</artifactId>
 +        </dependency>
      </dependencies>
  </project>
diff --cc client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index fc0b29c,15b5bec..3740f9e
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@@ -228,11 -226,11 +228,11 @@@ public class RemoteBrokerOffsetStore im
  
      private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
          InterruptedException, MQClientException {
-         FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
 -        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
++        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
          if (null == findBrokerResult) {
  
              this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-             findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
 -            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
++            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
          }
  
          if (findBrokerResult != null) {
diff --cc client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 73e35c5,ba4eafa..9dfd10e
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@@ -21,8 -21,8 +21,9 @@@ import java.util.ArrayList
  import java.util.Collections;
  import java.util.LinkedList;
  import java.util.List;
 +import java.util.Map;
  import java.util.Set;
+ import java.util.Objects;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.locks.ReadWriteLock;
diff --cc client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 0181951,0cc5e3e..376e7dc
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@@ -44,10 -64,11 +44,13 @@@ import org.apache.rocketmq.common.Servi
  import org.apache.rocketmq.common.UtilAll;
  import org.apache.rocketmq.common.constant.PermName;
  import org.apache.rocketmq.common.filter.ExpressionType;
+ import org.apache.rocketmq.common.protocol.NamespaceUtil;
+ import org.apache.rocketmq.common.topic.TopicValidator;
+ import org.apache.rocketmq.logging.InternalLogger;
  import org.apache.rocketmq.common.message.MessageExt;
  import org.apache.rocketmq.common.message.MessageQueue;
 +import org.apache.rocketmq.common.message.MessageQueueAssignment;
 +import org.apache.rocketmq.common.protocol.NamespaceUtil;
  import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
  import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
  import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@@ -662,16 -638,8 +665,16 @@@ public class MQClientInstance 
                                  this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                              }
  
 +                            // Update endpoint map
 +                            {
 +                                ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
 +                                if (mqEndPoints != null && !mqEndPoints.isEmpty()) {
 +                                    topicEndPointsTable.put(topic, mqEndPoints);
 +                                }
 +                            }
 +
                              // Update Pub info
-                             {
+                             if (!producerTable.isEmpty()) {
                                  TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                  publishInfo.setHaveTopicRouterInfo(true);
                                  Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
diff --cc client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 9b9b7e4,227ea44..af2e16e
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@@ -95,9 -96,9 +95,7 @@@ public class DefaultLitePullConsumerTes
      @Before
      public void init() throws Exception {
          ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
-         for (MQClientInstance instance : factoryTable.values()) {
-             instance.shutdown();
 -        for (Map.Entry<String, MQClientInstance> entry : factoryTable.entrySet()) {
 -            entry.getValue().shutdown();
--        }
++        factoryTable.forEach((s, instance) -> instance.shutdown());
          factoryTable.clear();
  
          Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
diff --cc client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index abdd6bf,bfc87f1..2482ac1
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@@ -57,10 -56,9 +58,10 @@@ import org.apache.rocketmq.common.messa
  import org.apache.rocketmq.common.message.MessageExt;
  import org.apache.rocketmq.common.message.MessageQueue;
  import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  import org.apache.rocketmq.remoting.RPCHook;
  import org.apache.rocketmq.remoting.exception.RemotingException;
- import org.junit.After;
+ import org.junit.AfterClass;
  import org.junit.Assert;
  import org.junit.Before;
  import org.junit.Test;
@@@ -92,10 -88,9 +93,10 @@@ public class DefaultMQPushConsumerTest 
  
      @Mock
      private MQClientAPIImpl mQClientAPIImpl;
 +    private PullAPIWrapper pullAPIWrapper;
      private RebalanceImpl rebalanceImpl;
-     private DefaultMQPushConsumer pushConsumer;
 -    private RebalancePushImpl rebalancePushImpl;
+     private static DefaultMQPushConsumer pushConsumer;
 +    private AtomicLong queueOffset = new AtomicLong(1024);;
  
      @Before
      public void init() throws Exception {
@@@ -105,6 -100,26 +106,27 @@@
          }
          factoryTable.clear();
  
+         when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
+             anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+             .thenAnswer(new Answer<PullResult>() {
+                 @Override
+                 public PullResult answer(InvocationOnMock mock) throws Throwable {
+                     PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                     MessageClientExt messageClientExt = new MessageClientExt();
+                     messageClientExt.setTopic(topic);
+                     messageClientExt.setQueueId(0);
+                     messageClientExt.setMsgId("123");
+                     messageClientExt.setBody(new byte[] {'a'});
+                     messageClientExt.setOffsetMsgId("234");
+                     messageClientExt.setBornHost(new InetSocketAddress(8080));
+                     messageClientExt.setStoreHost(new InetSocketAddress(8080));
+                     PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+                     ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
+                     return pullResult;
+                 }
+             });
+ 
++
          consumerGroup = "FooBarGroup" + System.currentTimeMillis();
          pushConsumer = new DefaultMQPushConsumer(consumerGroup);
          pushConsumer.setNamesrvAddr("127.0.0.1:9876");
@@@ -145,40 -154,12 +167,40 @@@
          messageQueueSet.add(createPullRequest().getMessageQueue());
          pushConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
  
 +        pushConsumerImpl.setmQClientFactory(mQClientFactory);
 +
 +        pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
 +        FieldUtils.writeDeclaredField(pushConsumerImpl, "pullAPIWrapper", pullAPIWrapper, true);
 +
 +        when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
 +            anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
 +            .thenAnswer(new Answer<PullResult>() {
 +                @Override
 +                public PullResult answer(InvocationOnMock mock) throws Throwable {
 +                    PullMessageRequestHeader requestHeader = mock.getArgument(1);
 +                    MessageClientExt messageClientExt = new MessageClientExt();
 +                    messageClientExt.setTopic(topic);
 +                    messageClientExt.setQueueId(0);
 +                    messageClientExt.setQueueOffset(queueOffset.getAndIncrement());
 +                    messageClientExt.setMsgId("1024");
 +                    messageClientExt.setBody(msgBody);
 +                    messageClientExt.setOffsetMsgId("234");
 +                    messageClientExt.setBornHost(new InetSocketAddress(8080));
 +                    messageClientExt.setStoreHost(new InetSocketAddress(8080));
 +                    PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
 +                    ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
 +                    return pullResult;
 +                }
 +            });
 +
          pushConsumer.subscribe(topic, "*");
          pushConsumer.start();
 +
 +        mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
      }
  
-     @After
-     public void terminate() {
+     @AfterClass
+     public static void terminate() {
          pushConsumer.shutdown();
      }
  
diff --cc client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index 73cfefb,ec7a4cf..f4aead1
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@@ -24,8 -23,8 +24,9 @@@ import org.apache.rocketmq.client.excep
  import org.apache.rocketmq.client.impl.FindBrokerResult;
  import org.apache.rocketmq.client.impl.MQClientAPIImpl;
  import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+ import org.apache.rocketmq.common.MixAll;
  import org.apache.rocketmq.common.message.MessageQueue;
 +import org.apache.rocketmq.common.protocol.ResponseCode;
  import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
  import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
  import org.apache.rocketmq.remoting.exception.RemotingException;
@@@ -60,9 -59,8 +61,9 @@@ public class RemoteBrokerOffsetStoreTes
          System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
          String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
          when(mQClientFactory.getClientId()).thenReturn(clientId);
-         when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
+         when(mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false)).thenReturn(new FindBrokerResult("127.0.0.1", false));
          when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
 +        when(mQClientFactory.getBrokerNameFromMessageQueue(any())).thenReturn(brokerName);
      }
  
      @Test
diff --cc client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
index 17b48eb,a60d88e..f0d6b42
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
@@@ -22,29 -23,30 +23,31 @@@ import java.util.Set
  import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
  import org.apache.rocketmq.client.consumer.store.OffsetStore;
+ import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 +import org.apache.rocketmq.client.exception.MQBrokerException;
  import org.apache.rocketmq.client.exception.MQClientException;
+ import org.apache.rocketmq.client.impl.MQAdminImpl;
  import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+ import org.apache.rocketmq.common.MixAll;
+ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  import org.apache.rocketmq.common.message.MessageQueue;
- import org.apache.rocketmq.common.message.MessageQueueAssignment;
- import org.apache.rocketmq.common.message.MessageRequestMode;
  import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 +import org.apache.rocketmq.remoting.exception.RemotingException;
 +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
- import org.junit.Before;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.mockito.Mock;
  import org.mockito.Spy;
 -import org.mockito.invocation.InvocationOnMock;
  import org.mockito.junit.MockitoJUnitRunner;
 -import org.mockito.stubbing.Answer;
  
  import static org.assertj.core.api.Assertions.assertThat;
+ import static org.junit.Assert.assertEquals;
  import static org.mockito.ArgumentMatchers.any;
 +import static org.mockito.ArgumentMatchers.anyInt;
  import static org.mockito.ArgumentMatchers.anyLong;
  import static org.mockito.ArgumentMatchers.anyString;
- import static org.mockito.Mockito.doNothing;
 -import static org.mockito.Mockito.doAnswer;
+ import static org.mockito.Mockito.mock;
  import static org.mockito.Mockito.when;
  
  @RunWith(MockitoJUnitRunner.class)
diff --cc common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 1b74eaa,401b457..7ce0456
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@@ -61,8 -60,8 +61,9 @@@ public class BrokerConfig 
       * thread numbers for send message thread pool.
       */
      private int sendMessageThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 4);
+     private int putMessageFutureThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 4);
      private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
 +    private int ackMessageThreadPoolNums = 3;
      private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
      private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
  
@@@ -87,8 -85,8 +88,9 @@@
      @ImportantField
      private boolean fetchNamesrvAddrByAddressServer = false;
      private int sendThreadPoolQueueCapacity = 10000;
+     private int putThreadPoolQueueCapacity = 10000;
      private int pullThreadPoolQueueCapacity = 100000;
 +    private int ackThreadPoolQueueCapacity = 100000;
      private int replyThreadPoolQueueCapacity = 10000;
      private int queryThreadPoolQueueCapacity = 20000;
      private int clientManagerThreadPoolQueueCapacity = 1000000;
@@@ -962,59 -838,11 +993,67 @@@
          this.autoDeleteUnusedStats = autoDeleteUnusedStats;
      }
  
 +    public long getLoadBalancePollNameServerInterval() {
 +        return loadBalancePollNameServerInterval;
 +    }
 +
 +    public void setLoadBalancePollNameServerInterval(long loadBalancePollNameServerInterval) {
 +        this.loadBalancePollNameServerInterval = loadBalancePollNameServerInterval;
 +    }
 +
 +    public int getCleanOfflineBrokerInterval() {
 +        return cleanOfflineBrokerInterval;
 +    }
 +
 +    public void setCleanOfflineBrokerInterval(int cleanOfflineBrokerInterval) {
 +        this.cleanOfflineBrokerInterval = cleanOfflineBrokerInterval;
 +    }
 +
 +    public int getLoadBalanceProcessorThreadPoolNums() {
 +        return loadBalanceProcessorThreadPoolNums;
 +    }
 +
 +    public void setLoadBalanceProcessorThreadPoolNums(int loadBalanceProcessorThreadPoolNums) {
 +        this.loadBalanceProcessorThreadPoolNums = loadBalanceProcessorThreadPoolNums;
 +    }
 +
 +    public boolean isServerLoadBalancerEnabled() {
 +        return serverLoadBalancerEnabled;
 +    }
 +
 +    public void setServerLoadBalancerEnabled(boolean serverLoadBalancerEnabled) {
 +        this.serverLoadBalancerEnabled = serverLoadBalancerEnabled;
 +    }
 +
 +    public MessageRequestMode getDefaultMessageRequestMode() {
 +        return defaultMessageRequestMode;
 +    }
 +
 +    public void setDefaultMessageRequestMode(String defaultMessageRequestMode) {
 +        this.defaultMessageRequestMode = MessageRequestMode.valueOf(defaultMessageRequestMode);
 +    }
 +
 +    public int getDefaultPopShareQueueNum() {
 +        return defaultPopShareQueueNum;
 +    }
 +
 +    public void setDefaultPopShareQueueNum(int defaultPopShareQueueNum) {
 +        this.defaultPopShareQueueNum = defaultPopShareQueueNum;
 +    }
 +
 +    public long getForwardTimeout() {
 +        return forwardTimeout;
 +    }
 +
 +    public void setForwardTimeout(long timeout) {
 +        this.forwardTimeout = timeout;
 +    }
++
+     public boolean isIsolateLogEnable() {
+         return isolateLogEnable;
+     }
+ 
+     public void setIsolateLogEnable(boolean isolateLogEnable) {
+         this.isolateLogEnable = isolateLogEnable;
+     }
  }
diff --cc common/src/main/java/org/apache/rocketmq/common/MixAll.java
index cc12077,c2300d3..a15294f
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@@ -84,10 -83,9 +84,12 @@@ public class MixAll 
      public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
      public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
      public static final String REPLY_MESSAGE_FLAG = "reply";
+     public static final String LMQ_PREFIX = "%LMQ%";
+     public static final String MULTI_DISPATCH_QUEUE_SPLITTER = ",";
      private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 +    public static final String LOGICAL_QUEUE_MOCK_BROKER_PREFIX = "__syslo__";
 +    public static final String METADATA_SCOPE_GLOBAL = "__global__";
 +    public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__syslo__none__";
  
      public static String getWSAddr() {
          String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
@@@ -448,11 -446,7 +450,14 @@@
          return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
      }
  
 +    public static int compareInteger(int x, int y) {
 +        return (x < y) ? -1 : ((x == y) ? 0 : 1);
 +    }
 +
 +    public static int compareLong(long x, long y) {
 +        return (x < y) ? -1 : ((x == y) ? 0 : 1);
 +    }
+     public static boolean isLmq(String lmqMetaData) {
+         return lmqMetaData != null && lmqMetaData.startsWith(LMQ_PREFIX);
+     }
  }
diff --cc common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index b5c14e9,81b7823..105ef5b
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@@ -54,10 -52,8 +54,12 @@@ public class MessageConst 
      public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
      public static final String PROPERTY_CLUSTER = "CLUSTER";
      public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
 +    public static final String PROPERTY_POP_CK = "POP_CK";
 +    public static final String PROPERTY_FIRST_POP_TIME = "1ST_POP_TIME";
 +    public static final String PROPERTY_FORWARD_QUEUE_ID = "PROPERTY_FORWARD_QUEUE_ID";
 +    public static final String PROPERTY_REDIRECT = "REDIRECT";
+     public static final String PROPERTY_INNER_MULTI_DISPATCH = "INNER_MULTI_DISPATCH";
+     public static final String PROPERTY_INNER_MULTI_QUEUE_OFFSET = "INNER_MULTI_QUEUE_OFFSET";
  
      public static final String KEY_SEPARATOR = " ";
  
diff --cc common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
index 10d6f4d,0c3df0d..0543add
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
@@@ -268,30 -266,8 +268,30 @@@ public class ConsumerRunningInfo extend
          }
  
          {
 +            sb.append("\n\n#Consumer Pop Detail#\n");
 +            sb.append(String.format("%-32s  %-32s  %-4s  %-20s%n",
 +                "#Topic",
 +                "#Broker Name",
 +                "#QID",
 +                "#ProcessQueueInfo"
 +            ));
 +
 +            Iterator<Entry<MessageQueue, PopProcessQueueInfo>> it = this.mqPopTable.entrySet().iterator();
 +            while (it.hasNext()) {
 +                Entry<MessageQueue, PopProcessQueueInfo> next = it.next();
 +                String item = String.format("%-32s  %-32s  %-4d  %s%n",
 +                    next.getKey().getTopic(),
 +                    next.getKey().getBrokerName(),
 +                    next.getKey().getQueueId(),
 +                    next.getValue().toString());
 +
 +                sb.append(item);
 +            }
 +        }
 +
 +        {
              sb.append("\n\n#Consumer RT&TPS#\n");
-             sb.append(String.format("%-32s  %14s %14s %14s %14s %18s %25s%n",
+             sb.append(String.format("%-64s  %14s %14s %14s %14s %18s %25s%n",
                  "#Topic",
                  "#Pull RT",
                  "#Pull TPS",
diff --cc namesrv/pom.xml
index b1d3936,7ecbcd3..1da3fa2
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@@ -49,9 -49,8 +49,13 @@@
              <artifactId>slf4j-api</artifactId>
          </dependency>
          <dependency>
 +            <groupId>com.google.guava</groupId>
 +            <artifactId>guava</artifactId>
 +            <scope>test</scope>
 +        </dependency>
++        <dependency>
+             <groupId>org.bouncycastle</groupId>
+             <artifactId>bcpkix-jdk15on</artifactId>
+         </dependency>
      </dependencies>
  </project>
diff --cc pom.xml
index 6541d8c,ec05130..a5ae2ae
--- a/pom.xml
+++ b/pom.xml
@@@ -562,14 -568,9 +568,9 @@@
              <dependency>
                  <groupId>com.google.guava</groupId>
                  <artifactId>guava</artifactId>
 -                <version>31.0.1-jre</version>
 +                <version>19.0</version>
              </dependency>
              <dependency>
-                 <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
-                 <artifactId>concurrentlinkedhashmap-lru</artifactId>
-                 <version>1.4.2</version>
-             </dependency>
-             <dependency>
                  <groupId>io.openmessaging</groupId>
                  <artifactId>openmessaging-api</artifactId>
                  <version>0.3.1-alpha</version>
diff --cc remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index c8a0e9e,39bbb0d..0894ea6
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@@ -172,18 -191,9 +190,10 @@@ public class RemotingHelper 
  
      public static String parseSocketAddressAddr(SocketAddress socketAddress) {
          if (socketAddress != null) {
 +            // Default toString of InetSocketAddress is "hostName/IP:port"
              final String addr = socketAddress.toString();
-             if (addr.length() > 0) {
-                 if (addr.contains("/")) {
-                     String[] segments = addr.split("/");
-                     if (segments.length > 1) {
-                         return segments[1];
-                     }
-                 }
-                 return addr.substring(1);
-             }
-             return addr;
+             int index = addr.lastIndexOf("/");
+             return (index != -1) ? addr.substring(index + 1) : addr;
          }
          return "";
      }
diff --cc remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 03b1640,d43fe8d..5a1ea3f
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@@ -28,12 -21,15 +28,18 @@@ import java.lang.annotation.Annotation
  import java.lang.reflect.Field;
  import java.lang.reflect.Modifier;
  import java.nio.ByteBuffer;
 +import java.util.Arrays;
  import java.util.HashMap;
 +import java.util.HashSet;
  import java.util.Map;
 +import java.util.Set;
  import java.util.concurrent.atomic.AtomicInteger;
+ import org.apache.rocketmq.logging.InternalLogger;
+ import org.apache.rocketmq.logging.InternalLoggerFactory;
+ import org.apache.rocketmq.remoting.CommandCustomHeader;
+ import org.apache.rocketmq.remoting.annotation.CFNotNull;
+ import org.apache.rocketmq.remoting.common.RemotingHelper;
+ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
  
  public class RemotingCommand {
      public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
diff --cc store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 2ae84eb,bf66af5..01857cc
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@@ -16,27 -16,7 +16,27 @@@
   */
  package org.apache.rocketmq.store;
  
 +import java.net.Inet4Address;
 +import java.net.Inet6Address;
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.net.SocketAddress;
 +import java.nio.ByteBuffer;
- import java.util.ArrayList;
 +import java.util.Collections;
++import java.util.HashMap;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.concurrent.CompletableFuture;
++import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
 +import java.util.function.Supplier;
 +
  import org.apache.rocketmq.common.ServiceThread;
 +import org.apache.rocketmq.common.TopicConfig;
  import org.apache.rocketmq.common.UtilAll;
  import org.apache.rocketmq.common.constant.LoggerName;
  import org.apache.rocketmq.common.message.MessageAccessor;
@@@ -79,40 -76,45 +80,47 @@@ public class CommitLog implements Swapp
  
      protected final PutMessageLock putMessageLock;
  
 +    protected final TopicQueueLock topicQueueLock;
 +
      private volatile Set<String> fullStorePaths = Collections.emptySet();
  
+     private final MultiDispatch multiDispatch;
+     private final FlushDiskWatcher flushDiskWatcher;
+ 
 +    protected int commitLogSize;
 +
-     public CommitLog(final MessageStore messageStore) {
-         String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog();
+     public CommitLog(final DefaultMessageStore defaultMessageStore) {
+         String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
          if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
-             this.mappedFileQueue = new MultiPathMappedFileQueue(messageStore.getMessageStoreConfig(),
-                     messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
-                     messageStore.getAllocateMappedFileService(), this::getFullStorePaths);
+             this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
 -                    defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
 -                    defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
++                defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
++                defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
          } else {
              this.mappedFileQueue = new MappedFileQueue(storePath,
-                     messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
-                     messageStore.getAllocateMappedFileService());
 -                    defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
 -                    defaultMessageStore.getAllocateMappedFileService());
++                defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
++                defaultMessageStore.getAllocateMappedFileService());
          }
  
-         this.defaultMessageStore = messageStore;
+         this.defaultMessageStore = defaultMessageStore;
  
 -        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 -            this.flushCommitLogService = new GroupCommitService();
 -        } else {
 -            this.flushCommitLogService = new FlushRealTimeService();
 -        }
 -
 -        this.commitLogService = new CommitRealTimeService();
 +        this.flushManager = new DefaultFlushManager();
  
-         this.appendMessageCallback = new DefaultAppendMessageCallback(messageStore.getMessageStoreConfig().getMaxMessageSize());
+         this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
          putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
              @Override
              protected PutMessageThreadLocal initialValue() {
                  return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
              }
          };
-         this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
+         this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
+ 
+         this.multiDispatch = new MultiDispatch(defaultMessageStore, this);
+ 
+         flushDiskWatcher = new FlushDiskWatcher();
 +
 +        this.topicQueueLock = new TopicQueueLock();
 +
-         this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
++        this.commitLogSize = defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
      }
  
      public void setFullStorePaths(Set<String> fullStorePaths) {
@@@ -123,21 -125,36 +131,28 @@@
          return fullStorePaths;
      }
  
+     public ThreadLocal<PutMessageThreadLocal> getPutMessageThreadLocal() {
+         return putMessageThreadLocal;
+     }
+ 
      public boolean load() {
          boolean result = this.mappedFileQueue.load();
 +        this.mappedFileQueue.checkSelf();
          log.info("load commit log " + (result ? "OK" : "Failed"));
          return result;
      }
  
      public void start() {
 -        this.flushCommitLogService.start();
 -
 +        this.flushManager.start();
 +        log.info("start commitLog successfully. storeRoot: {}", this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
+         flushDiskWatcher.setDaemon(true);
+         flushDiskWatcher.start();
 -
 -
 -        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
 -            this.commitLogService.start();
 -        }
      }
  
      public void shutdown() {
 -        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
 -            this.commitLogService.shutdown();
 -        }
 -
 -        this.flushCommitLogService.shutdown();
 -
 +        this.flushManager.shutdown();
 +        log.info("shutdown commitLog successfully. storeRoot: {}", this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
+         flushDiskWatcher.shutdown(true);
      }
  
      public long flush() {
@@@ -407,26 -410,21 +422,26 @@@
                  return new DispatchRequest(totalSize, false/* success */);
              }
  
 -            return new DispatchRequest(
 +            DispatchRequest dispatchRequest = new DispatchRequest(
-                     topic,
-                     queueId,
-                     physicOffset,
-                     totalSize,
-                     tagsCode,
-                     storeTimestamp,
-                     queueOffset,
-                     keys,
-                     uniqKey,
-                     sysFlag,
-                     preparedTransactionOffset,
-                     propertiesMap
+                 topic,
+                 queueId,
+                 physicOffset,
+                 totalSize,
+                 tagsCode,
+                 storeTimestamp,
+                 queueOffset,
+                 keys,
+                 uniqKey,
+                 sysFlag,
+                 preparedTransactionOffset,
+                 propertiesMap
              );
 +
 +            setBatchSizeIfNeeded(propertiesMap, dispatchRequest);
 +
 +            return dispatchRequest;
          } catch (Exception e) {
 +            log.error("CheckMessageAndReturnSizeOld", e);
          }
  
          return new DispatchRequest(-1, false /* success */);
@@@ -632,11 -616,10 +647,10 @@@
          StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
  
          String topic = msg.getTopic();
-         int queueId = msg.getQueueId();
- 
+ //        int queueId msg.getQueueId();
          final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
          if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
--                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
++            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
              // Delay Delivery
              if (msg.getDelayTimeLevel() > 0) {
                  if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
@@@ -671,76 -660,52 +685,77 @@@
          long elapsedTimeInLock = 0;
          MappedFile unlockMappedFile = null;
  
 -        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
 +        topicQueueLock.lock(topicQueueKey);
          try {
 -            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 -            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
 -            this.beginTimeInLock = beginLockTimestamp;
 +            defaultMessageStore.assignOffset(topicQueueKey, msg, getMessageNum(msg));
  
 -            // Here settings are stored timestamp, in order to ensure an orderly
 -            // global
 -            msg.setStoreTimestamp(beginLockTimestamp);
 -
 -            if (null == mappedFile || mappedFile.isFull()) {
 -                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
 -            }
 -            if (null == mappedFile) {
 -                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
 -                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
 +            PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
 +            if (encodeResult != null) {
 +                return CompletableFuture.completedFuture(encodeResult);
              }
 +            msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
 +            PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);
  
 -            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
 -            switch (result.getStatus()) {
 -                case PUT_OK:
 -                    break;
 -                case END_OF_FILE:
 -                    unlockMappedFile = mappedFile;
 -                    // Create a new file, re-write the message
 -                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
 -                    if (null == mappedFile) {
 -                        // XXX: warn and notify me
 -                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
 -                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
 -                    }
 -                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
 -                    break;
 -                case MESSAGE_SIZE_EXCEEDED:
 -                case PROPERTIES_SIZE_EXCEEDED:
 -                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
 -                case UNKNOWN_ERROR:
 -                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
 -                default:
 -                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
 -            }
 +            putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
 +            try {
 +                MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 +                long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
 +                this.beginTimeInLock = beginLockTimestamp;
 +
 +                // Here settings are stored timestamp, in order to ensure an orderly
 +                // global
 +                msg.setStoreTimestamp(beginLockTimestamp);
 +
 +                if (null == mappedFile || mappedFile.isFull()) {
 +                    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
 +                }
 +                if (null == mappedFile) {
 +                    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
 +                    beginTimeInLock = 0;
 +                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
 +                }
 +
 +                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
 +                switch (result.getStatus()) {
 +                    case PUT_OK:
 +                        onCommitLogAppend(msg, result, mappedFile);
 +                        break;
 +                    case END_OF_FILE:
 +                        onCommitLogAppend(msg, result, mappedFile);
 +                        unlockMappedFile = mappedFile;
 +                        // Create a new file, re-write the message
 +                        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
 +                        if (null == mappedFile) {
 +                            // XXX: warn and notify me
 +                            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
 +                            beginTimeInLock = 0;
 +                            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
 +                        }
 +                        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
 +                        if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
 +                            onCommitLogAppend(msg, result, mappedFile);
 +                        }
 +                        break;
 +                    case MESSAGE_SIZE_EXCEEDED:
 +                    case PROPERTIES_SIZE_EXCEEDED:
 +                        beginTimeInLock = 0;
 +                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
 +                    case UNKNOWN_ERROR:
 +                        beginTimeInLock = 0;
 +                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
 +                    default:
 +                        beginTimeInLock = 0;
 +                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
 +                }
  
 -            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
 +                elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
 +                beginTimeInLock = 0;
 +            } finally {
++                beginTimeInLock = 0;
 +                putMessageLock.unlock();
 +            }
          } finally {
 -            beginTimeInLock = 0;
 -            putMessageLock.unlock();
 +            topicQueueLock.unlock(topicQueueKey);
          }
  
          if (elapsedTimeInLock > 500) {
@@@ -902,58 -846,44 +909,58 @@@
      }
  
      public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
 -        // Synchronization flush
 -        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 -            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
 +        return this.flushManager.handleDiskFlush(result, messageExt);
 +    }
 +
 +    public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
 +        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
 +            HAService service = this.defaultMessageStore.getHaService();
              if (messageExt.isWaitStoreMsgOK()) {
 -                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
 +                if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
 +                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
-                             this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+                         this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
 -                flushDiskWatcher.add(request);
 -                service.putRequest(request);
 -                return request.future();
 -            } else {
 -                service.wakeup();
 -                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
 -            }
 -        }
 -        // Asynchronous flush
 -        else {
 -            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
 -                flushCommitLogService.wakeup();
 -            } else  {
 -                commitLogService.wakeup();
++                    flushDiskWatcher.add(request);
 +                    service.putRequest(request);
 +                    service.getWaitNotifyObject().wakeupAll();
 +                    return request.future();
-                 }
-                 else {
++                } else {
 +                    return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
 +                }
              }
 -            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
          }
 +        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
      }
  
- 
 -    public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
 +    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
 +        this.flushManager.handleDiskFlush(result, putMessageResult, messageExt);
 +    }
 +
 +    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
          if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
              HAService service = this.defaultMessageStore.getHaService();
              if (messageExt.isWaitStoreMsgOK()) {
 -                if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
 +                // Determine whether to wait
 +                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
-                     GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
+                     GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
 -                            this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
++                        this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
                      service.putRequest(request);
                      service.getWaitNotifyObject().wakeupAll();
 -                    return request.future();
 +                    PutMessageStatus replicaStatus = null;
 +                    try {
 +                        replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
-                                 TimeUnit.MILLISECONDS);
++                            TimeUnit.MILLISECONDS);
 +                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
 +                    }
 +                    if (replicaStatus != PutMessageStatus.PUT_OK) {
 +                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
 +                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
 +                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
 +                    }
                  }
 +                // Slave problem
                  else {
 -                    return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
 +                    // Tell the producer, slave not available
 +                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                  }
              }
          }
@@@ -1050,26 -998,10 +1057,30 @@@
          return diff;
      }
  
 +    protected short getMessageNum(MessageExtBrokerInner msgInner) {
 +        short messageNum = 1;
 +        // IF inner batch, build batchQueueOffset and batchNum property.
 +        CQType cqType = getCqType(msgInner);
 +
 +        if (MessageSysFlag.check(msgInner.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG) || CQType.BatchCQ.equals(cqType)) {
 +            if (msgInner.getProperty(MessageConst.PROPERTY_INNER_NUM) != null) {
 +                messageNum = Short.parseShort(msgInner.getProperty(MessageConst.PROPERTY_INNER_NUM));
 +                messageNum = messageNum >= 1 ? messageNum : 1;
 +            }
 +        }
 +
 +        return messageNum;
 +    }
 +
 +    private CQType getCqType(MessageExtBrokerInner msgInner) {
 +        Optional<TopicConfig> topicConfig = this.defaultMessageStore.getTopicConfig(msgInner.getTopic());
 +        return QueueTypeUtils.getCQType(topicConfig);
 +    }
 +
+     public Map<String, Long> getLmqTopicQueueTable() {
+         return this.lmqTopicQueueTable;
+     }
+ 
      abstract class FlushCommitLogService extends ServiceThread {
          protected static final int RETRY_TIMES_OVER = 10;
      }
@@@ -1105,9 -1037,10 +1116,9 @@@
                      long end = System.currentTimeMillis();
                      if (!result) {
                          this.lastCommitTimestamp = end; // result = false means some data committed.
 -                        //now wake up flush thread.
 -                        flushCommitLogService.wakeup();
 +                        CommitLog.this.flushManager.wakeUpFlush();
                      }
-                     CommitLog.this.getMessageStore().getPerfCounter().flowOnce("COMMIT_DATA_TIME_MS", (int)(end - begin));
 -
++                    CommitLog.this.getMessageStore().getPerfCounter().flowOnce("COMMIT_DATA_TIME_MS", (int) (end - begin));
                      if (end - begin > 500) {
                          log.info("Commit data to file costs {} ms", end - begin);
                      }
@@@ -1170,7 -1103,6 +1181,7 @@@
                          CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                      }
                      long past = System.currentTimeMillis() - begin;
-                     CommitLog.this.getMessageStore().getPerfCounter().flowOnce("FLUSH_DATA_TIME_MS", (int)past);
++                    CommitLog.this.getMessageStore().getPerfCounter().flowOnce("FLUSH_DATA_TIME_MS", (int) past);
                      if (past > 500) {
                          log.info("Flush data to disk costs {} ms", past);
                      }
@@@ -1216,14 -1147,13 +1226,12 @@@
  
          public GroupCommitRequest(long nextOffset, long timeoutMillis) {
              this.nextOffset = nextOffset;
-             this.timeoutMillis = timeoutMillis;
+             this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
          }
  
-         public GroupCommitRequest(long nextOffset) {
-             this.nextOffset = nextOffset;
+         public long getDeadLine() {
+             return deadLine;
          }
- 
--
          public long getNextOffset() {
              return nextOffset;
          }
@@@ -1339,114 -1269,6 +1347,114 @@@
          }
      }
  
 +    class GroupCheckService extends FlushCommitLogService {
 +        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
 +        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
 +
 +        public boolean isAynscRequestsFull() {
 +            return requestsWrite.size() > CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests() * 2;
 +        }
 +
 +        public synchronized boolean putRequest(final GroupCommitRequest request) {
 +            synchronized (this.requestsWrite) {
 +                this.requestsWrite.add(request);
 +            }
 +            if (hasNotified.compareAndSet(false, true)) {
 +                waitPoint.countDown(); // notify
 +            }
 +            boolean flag = this.requestsWrite.size() >
-                     CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests();
++                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests();
 +            if (flag) {
 +                log.info("Async requests {} exceeded the threshold {}", requestsWrite.size(),
-                         CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests());
++                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests());
 +            }
 +
 +            return flag;
 +        }
 +
 +        private void swapRequests() {
 +            List<GroupCommitRequest> tmp = this.requestsWrite;
 +            this.requestsWrite = this.requestsRead;
 +            this.requestsRead = tmp;
 +        }
 +
 +        private void doCommit() {
 +            synchronized (this.requestsRead) {
 +                if (!this.requestsRead.isEmpty()) {
 +                    for (GroupCommitRequest req : this.requestsRead) {
 +                        // There may be a message in the next file, so a maximum of
 +                        // two times the flush
 +                        boolean flushOK = false;
 +                        for (int i = 0; i < 1000; i++) {
 +                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
 +                            if (flushOK) {
 +                                break;
 +                            } else {
 +                                try {
 +                                    Thread.sleep(1);
 +                                } catch (Throwable ignored) {
 +
 +                                }
 +                            }
 +                        }
 +                        req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
 +                    }
 +
 +                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
 +                    if (storeTimestamp > 0) {
 +                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
 +                    }
 +
 +                    this.requestsRead.clear();
 +                }
 +            }
 +        }
 +
 +        public void run() {
 +            CommitLog.log.info(this.getServiceName() + " service started");
 +
 +            while (!this.isStopped()) {
 +                try {
 +                    this.waitForRunning(1);
 +                    this.doCommit();
 +                } catch (Exception e) {
 +                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
 +                }
 +            }
 +
 +            // Under normal circumstances shutdown, wait for the arrival of the
 +            // request, and then flush
 +            try {
 +                Thread.sleep(10);
 +            } catch (InterruptedException e) {
 +                CommitLog.log.warn("GroupCommitService Exception, ", e);
 +            }
 +
 +            synchronized (this) {
 +                this.swapRequests();
 +            }
 +
 +            this.doCommit();
 +
 +            CommitLog.log.info(this.getServiceName() + " service end");
 +        }
 +
 +        @Override
 +        protected void onWaitEnd() {
 +            this.swapRequests();
 +        }
 +
 +        @Override
 +        public String getServiceName() {
 +            return CommitLog.GroupCheckService.class.getSimpleName();
 +        }
 +
 +        @Override
 +        public long getJointime() {
 +            return 1000 * 60 * 5;
 +        }
 +    }
 +
      class DefaultAppendMessageCallback implements AppendMessageCallback {
          // File at the end of the minimum fixed length empty
          private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
@@@ -1482,11 -1304,18 +1490,16 @@@
              };
  
              // Record ConsumeQueue information
 -            String key = putMessageContext.getTopicQueueTableKey();
 -            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
 -            if (null == queueOffset) {
 -                queueOffset = 0L;
 -                CommitLog.this.topicQueueTable.put(key, queueOffset);
 -            }
 +            Long queueOffset = msgInner.getQueueOffset();
 +
 +            // this msg maybe a inner-batch msg.
 +            short messageNum = getMessageNum(msgInner);
  
+             boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);
+             if (!multiDispatchWrapResult) {
+                 return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+             }
+ 
              // Transaction messages that require special handling
              final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
              switch (tranType) {
@@@ -1517,9 -1346,9 +1530,9 @@@
                  final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                  byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
                  return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
--                        maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
--                        msgIdSupplier, msgInner.getStoreTimestamp(),
--                        queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
++                    maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
++                    msgIdSupplier, msgInner.getStoreTimestamp(),
++                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
              }
  
              int pos = 4 + 4 + 4 + 4 + 4;
@@@ -1534,15 -1363,28 +1547,29 @@@
              // refresh store time stamp in lock
              preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
  
--
              final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
 +            CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS");
              // Write messages to the queue buffer
              byteBuffer.put(preEncodeBuffer);
 +            CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS");
              msgInner.setEncodedBuff(null);
-             return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
-                 msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum);
+             AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
+                 msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+ 
+             switch (tranType) {
+                 case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                     break;
+                 case MessageSysFlag.TRANSACTION_NOT_TYPE:
+                 case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+                     // The next update ConsumeQueue information
+                     CommitLog.this.topicQueueTable.put(key, ++queueOffset);
+                     CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
+                     break;
+                 default:
+                     break;
+             }
+             return result;
          }
  
          public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
@@@ -1673,7 -1521,7 +1700,7 @@@
               * Serialize message
               */
              final byte[] propertiesData =
--                    msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
++                msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
  
              final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
  
@@@ -1694,7 -1540,7 +1721,7 @@@
              // Exceeds the maximum message
              if (msgLen > this.maxMessageSize) {
                  CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
--                        + ", maxMessageSize: " + this.maxMessageSize);
++                    + ", maxMessageSize: " + this.maxMessageSize);
                  return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
              }
  
@@@ -1719,11 -1565,11 +1746,11 @@@
              // 9 BORNTIMESTAMP
              this.encoderBuffer.putLong(msgInner.getBornTimestamp());
              // 10 BORNHOST
--            socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
++            socketAddress2ByteBuffer(msgInner.getBornHost(), this.encoderBuffer);
              // 11 STORETIMESTAMP
              this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
              // 12 STOREHOSTADDRESS
--            socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
++            socketAddress2ByteBuffer(msgInner.getStoreHost(), this.encoderBuffer);
              // 13 RECONSUMETIMES
              this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
              // 14 Prepared Transaction Offset
@@@ -1858,155 -1715,55 +1896,164 @@@
              byteBuffer.limit(limit);
          }
  
+         public ByteBuffer getEncoderBuffer() {
+             return encoderBuffer;
+         }
      }
  
 -    static class PutMessageThreadLocal {
 -        private MessageExtEncoder encoder;
 -        private StringBuilder keyBuilder;
 -        PutMessageThreadLocal(int size) {
 -            encoder = new MessageExtEncoder(size);
 -            keyBuilder = new StringBuilder();
 -        }
 +    interface FlushManager {
 +        void start();
+ 
 -        public MessageExtEncoder getEncoder() {
 -            return encoder;
 +        void shutdown();
++
 +        void wakeUpFlush();
++
 +        void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);
++
 +        CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
 +    }
 +
 +    class DefaultFlushManager implements FlushManager {
 +
 +        private final FlushCommitLogService flushCommitLogService;
 +
 +        //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
 +        private final FlushCommitLogService commitLogService;
 +
 +        public DefaultFlushManager() {
 +            if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 +                this.flushCommitLogService = new CommitLog.GroupCommitService();
 +            } else {
 +                this.flushCommitLogService = new CommitLog.FlushRealTimeService();
 +            }
 +
 +            this.commitLogService = new CommitLog.CommitRealTimeService();
          }
  
 -        public StringBuilder getKeyBuilder() {
 -            return keyBuilder;
 +        @Override
 +        public void start() {
 +            this.flushCommitLogService.start();
 +
 +            if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
 +                this.commitLogService.start();
 +            }
          }
 -    }
  
-         public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
 -    static class PutMessageContext {
 -        private String topicQueueTableKey;
 -        private long[] phyPos;
 -        private int batchSize;
++        public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult,
++            MessageExt messageExt) {
 +            // Synchronization flush
 +            if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 +                final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
 +                if (messageExt.isWaitStoreMsgOK()) {
 +                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
 +                    service.putRequest(request);
 +                    CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
 +                    PutMessageStatus flushStatus = null;
 +                    try {
 +                        flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
-                                 TimeUnit.MILLISECONDS);
++                            TimeUnit.MILLISECONDS);
 +                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
 +                        //flushOK=false;
 +                    }
 +                    if (flushStatus != PutMessageStatus.PUT_OK) {
 +                        log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
-                                 + " client address: " + messageExt.getBornHostString());
++                            + " client address: " + messageExt.getBornHostString());
 +                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
 +                    }
 +                } else {
 +                    service.wakeup();
 +                }
 +            }
 +            // Asynchronous flush
 +            else {
 +                if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
 +                    flushCommitLogService.wakeup();
 +                } else {
 +                    commitLogService.wakeup();
 +                }
 +            }
 +        }
  
 -        public PutMessageContext(String topicQueueTableKey) {
 -            this.topicQueueTableKey = topicQueueTableKey;
 +        @Override
 +        public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
 +            // Synchronization flush
 +            if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
 +                final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
 +                if (messageExt.isWaitStoreMsgOK()) {
 +                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
-                             CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
++                        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
 +                    service.putRequest(request);
 +                    return request.future();
 +                } else {
 +                    service.wakeup();
 +                    return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
 +                }
 +            }
 +            // Asynchronous flush
 +            else {
 +                if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
 +                    flushCommitLogService.wakeup();
-                 } else  {
++                } else {
 +                    commitLogService.wakeup();
 +                }
 +                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
 +            }
          }
  
 -        public String getTopicQueueTableKey() {
 -            return topicQueueTableKey;
 +        @Override
 +        public void wakeUpFlush() {
 +            // now wake up flush thread.
 +            flushCommitLogService.wakeup();
          }
  
 -        public long[] getPhyPos() {
 -            return phyPos;
 +        @Override
 +        public void shutdown() {
 +            if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
 +                this.commitLogService.shutdown();
 +            }
 +
 +            this.flushCommitLogService.shutdown();
          }
  
 -        public void setPhyPos(long[] phyPos) {
 -            this.phyPos = phyPos;
 +    }
 +
 +    public int getCommitLogSize() {
 +        return commitLogSize;
 +    }
 +
 +    public MappedFileQueue getMappedFileQueue() {
 +        return mappedFileQueue;
 +    }
 +
 +    public MessageStore getMessageStore() {
 +        return defaultMessageStore;
 +    }
 +
 +    @Override
 +    public void swapMap(int reserveNum, long forceSwapIntervalMs, long normalSwapIntervalMs) {
 +        this.getMappedFileQueue().swapMap(reserveNum, forceSwapIntervalMs, normalSwapIntervalMs);
 +    }
 +
 +    @Override
 +    public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
 +        this.getMappedFileQueue().cleanSwappedMap(forceCleanSwapIntervalMs);
 +    }
 +
 +    static class PutMessageThreadLocal {
 +        private MessageExtEncoder encoder;
 +        private StringBuilder keyBuilder;
++
 +        PutMessageThreadLocal(int size) {
 +            encoder = new MessageExtEncoder(size);
 +            keyBuilder = new StringBuilder();
          }
  
 -        public int getBatchSize() {
 -            return batchSize;
 +        public MessageExtEncoder getEncoder() {
 +            return encoder;
          }
  
 -        public void setBatchSize(int batchSize) {
 -            this.batchSize = batchSize;
 +        public StringBuilder getKeyBuilder() {
 +            return keyBuilder;
          }
      }
  }
diff --cc store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index a1fc870,fdc725d..d4d5ef3
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@@ -439,13 -429,52 +445,59 @@@ public class ConsumeQueue implements Co
          this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
      }
  
+     private void multiDispatchLmqQueue(DispatchRequest request, int maxRetries) {
+         Map<String, String> prop = request.getPropertiesMap();
+         String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+         String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET);
+         String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+         String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+         if (queues.length != queueOffsets.length) {
+             log.error("[bug] queues.length!=queueOffsets.length ", request.getTopic());
+             return;
+         }
+         for (int i = 0; i < queues.length; i++) {
+             String queueName = queues[i];
+             long queueOffset = Long.parseLong(queueOffsets[i]);
+             int queueId = request.getQueueId();
+             if (this.defaultMessageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
+                 queueId = 0;
+             }
+             doDispatchLmqQueue(request, maxRetries, queueName, queueOffset, queueId);
+ 
+         }
+         return;
+     }
+ 
+     private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String queueName, long queueOffset,
+         int queueId) {
+         ConsumeQueue cq = this.defaultMessageStore.findConsumeQueue(queueName, queueId);
+         boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
+         for (int i = 0; i < maxRetries && canWrite; i++) {
+             boolean result = cq.putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(),
+                 request.getTagsCode(),
+                 queueOffset);
+             if (result) {
+                 break;
+             } else {
+                 log.warn("[BUG]put commit log position info to " + queueName + ":" + queueId + " " + request.getCommitLogOffset()
+                     + " failed, retry " + i + " times");
+ 
+                 try {
+                     Thread.sleep(1000);
+                 } catch (InterruptedException e) {
+                     log.warn("", e);
+                 }
+             }
+         }
+     }
+ 
 +    @Override
 +    public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum) {
 +        String topicQueueKey = getTopic() + "-" + getQueueId();
 +        long queueOffset = queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum);
 +        msg.setQueueOffset(queueOffset);
 +    }
 +
      private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
          final long cqOffset) {
  
diff --cc store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 76165d9,f11d5f3..9b9590a
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@@ -454,20 -455,12 +477,26 @@@ public class DefaultMessageStore implem
              return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
          }
  
 +        if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
 +                && !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
 +            log.warn("[BUG]The message had property {} but is not an inner batch", MessageConst.PROPERTY_INNER_NUM);
 +            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
 +        }
 +
 +        if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
 +            Optional<TopicConfig> topicConfig = this.getTopicConfig(msg.getTopic());
 +            if (!QueueTypeUtils.isBatchCq(topicConfig)) {
 +                log.error("[BUG]The message is an inner batch but cq type is not batch cq");
 +                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
 +            }
 +        }
 +
+         PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
+         if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {
+             return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));
+         }
+ 
+ 
          long beginTime = this.getSystemClock().now();
          CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
  
@@@ -485,8 -478,6 +514,7 @@@
  
          return putResultFuture;
      }
 +
-     @Override
      public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
          PutMessageStatus checkStoreStatus = this.checkStoreStatus();
          if (checkStoreStatus != PutMessageStatus.PUT_OK) {
@@@ -1084,10 -1058,11 +1125,11 @@@
              String topic = next.getKey();
  
              if (!topics.contains(topic) && !topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)
-                     && !topic.equals(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC)) {
+                     && !topic.equals(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC)
+                     && !MixAll.isLmq(topic)) {
 -                ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
 -                for (ConsumeQueue cq : queueTable.values()) {
 -                    cq.destroy();
 +                ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = next.getValue();
 +                for (ConsumeQueueInterface cq : queueTable.values()) {
 +                    this.consumeQueueStore.destroy(cq);
                      log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
                          cq.getTopic(),
                          cq.getQueueId()
@@@ -1598,43 -1678,7 +1663,43 @@@
  
          private volatile boolean cleanImmediately = false;
  
 +        double getDiskSpaceWarningLevelRatio() {
 +            double finalDiskSpaceWarningLevelRatio;
 +            if ("".equals(diskSpaceWarningLevelRatio)) {
 +                finalDiskSpaceWarningLevelRatio = DefaultMessageStore.this.getMessageStoreConfig().getDiskSpaceWarningLevelRatio() / 100.0;
 +            } else {
 +                finalDiskSpaceWarningLevelRatio = Double.parseDouble(diskSpaceWarningLevelRatio);
 +            }
 +
 +            if (finalDiskSpaceWarningLevelRatio > 0.90) {
 +                finalDiskSpaceWarningLevelRatio = 0.90;
 +            }
 +            if (finalDiskSpaceWarningLevelRatio < 0.35) {
 +                finalDiskSpaceWarningLevelRatio = 0.35;
 +            }
 +
 +            return finalDiskSpaceWarningLevelRatio;
 +        }
 +
 +        double getDiskSpaceCleanForciblyRatio() {
 +            double finalDiskSpaceCleanForciblyRatio;
 +            if ("".equals(diskSpaceCleanForciblyRatio)) {
 +                finalDiskSpaceCleanForciblyRatio = DefaultMessageStore.this.getMessageStoreConfig().getDiskSpaceCleanForciblyRatio() / 100.0;
 +            } else {
 +                finalDiskSpaceCleanForciblyRatio = Double.parseDouble(diskSpaceCleanForciblyRatio);
 +            }
 +
 +            if (finalDiskSpaceCleanForciblyRatio > 0.85) {
 +                finalDiskSpaceCleanForciblyRatio = 0.85;
 +            }
 +            if (finalDiskSpaceCleanForciblyRatio < 0.30) {
 +                finalDiskSpaceCleanForciblyRatio = 0.30;
 +            }
 +
 +            return finalDiskSpaceCleanForciblyRatio;
 +        }
 +
-         public void excuteDeleteFilesManualy() {
+         public void executeDeleteFilesManually() {
              this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
              DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
          }
@@@ -1761,10 -1802,9 +1826,9 @@@
              }
  
              {
-                 String storePathLogics = StorePathConfigHelper
-                     .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
+                 String storePathLogics = DefaultMessageStore.this.getStorePathLogic();
                  double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
 -                if (logicsRatio > diskSpaceWarningLevelRatio) {
 +                if (logicsRatio > getDiskSpaceWarningLevelRatio()) {
                      boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                      if (diskok) {
                          DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
diff --cc store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 341a29f,6771ede..9b8a9a7
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@@ -445,138 -399,12 +445,148 @@@ public interface MessageStore 
      void handleScheduleMessageService(BrokerRole brokerRole);
  
      /**
 +     * Will be triggered when a new message is appended to commit log.
 +     * @param msg the msg that is appended to commit log
 +     * @param result append message result
 +     * @param commitLogFile commit log file
 +     */
 +    void onCommitLogAppend(MessageExtBrokerInner msg, AppendMessageResult result, MappedFile commitLogFile);
 +
 +    /**
 +     * Will be triggered when a new dispatch request is sent to message store.
 +     * @param dispatchRequest dispatch request
 +     * @param doDispatch do dispatch if true
 +     * @param commitLogFile commit log file
 +     * @param isRecover is from recover process
 +     * @param isFileEnd if the dispatch request represents 'file end'
 +     */
 +    void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, MappedFile commitLogFile, boolean isRecover, boolean isFileEnd);
 +
 +    /**
 +     * Get the message store config
 +     * @return the message store config
 +     */
 +    MessageStoreConfig getMessageStoreConfig();
 +
 +    /**
 +     * Get the statistics service
 +     * @return the statistics service
 +     */
 +    StoreStatsService getStoreStatsService();
 +
 +    /**
 +     * Get the store checkpoint component
 +     * @return the checkpoint component
 +     */
 +    StoreCheckpoint getStoreCheckpoint();
 +
 +    /**
 +     * Get the system clock
 +     * @return the system clock
 +     */
 +    SystemClock getSystemClock();
 +
 +    /**
 +     * Get the commit log
 +     * @return the commit log
 +     */
 +    CommitLog getCommitLog();
 +
 +    /**
 +     * Get running flags
 +     * @return running flags
 +     */
 +    RunningFlags getRunningFlags();
 +
 +    /**
 +     * Get the transient store pool
 +     * @return the transient store pool
 +     */
 +    TransientStorePool getTransientStorePool();
 +
 +    /**
 +     * Get the HA service
 +     * @return the HA service
 +     */
 +    HAService getHaService();
 +
 +    /**
 +     * Register clean file hook
 +     * @param logicalQueueCleanHook logical queue clean hook
 +     */
 +    void registerCleanFileHook(CleanFilesHook logicalQueueCleanHook);
 +
 +    /**
 +     * Get the allocate-mappedFile service
 +     * @return the allocate-mappedFile service
 +     */
 +    AllocateMappedFileService getAllocateMappedFileService();
 +
 +    /**
 +     * Truncate dirty logic files
 +     * @param phyOffset physical offset
 +     */
 +    void truncateDirtyLogicFiles(long phyOffset);
 +
 +    /**
 +     * Destroy logics files
 +     */
 +    void destroyLogics();
 +
 +    /**
 +     * Unlock mappedFile
 +     * @param unlockMappedFile the file that needs to be unlocked
 +     */
 +    void unlockMappedFile(MappedFile unlockMappedFile);
 +
 +    /**
 +     * Get the perf counter component
 +     * @return the perf counter component
 +     */
 +    PerfCounter.Ticks getPerfCounter();
 +
 +    /**
 +     * Get the queue store
 +     * @return the queue store
 +     */
 +    ConsumeQueueStore getQueueStore();
 +
 +    /**
 +     * Whether 'sync disk flush' is configured in this message store
 +     * @return true if sync disk flush is configured, false otherwise
 +     */
 +    boolean isSyncDiskFlush();
 +
 +    /**
 +     * Whether this message store is in the sync master role
 +     * @return true if this message store is a sync master, false otherwise
 +     */
 +    boolean isSyncMaster();
 +
 +    /**
 +     * Assign a queue offset and increase it.
 +     * If there is a race condition, you need to lock/unlock this method yourself.
 +     *
 +     * @param topicQueueKey topic-queue key
 +     * @param msg message
 +     * @param messageNum message num
 +     */
 +    void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short messageNum);
 +
 +    /**
 +     * Get the topic config
 +     * @param topic topic name
 +     * @return topic config info
 +     */
 +    Optional<TopicConfig> getTopicConfig(String topic);
++
++    /**
+      * Clean an unused lmq topic.
+      * While an lmq topic is being cleaned up,
+      * it must not be used to write messages at the same time;
+      * otherwise messages of the lmq topic being cleaned may be lost.
+      * Call this method with caution.
+      * @param topic lmq topic
+      */
+     void cleanUnusedLmqTopic(String topic);
  }
diff --cc store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index e58e695,45293e6..12ff598
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@@ -167,48 -160,14 +168,56 @@@ public class MessageStoreConfig 
  
      private boolean enableScheduleMessageStats = true;
  
 +    private int maxBatchDeleteFilesNum = 50;
 +    //Polish dispatch
 +    private int dispatchCqThreads = 10;
 +    private int dispatchCqCacheNum = 1024 * 4;
 +    private boolean enableAsyncReput = true;
 +    //For recheck the reput
 +    private boolean recheckReputOffsetFromCq = false;
 +
 +    // Maximum length of topic
 +    private int maxTopicLength = 1000;
 +    private int travelCqFileNumWhenGetMessage = 1;
 +    // Sleep interval between two corrections
 +    private int correctLogicMinOffsetSleepInterval = 1;
 +    // Force correct min offset interval
 +    private int correctLogicMinOffsetForceInterval = 5 * 60 * 1000;
 +    // swap
 +    private boolean mappedFileSwapEnable = true;
 +    private long commitLogForceSwapMapInterval = 12L * 60 * 60 * 1000;
 +    private long commitLogSwapMapInterval = 1L * 60 * 60 * 1000;
 +    private int commitLogSwapMapReserveFileNum = 100;
 +    private long logicQueueForceSwapMapInterval = 12L * 60 * 60 * 1000;
 +    private long logicQueueSwapMapInterval = 1L * 60 * 60 * 1000;
 +    private long cleanSwapedMapInterval = 5L * 60 * 1000;
 +    private int logicQueueSwapMapReserveFileNum = 20;
 +
 +    private boolean searchBcqByCacheEnable = true;
 +
 +    @ImportantField
 +    private boolean dispatchFromSenderThread = false;
 +
 +    @ImportantField
 +    private boolean wakeCommitWhenPutMessage = true;
 +    @ImportantField
 +    private boolean wakeFlushWhenPutMessage = false;
 +
 +    @ImportantField
 +    private boolean enableCleanExpiredOffset = false;
 +
 +    private int maxAsyncPutMessageRequests = 5000;
 +
 +    private int pullBatchMaxMessageCount = 160;
 +
+     private boolean enableLmq = false;
+     private boolean enableMultiDispatch = false;
+     private int maxLmqConsumeQueueNum = 20000;
+ 
+     private boolean enableScheduleAsyncDeliver = false;
+     private int scheduleAsyncDeliverMaxPendingLimit = 2000;
+     private int scheduleAsyncDeliverMaxResendNum2Blocked = 3;
+ 
      public boolean isDebugLockEnable() {
          return debugLockEnable;
      }
@@@ -858,163 -762,51 +875,211 @@@
          this.enableScheduleMessageStats = enableScheduleMessageStats;
      }
  
 +    public int getMaxAsyncPutMessageRequests() {
 +        return maxAsyncPutMessageRequests;
 +    }
 +
 +    public void setMaxAsyncPutMessageRequests(int maxAsyncPutMessageRequests) {
 +        this.maxAsyncPutMessageRequests = maxAsyncPutMessageRequests;
 +    }
 +
 +    public int getMaxRecoveryCommitlogFiles() {
 +        return maxRecoveryCommitlogFiles;
 +    }
 +
 +    public void setMaxRecoveryCommitlogFiles(final int maxRecoveryCommitlogFiles) {
 +        this.maxRecoveryCommitlogFiles = maxRecoveryCommitlogFiles;
 +    }
 +
 +    public boolean isDispatchFromSenderThread() {
 +        return dispatchFromSenderThread;
 +    }
 +
 +    public void setDispatchFromSenderThread(boolean dispatchFromSenderThread) {
 +        this.dispatchFromSenderThread = dispatchFromSenderThread;
 +    }
 +
 +    public int getDispatchCqThreads() {
 +        return dispatchCqThreads;
 +    }
 +
 +    public void setDispatchCqThreads(final int dispatchCqThreads) {
 +        this.dispatchCqThreads = dispatchCqThreads;
 +    }
 +
 +    public int getDispatchCqCacheNum() {
 +        return dispatchCqCacheNum;
 +    }
 +
 +    public void setDispatchCqCacheNum(final int dispatchCqCacheNum) {
 +        this.dispatchCqCacheNum = dispatchCqCacheNum;
 +    }
 +
 +    public boolean isEnableAsyncReput() {
 +        return enableAsyncReput;
 +    }
 +
 +    public void setEnableAsyncReput(final boolean enableAsyncReput) {
 +        this.enableAsyncReput = enableAsyncReput;
 +    }
 +
 +    public boolean isRecheckReputOffsetFromCq() {
 +        return recheckReputOffsetFromCq;
 +    }
 +
 +    public void setRecheckReputOffsetFromCq(final boolean recheckReputOffsetFromCq) {
 +        this.recheckReputOffsetFromCq = recheckReputOffsetFromCq;
 +    }
 +
 +    public long getCommitLogForceSwapMapInterval() {
 +        return commitLogForceSwapMapInterval;
 +    }
 +
 +    public void setCommitLogForceSwapMapInterval(long commitLogForceSwapMapInterval) {
 +        this.commitLogForceSwapMapInterval = commitLogForceSwapMapInterval;
 +    }
 +
 +    public int getCommitLogSwapMapReserveFileNum() {
 +        return commitLogSwapMapReserveFileNum;
 +    }
 +
 +    public void setCommitLogSwapMapReserveFileNum(int commitLogSwapMapReserveFileNum) {
 +        this.commitLogSwapMapReserveFileNum = commitLogSwapMapReserveFileNum;
 +    }
 +
 +    public long getLogicQueueForceSwapMapInterval() {
 +        return logicQueueForceSwapMapInterval;
 +    }
 +
 +    public void setLogicQueueForceSwapMapInterval(long logicQueueForceSwapMapInterval) {
 +        this.logicQueueForceSwapMapInterval = logicQueueForceSwapMapInterval;
 +    }
 +
 +    public int getLogicQueueSwapMapReserveFileNum() {
 +        return logicQueueSwapMapReserveFileNum;
 +    }
 +
 +    public void setLogicQueueSwapMapReserveFileNum(int logicQueueSwapMapReserveFileNum) {
 +        this.logicQueueSwapMapReserveFileNum = logicQueueSwapMapReserveFileNum;
 +    }
 +
 +    public long getCleanSwapedMapInterval() {
 +        return cleanSwapedMapInterval;
 +    }
 +
 +    public void setCleanSwapedMapInterval(long cleanSwapedMapInterval) {
 +        this.cleanSwapedMapInterval = cleanSwapedMapInterval;
 +    }
 +
 +    public long getCommitLogSwapMapInterval() {
 +        return commitLogSwapMapInterval;
 +    }
 +
 +    public void setCommitLogSwapMapInterval(long commitLogSwapMapInterval) {
 +        this.commitLogSwapMapInterval = commitLogSwapMapInterval;
 +    }
 +
 +    public long getLogicQueueSwapMapInterval() {
 +        return logicQueueSwapMapInterval;
 +    }
 +
 +    public void setLogicQueueSwapMapInterval(long logicQueueSwapMapInterval) {
 +        this.logicQueueSwapMapInterval = logicQueueSwapMapInterval;
 +    }
 +
 +    public int getMaxBatchDeleteFilesNum() {
 +        return maxBatchDeleteFilesNum;
 +    }
 +
 +    public void setMaxBatchDeleteFilesNum(int maxBatchDeleteFilesNum) {
 +        this.maxBatchDeleteFilesNum = maxBatchDeleteFilesNum;
 +    }
 +
 +    public boolean isSearchBcqByCacheEnable() {
 +        return searchBcqByCacheEnable;
 +    }
 +
 +    public void setSearchBcqByCacheEnable(boolean searchBcqByCacheEnable) {
 +        this.searchBcqByCacheEnable = searchBcqByCacheEnable;
 +    }
 +
 +    public int getDiskSpaceWarningLevelRatio() {
 +        return diskSpaceWarningLevelRatio;
 +    }
 +
 +    public void setDiskSpaceWarningLevelRatio(int diskSpaceWarningLevelRatio) {
 +        this.diskSpaceWarningLevelRatio = diskSpaceWarningLevelRatio;
 +    }
 +
 +    public int getDiskSpaceCleanForciblyRatio() {
 +        return diskSpaceCleanForciblyRatio;
 +    }
 +
 +    public void setDiskSpaceCleanForciblyRatio(int diskSpaceCleanForciblyRatio) {
 +        this.diskSpaceCleanForciblyRatio = diskSpaceCleanForciblyRatio;
 +    }
 +
 +    public boolean isMappedFileSwapEnable() {
 +        return mappedFileSwapEnable;
 +    }
 +
 +    public void setMappedFileSwapEnable(boolean mappedFileSwapEnable) {
 +        this.mappedFileSwapEnable = mappedFileSwapEnable;
 +    }
 +
 +    public int getPullBatchMaxMessageCount() {
 +        return pullBatchMaxMessageCount;
 +    }
 +
 +    public void setPullBatchMaxMessageCount(int pullBatchMaxMessageCount) {
 +        this.pullBatchMaxMessageCount = pullBatchMaxMessageCount;
 +    }
++
+     public boolean isEnableLmq() {
+         return enableLmq;
+     }
+ 
+     public void setEnableLmq(boolean enableLmq) {
+         this.enableLmq = enableLmq;
+     }
+ 
+     public boolean isEnableMultiDispatch() {
+         return enableMultiDispatch;
+     }
+ 
+     public void setEnableMultiDispatch(boolean enableMultiDispatch) {
+         this.enableMultiDispatch = enableMultiDispatch;
+     }
+ 
+     public int getMaxLmqConsumeQueueNum() {
+         return maxLmqConsumeQueueNum;
+     }
+ 
+     public void setMaxLmqConsumeQueueNum(int maxLmqConsumeQueueNum) {
+         this.maxLmqConsumeQueueNum = maxLmqConsumeQueueNum;
+     }
+ 
+     public boolean isEnableScheduleAsyncDeliver() {
+         return enableScheduleAsyncDeliver;
+     }
+ 
+     public void setEnableScheduleAsyncDeliver(boolean enableScheduleAsyncDeliver) {
+         this.enableScheduleAsyncDeliver = enableScheduleAsyncDeliver;
+     }
+ 
+     public int getScheduleAsyncDeliverMaxPendingLimit() {
+         return scheduleAsyncDeliverMaxPendingLimit;
+     }
+ 
+     public void setScheduleAsyncDeliverMaxPendingLimit(int scheduleAsyncDeliverMaxPendingLimit) {
+         this.scheduleAsyncDeliverMaxPendingLimit = scheduleAsyncDeliverMaxPendingLimit;
+     }
+ 
+     public int getScheduleAsyncDeliverMaxResendNum2Blocked() {
+         return scheduleAsyncDeliverMaxResendNum2Blocked;
+     }
+ 
+     public void setScheduleAsyncDeliverMaxResendNum2Blocked(int scheduleAsyncDeliverMaxResendNum2Blocked) {
+         this.scheduleAsyncDeliverMaxResendNum2Blocked = scheduleAsyncDeliverMaxResendNum2Blocked;
+     }
  }
diff --cc store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 1ee8a88,774896b..8daa4d6
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@@ -257,22 -231,14 +257,20 @@@ public class DefaultMappedFile extends 
          return this.fileFromOffset;
      }
  
 +    @Override
      public boolean appendMessage(final byte[] data) {
 +        return appendMessage(data, 0, data.length);
 +    }
 +
 +    @Override
 +    public boolean appendMessage(ByteBuffer data) {
          int currentPos = this.wrotePosition.get();
 +        int remaining = data.remaining();
  
 -        if ((currentPos + data.length) <= this.fileSize) {
 +        if ((currentPos + remaining) <= this.fileSize) {
              try {
 -                ByteBuffer buf = this.mappedByteBuffer.slice();
 -                buf.position(currentPos);
 -                buf.put(data);
 +                this.fileChannel.position(currentPos);
-                 while (data.hasRemaining()) {
-                     this.fileChannel.write(data);
-                 }
++                this.fileChannel.write(ByteBuffer.wrap(data));
              } catch (Throwable e) {
                  log.error("Error occurred when append message to mappedFile.", e);
              }
diff --cc store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 677c983,d5b4e8d..4989bd3
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@@ -16,29 -16,35 +16,36 @@@
   */
  package org.apache.rocketmq.store.schedule;
  
 +import java.util.Comparator;
  import java.util.HashMap;
  import java.util.Iterator;
 +import java.util.List;
  import java.util.Map;
- import java.util.Timer;
- import java.util.TimerTask;
+ import java.util.Queue;
+ import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ScheduledThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.stream.Collectors;
  import org.apache.rocketmq.common.ConfigManager;
  import org.apache.rocketmq.common.MixAll;
+ import org.apache.rocketmq.common.ThreadFactoryImpl;
  import org.apache.rocketmq.common.TopicFilterType;
  import org.apache.rocketmq.common.constant.LoggerName;
 -import org.apache.rocketmq.common.topic.TopicValidator;
 -import org.apache.rocketmq.logging.InternalLogger;
 -import org.apache.rocketmq.logging.InternalLoggerFactory;
  import org.apache.rocketmq.common.message.MessageAccessor;
  import org.apache.rocketmq.common.message.MessageConst;
  import org.apache.rocketmq.common.message.MessageDecoder;
  import org.apache.rocketmq.common.message.MessageExt;
  import org.apache.rocketmq.common.running.RunningStats;
 -import org.apache.rocketmq.store.ConsumeQueue;
 -import org.apache.rocketmq.store.ConsumeQueueExt;
 +import org.apache.rocketmq.common.topic.TopicValidator;
 +import org.apache.rocketmq.logging.InternalLogger;
 +import org.apache.rocketmq.logging.InternalLoggerFactory;
+ import org.apache.rocketmq.store.DefaultMessageStore;
  import org.apache.rocketmq.store.MessageExtBrokerInner;
  import org.apache.rocketmq.store.MessageStore;
  import org.apache.rocketmq.store.PutMessageResult;
@@@ -313,158 -376,435 +379,419 @@@ public class ScheduleMessageService ext
          }
  
          public void executeOnTimeup() {
 -            ConsumeQueue cq =
 -                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
 +            ConsumeQueueInterface cq =
 +                ScheduleMessageService.this.defaultMessageStore.getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                      delayLevel2QueueId(delayLevel));
  
-             long failScheduleOffset = offset;
+             if (cq == null) {
+                 this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
+                 return;
+             }
  
 -            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
 -            if (bufferCQ == null) {
 -                long resetOffset;
 -                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
 -                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
 -                        this.offset, resetOffset, cq.getQueueId());
 -                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
 -                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
 -                        this.offset, resetOffset, cq.getQueueId());
 -                } else {
 -                    resetOffset = this.offset;
 -                }
 -
 -                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
 -                return;
 -            }
 -
 -            long nextOffset = this.offset;
 -            try {
 -                int i = 0;
 -                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
 -                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
 -                    long offsetPy = bufferCQ.getByteBuffer().getLong();
 -                    int sizePy = bufferCQ.getByteBuffer().getInt();
 -                    long tagsCode = bufferCQ.getByteBuffer().getLong();
 -
 -                    if (cq.isExtAddr(tagsCode)) {
 -                        if (cq.getExt(tagsCode, cqExtUnit)) {
 -                            tagsCode = cqExtUnit.getTagsCode();
 -                        } else {
 -                            //can't find ext content.So re compute tags code.
 -                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
 -                                tagsCode, offsetPy, sizePy);
 -                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
 -                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
 -                        }
 -                    }
 -
 -                    long now = System.currentTimeMillis();
 -                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 -                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
 +            if (cq != null) {
 +                ReferredIterator<CqUnit> bufferCQ = cq.iterateFrom(this.offset);
 +                if (bufferCQ != null) {
 +                    try {
 +                        long nextOffset = offset;
 +                        while (bufferCQ.hasNext()) {
 +                            CqUnit cqUnit =  bufferCQ.next();
 +                            long offsetPy = cqUnit.getPos();
 +                            int sizePy = cqUnit.getSize();
 +                            long tagsCode = cqUnit.getTagsCode();
 +
 +                            if (!cqUnit.isTagsCodeValid()) {
 +                                // Can't find the ext content, so recompute the tags code.
 +                                log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
 +                                    tagsCode, offsetPy, sizePy);
 +                                long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
 +                                tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
 +                            }
 +
 +                            long now = System.currentTimeMillis();
 +                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
 +
 +                            long currOffset = cqUnit.getQueueOffset();
 +                            assert cqUnit.getBatchNum() == 1;
 +                            nextOffset = currOffset + cqUnit.getBatchNum();
  
-                             long countdown = deliverTimestamp - now;
- 
-                             if (countdown <= 0) {
-                                 MessageExt msgExt =
-                                     ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
-                                         offsetPy, sizePy);
- 
-                                 if (msgExt != null) {
-                                     try {
-                                         MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
-                                         if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
-                                             log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
-                                                 msgInner.getTopic(), msgInner);
-                                             continue;
-                                         }
-                                         PutMessageResult putMessageResult =
-                                             ScheduleMessageService.this.writeMessageStore
-                                                 .putMessage(msgInner);
- 
-                                         if (putMessageResult != null
-                                             && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
-                                             if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
-                                                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
-                                                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-                                                     putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
-                                             }
-                                             continue;
-                                         } else {
-                                             // XXX: warn and notify me
-                                             log.error(
-                                                 "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
-                                                 msgExt.getTopic(), msgExt.getMsgId());
-                                             ScheduleMessageService.this.timer.schedule(
-                                                 new DeliverDelayedMessageTimerTask(this.delayLevel,
-                                                     currOffset), DELAY_FOR_A_PERIOD);
-                                             ScheduleMessageService.this.updateOffset(this.delayLevel,
-                                                 currOffset);
-                                             return;
-                                         }
-                                     } catch (Exception e) {
-                                         /*
-                                          * XXX: warn and notify me
-                                          */
-                                         log.error(
-                                             "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
-                                     }
-                                 }
-                             } else {
-                                 ScheduleMessageService.this.timer.schedule(
-                                     new DeliverDelayedMessageTimerTask(this.delayLevel, currOffset),
-                                     countdown);
-                                 ScheduleMessageService.this.updateOffset(this.delayLevel, currOffset);
-                                 return;
-                             }
-                         } // end of for
- 
-                         ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
-                             this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
-                         ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+                     long countdown = deliverTimestamp - now;
+                     if (countdown > 0) {
+                         this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                          return;
-                     } finally {
+                     }
  
-                         bufferCQ.release();
+                     MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+                     if (msgExt == null) {
+                         continue;
                      }
-                 } // end of if (bufferCQ != null)
-                 else {
- 
-                     long cqMinOffset = cq.getMinOffsetInQueue();
-                     long cqMaxOffset = cq.getMaxOffsetInQueue();
-                     if (offset < cqMinOffset) {
-                         failScheduleOffset = cqMinOffset;
-                         log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                             offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+ 
+                     MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                     if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                         log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+                             msgInner.getTopic(), msgInner);
+                         continue;
                      }
  
-                     if (offset > cqMaxOffset) {
-                         failScheduleOffset = cqMaxOffset;
-                         log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                             offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+                     boolean deliverSuc;
+                     if (ScheduleMessageService.this.enableAsyncDeliver) {
+                         deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
+                     } else {
+                         deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
+                     }
+ 
+                     if (!deliverSuc) {
+                         this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+                         return;
                      }
                  }
-             } // end of if (cq != null)
  
-             ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
-                 failScheduleOffset), DELAY_FOR_A_WHILE);
+                 nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+             } catch (Exception e) {
+                 log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
+             } finally {
+                 bufferCQ.release();
+             }
+ 
+             this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
          }
  
-         private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
-             MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-             msgInner.setBody(msgExt.getBody());
-             msgInner.setFlag(msgExt.getFlag());
-             MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+         public void scheduleNextTimerTask(long offset, long delay) {
+             ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
+                 this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
+         }
+ 
+         /**
+          * Delivers one message and waits for the put result before returning.
+          * The delay-level offset is advanced only when the put reports {@code PUT_OK}.
+          *
+          * @return {@code true} if the message was put successfully, {@code false} otherwise
+          */
+         private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+             int sizePy) {
+             PutResultProcess process = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
+             PutMessageResult putResult = process.get();
+             if (putResult == null || putResult.getPutMessageStatus() != PutMessageStatus.PUT_OK) {
+                 return false;
+             }
+ 
+             ScheduleMessageService.this.updateOffset(this.delayLevel, process.getNextOffset());
+             return true;
+         }
+ 
+         /**
+          * Starts an asynchronous delivery and parks its result handle in this delay level's pending
+          * queue. Nothing is delivered when the queue is over its configured limit or when the
+          * oldest pending entry reports {@code need2Blocked()}.
+          *
+          * @return {@code true} if the delivery was started, {@code false} if it was refused
+          */
+         private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+             int sizePy) {
+             Queue<PutResultProcess> pending = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+ 
+             // Flow control: refuse new work while too many results are still pending.
+             int pendingCount = pending.size();
+             int pendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                 .getScheduleAsyncDeliverMaxPendingLimit();
+             if (pendingCount > pendingLimit) {
+                 log.warn("Asynchronous deliver triggers flow control, " +
+                     "currentPendingNum={}, maxPendingLimit={}", pendingCount, pendingLimit);
+                 return false;
+             }
+ 
+             // Blocked: the head of the queue has been resent too many times.
+             PutResultProcess head = pending.peek();
+             boolean headBlocked = head != null && head.need2Blocked();
+             if (headBlocked) {
+                 log.warn("Asynchronous deliver block. info={}", head.toString());
+                 return false;
+             }
+ 
+             pending.add(deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true));
+             return true;
+         }
+ 
+         /**
+          * Hands the message to the write store via {@code asyncPutMessage} and wraps the returned
+          * future in a {@code PutResultProcess} carrying the delivery context.
+          *
+          * @param msgInner   message to put
+          * @param msgId      id recorded on the result handle
+          * @param offset     offset recorded on the result handle
+          * @param offsetPy   physical offset recorded on the result handle
+          * @param sizePy     physical size recorded on the result handle
+          * @param autoResend value passed to {@code setAutoResend}
+          * @return the result handle, after {@code thenProcess()} has attached its callbacks
+          */
+         private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
+             long offsetPy, int sizePy, boolean autoResend) {
+             CompletableFuture<PutMessageResult> future =
+                 ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+             return new PutResultProcess()
+                 .setTopic(msgInner.getTopic())
+                 .setDelayLevel(this.delayLevel)
+                 .setOffset(offset)
+                 .setPhysicOffset(offsetPy)
+                 .setPhysicSize(sizePy)
+                 .setMsgId(msgId)
+                 .setAutoResend(autoResend)
+                 .setFuture(future)
+                 .thenProcess();
+         }
+     }
+ 
+     public class HandlePutResultTask implements Runnable {
+         private final int delayLevel;
+ 
+         public HandlePutResultTask(int delayLevel) {
+             this.delayLevel = delayLevel;
+         }
  
-             TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
-             long tagsCodeValue =
-                 MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
-             msgInner.setTagsCode(tagsCodeValue);
-             msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+         /**
+          * Processes the pending deliver results of this delay level in queue order, then
+          * re-schedules itself after {@code DELAY_FOR_A_SLEEP} while the service is started.
+          *
+          * <p>Only the head of the queue is ever inspected. A head that is still {@code RUNNING}
+          * ends the current pass so that the next scheduled run re-checks it, instead of peeking
+          * the same entry again in a tight loop.
+          */
+         @Override
+         public void run() {
+             LinkedBlockingQueue<PutResultProcess> pendingQueue =
+                 ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+ 
+             PutResultProcess putResultProcess;
+             drain:
+             while ((putResultProcess = pendingQueue.peek()) != null) {
+                 try {
+                     switch (putResultProcess.getStatus()) {
+                         case SUCCESS:
+                             ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());
+                             pendingQueue.remove();
+                             break;
+                         case RUNNING:
+                             // The head is still in flight: leave the loop rather than busy-spinning
+                             // on peek(); the re-schedule below re-checks it after the usual delay.
+                             break drain;
+                         case EXCEPTION:
+                             if (!isStarted()) {
+                                 log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());
+                                 return;
+                             }
+                             log.warn("putResultProcess error, info={}", putResultProcess.toString());
+                             putResultProcess.onException();
+                             break;
+                         case SKIP:
+                             log.warn("putResultProcess skip, info={}", putResultProcess.toString());
+                             pendingQueue.remove();
+                             break;
+                     }
+                 } catch (Exception e) {
+                     log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e);
+                     putResultProcess.onException();
+                 }
+             }
+ 
+             if (isStarted()) {
+                 ScheduleMessageService.this.handleExecutorService
+                     .schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
+             }
+         }
+     }
  
-             msgInner.setSysFlag(msgExt.getSysFlag());
-             msgInner.setBornTimestamp(msgExt.getBornTimestamp());
-             msgInner.setBornHost(msgExt.getBornHost());
-             msgInner.setStoreHost(msgExt.getStoreHost());
-             msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+     public class PutResultProcess {
+         private String topic;
+         private long offset;
+         private long physicOffset;
+         private int physicSize;
+         private int delayLevel;
+         private String msgId;
+         private boolean autoResend = false;
+         private CompletableFuture<PutMessageResult> future;
+ 
+         private volatile int resendCount = 0;
+         private volatile ProcessStatus status = ProcessStatus.RUNNING;
+ 
+         public PutResultProcess setTopic(String topic) {
+             this.topic = topic;
+             return this;
+         }
  
-             msgInner.setWaitStoreMsgOK(false);
-             MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+         public PutResultProcess setOffset(long offset) {
+             this.offset = offset;
+             return this;
+         }
  
-             msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+         public PutResultProcess setPhysicOffset(long physicOffset) {
+             this.physicOffset = physicOffset;
+             return this;
+         }
  
-             String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
-             int queueId = Integer.parseInt(queueIdStr);
-             msgInner.setQueueId(queueId);
+         public PutResultProcess setPhysicSize(int physicSize) {
+             this.physicSize = physicSize;
+             return this;
+         }
  
-             return msgInner;
+         public PutResultProcess setDelayLevel(int delayLevel) {
+             this.delayLevel = delayLevel;
+             return this;
          }
+ 
+         public PutResultProcess setMsgId(String msgId) {
+             this.msgId = msgId;
+             return this;
+         }
+ 
+         public PutResultProcess setAutoResend(boolean autoResend) {
+             this.autoResend = autoResend;
+             return this;
+         }
+ 
+         public PutResultProcess setFuture(CompletableFuture<PutMessageResult> future) {
+             this.future = future;
+             return this;
+         }
+ 
+         public String getTopic() {
+             return topic;
+         }
+ 
+         public long getOffset() {
+             return offset;
+         }
+ 
+         public long getNextOffset() {
+             return offset + 1;
+         }
+ 
+         public long getPhysicOffset() {
+             return physicOffset;
+         }
+ 
+         public int getPhysicSize() {
+             return physicSize;
+         }
+ 
+         public Integer getDelayLevel() {
+             return delayLevel;
+         }
+ 
+         public String getMsgId() {
+             return msgId;
+         }
+ 
+         public boolean isAutoResend() {
+             return autoResend;
+         }
+ 
+         public CompletableFuture<PutMessageResult> getFuture() {
+             return future;
+         }
+ 
+         public int getResendCount() {
+             return resendCount;
+         }
+ 
+         /**
+          * Attaches the completion callbacks to the put future: a normal completion is routed to
+          * {@code handleResult}, an exceptional one is logged and routed to {@code onException}.
+          * Both callbacks are registered directly on the same future.
+          *
+          * @return this instance, for chaining
+          */
+         public PutResultProcess thenProcess() {
+             this.future.thenAccept(this::handleResult);
+ 
+             this.future.exceptionally(cause -> {
+                 log.error("ScheduleMessageService put message exceptionally, info: {}",
+                     PutResultProcess.this.toString(), cause);
+ 
+                 onException();
+                 return null;
+             });
+             return this;
+         }
+ 
+         /**
+          * Routes a put result: {@code PUT_OK} goes to {@code onSuccess}; anything else, including
+          * a {@code null} result, is logged and handed to {@code onException}.
+          *
+          * @param result outcome of the put, may be {@code null}
+          */
+         private void handleResult(PutMessageResult result) {
+             if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                 onSuccess(result);
+             } else {
+                 // Note: the value logged here is the put result, not this process.
+                 log.warn("ScheduleMessageService put message failed. info: {}.", result);
+                 onException();
+             }
+         }
+ 
+         /**
+          * Marks this process as {@code SUCCESS} and, when schedule message stats are enabled in
+          * the store config, records the put in the broker stats manager.
+          *
+          * @param result successful put result; its append result supplies the message count and
+          *               written bytes for the stats
+          */
+         public void onSuccess(PutMessageResult result) {
+             this.status = ProcessStatus.SUCCESS;
+             if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
+                 // "Get" side is accounted against the schedule topic/group; delayLevel - 1 is
+                 // presumably the schedule queue id -- TODO confirm against the stats manager API.
+                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getMsgNum());
+                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getWroteBytes());
+                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getMsgNum());
+                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getWroteBytes());
+                 // "Put" side is accounted against the topic recorded on this process.
+                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1);
+                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes());
+                 ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
+             }
+         }
+ 
+         /**
+          * Reacts to a failed put: without auto-resend the process is marked {@code SKIP},
+          * otherwise the message is resent.
+          */
+         public void onException() {
+             log.warn("ScheduleMessageService onException, info: {}", this.toString());
+             if (!this.autoResend) {
+                 this.status = ProcessStatus.SKIP;
+                 return;
+             }
+             this.resend();
+         }
+ 
+         public ProcessStatus getStatus() {
+             return this.status;
+         }
+ 
+         /**
+          * Blocks until the put future completes and returns its result.
+          *
+          * @return the put result, or a result with status {@code UNKNOWN_ERROR} if the wait was
+          *         interrupted or the future completed exceptionally; on interruption the calling
+          *         thread's interrupt status is restored
+          */
+         public PutMessageResult get() {
+             try {
+                 return this.future.get();
+             } catch (InterruptedException e) {
+                 // Keep the interruption visible to the caller instead of silently dropping it.
+                 Thread.currentThread().interrupt();
+                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
+             } catch (ExecutionException e) {
+                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
+             }
+         }
+ 
+         /**
+          * Re-reads the original message by its physical offset and size and puts it again
+          * synchronously, after a back-off that grows by 100 ms per attempt (capped at 60 s).
+          *
+          * <p>If the message can no longer be found, the status becomes {@code SKIP} once
+          * {@code need2Skip()} holds and {@code EXCEPTION} otherwise. Any exception thrown by the
+          * resend sets the status to {@code EXCEPTION}.
+          *
+          * <p>An interruption during the back-off is logged and does not cancel the resend; the
+          * thread's interrupt status is restored once the attempt has finished.
+          */
+         private void resend() {
+             log.info("Resend message, info: {}", this.toString());
+ 
+             boolean interrupted = false;
+ 
+             // Gradually increase the resend interval.
+             try {
+                 Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
+             } catch (InterruptedException e) {
+                 // Report through the logger and remember the interruption; the flag is restored
+                 // only after the store access below so that I/O behaves as it did before.
+                 interrupted = true;
+                 log.warn("Resend back-off interrupted, info: {}", this.toString(), e);
+             }
+ 
+             try {
+                 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset, this.physicSize);
+                 if (msgExt == null) {
+                     log.warn("ScheduleMessageService resend not found message. info: {}", this.toString());
+                     this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION;
+                     return;
+                 }
+ 
+                 MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                 PutMessageResult result = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
+                 this.handleResult(result);
+                 if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                     log.info("Resend message success, info: {}", this.toString());
+                 }
+             } catch (Exception e) {
+                 this.status = ProcessStatus.EXCEPTION;
+                 log.error("Resend message error, info: {}", this.toString(), e);
+             } finally {
+                 if (interrupted) {
+                     Thread.currentThread().interrupt();
+                 }
+             }
+         }
+ 
+         /**
+          * Tells whether this process has been resent more often than the configured
+          * {@code scheduleAsyncDeliverMaxResendNum2Blocked}.
+          *
+          * @return {@code true} if the resend count exceeds the configured value
+          */
+         public boolean need2Blocked() {
+             int maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                 .getScheduleAsyncDeliverMaxResendNum2Blocked();
+             return this.resendCount > maxResendNum2Blocked;
+         }
+ 
+         /**
+          * Tells whether this process has been resent more than twice the configured
+          * {@code scheduleAsyncDeliverMaxResendNum2Blocked}.
+          *
+          * @return {@code true} if the resend count exceeds twice the configured value
+          */
+         public boolean need2Skip() {
+             int maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                 .getScheduleAsyncDeliverMaxResendNum2Blocked();
+             return this.resendCount > maxResendNum2Blocked * 2;
+         }
+ 
+         @Override
+         public String toString() {
+             return "PutResultProcess{" +
+                 "topic='" + topic + '\'' +
+                 ", offset=" + offset +
+                 ", physicOffset=" + physicOffset +
+                 ", physicSize=" + physicSize +
+                 ", delayLevel=" + delayLevel +
+                 ", msgId='" + msgId + '\'' +
+                 ", autoResend=" + autoResend +
+                 ", resendCount=" + resendCount +
+                 ", status=" + status +
+                 '}';
+         }
+     }
+ 
+     public enum ProcessStatus {
+         /**
+          * In process; the put result has not been returned yet.
+          */
+         RUNNING,
+ 
+         /**
+          * The message was put successfully.
+          */
+         SUCCESS,
+ 
+         /**
+          * Putting the message failed.
+          * When autoResend is true, the message will be resent.
+          */
+         EXCEPTION,
+ 
+         /**
+          * The put is skipped.
+          * When the message cannot be looked up, the message will be skipped.
+          */
+         SKIP,
      }
  }
diff --cc store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index d19ab5e,668c069..4acd9a9
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@@ -26,13 -26,11 +26,14 @@@ import java.net.UnknownHostException
  import java.util.Map;
  import org.apache.rocketmq.common.BrokerConfig;
  import org.apache.rocketmq.common.UtilAll;
+ import org.apache.rocketmq.common.message.MessageConst;
  import org.apache.rocketmq.common.message.MessageDecoder;
  import org.apache.rocketmq.store.config.MessageStoreConfig;
 +import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 +import org.apache.rocketmq.store.queue.CqUnit;
 +import org.apache.rocketmq.store.queue.ReferredIterator;
  import org.apache.rocketmq.store.stats.BrokerStatsManager;
 +import org.junit.Assert;
  import org.junit.Test;
  
  import static org.assertj.core.api.Assertions.assertThat;
diff --cc store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
index 356e653,d8202eb..2636d40
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
@@@ -481,9 -486,9 +481,9 @@@ public class DefaultMessageStoreCleanFi
  
      private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception {
          messageStore = new DefaultMessageStore(messageStoreConfig,
-                 new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig());
+                 new BrokerStatsManager("test", true), new MyMessageArrivingListener(), new BrokerConfig());
  
 -        cleanCommitLogService = getCleanCommitLogService(diskSpaceCleanForciblyRatio);
 +        cleanCommitLogService = getCleanCommitLogService();
          cleanConsumeQueueService = getCleanConsumeQueueService();
  
          assertTrue(messageStore.load());
diff --cc store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
index e27483e,788bdbd..2170759
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
@@@ -71,8 -70,8 +70,8 @@@ public class DefaultMessageStoreShutDow
          messageStoreConfig.setMaxHashSlotNum(10000);
          messageStoreConfig.setMaxIndexNum(100 * 100);
          messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
 +        messageStoreConfig.setHaListenPort(StoreTestBase.nextPort());
-         return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig());
+         return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
      }
  
 -
  }
diff --cc store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 932fc2f,b565c5c..9e6a983
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@@ -114,8 -113,7 +115,8 @@@ public class DefaultMessageStoreTest 
          messageStoreConfig.setMaxIndexNum(100 * 100);
          messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
          messageStoreConfig.setFlushIntervalConsumeQueue(1);
 +        messageStoreConfig.setHaListenPort(StoreTestBase.nextPort());
-         return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig());
+         return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new BrokerConfig());
      }
  
      @Test
diff --cc test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 32af3bd,0f1c4bf..173f608
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@@ -95,31 -68,14 +95,34 @@@ public class BaseConf 
  
      }
  
 +    // This method can't be placed in the static block of BaseConf, which seems to lead to a strange dead lock.
 +    /**
 +     * Waits up to 30 seconds until the route info of {@code clusterName} lists
 +     * {@code expectedBrokerNum} brokers, then refreshes the metadata of every broker controller
 +     * in {@code brokerControllerList}. Any exception fails the test.
 +     *
 +     * <p>The admin client is always shut down (asynchronously), including when the wait fails.
 +     *
 +     * @param nsAddr            name server address used by the admin client
 +     * @param clusterName       name passed to {@code examineTopicRouteInfo}
 +     * @param expectedBrokerNum number of broker data entries to wait for
 +     */
 +    public static void waitBrokerRegistered(final String nsAddr, final String clusterName, final int expectedBrokerNum) {
 +        final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
 +        mqAdminExt.setNamesrvAddr(nsAddr);
 +        try {
 +            mqAdminExt.start();
 +            await().atMost(30, TimeUnit.SECONDS).until(() -> {
 +                List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(clusterName).getBrokerDatas();
 +                return brokerDatas.size() == expectedBrokerNum;
 +            });
 +            for (BrokerController brokerController: brokerControllerList) {
 +                brokerController.getBrokerOuterAPI().refreshMetadata();
 +            }
 +        } catch (Exception e) {
 +            log.error("init failed, please check BaseConf", e);
 +            Assert.fail(e.getMessage());
 +        } finally {
 +            // Assert.fail throws, so the shutdown must live here to avoid leaking the client.
 +            ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
 +        }
 +    }
 +
      public static String initTopic() {
-         String topic = "tt-" + MQRandomUtils.getRandomTopic();
-         IntegrationTestBase.initTopic(topic, nsAddr, clusterName, CQType.SimpleCQ);
+         String topic = MQRandomUtils.getRandomTopic();
+         return initTopicWithName(topic);
+     }
  
-         return topic;
+     public static String initTopicWithName(String topicName) {
+         IntegrationTestBase.initTopic(topicName, nsAddr, clusterName);
+         return topicName;
      }
  
      public static String initConsumerGroup() {