You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/06/30 01:07:49 UTC

[rocketmq] branch 5.0.0-beta updated: [ISSUE #4435] Code optimization for ConsumeQueue abstraction. (#4439)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new 05fee0d77 [ISSUE #4435] Code optimization for ConsumeQueue abstraction. (#4439)
05fee0d77 is described below

commit 05fee0d773d971cc56aafbbb95955f44fb963bed
Author: Hongjian Fei <er...@163.com>
AuthorDate: Thu Jun 30 09:07:30 2022 +0800

    [ISSUE #4435] Code optimization for ConsumeQueue abstraction. (#4439)
    
    * Delay the start of some heavy components.
    
    * Remove the code for rechecking reput position.
    
    * Make ConsumeQueue and BatchConsumeQueue program to an interface: MessageStore, not an implementation: DefaultMessageStore.
    
    * Transfer methods that should not belong to FileQueueLifeCycle to ConsumeQueueInterface.
    
    * Make the method doRecheckReputOffsetFromCq configurable.
    
    * Fix naming issues.
---
 .../org/apache/rocketmq/store/ConsumeQueue.java    |  50 ++++-----
 .../apache/rocketmq/store/DefaultMessageStore.java | 123 ++++++++++++---------
 .../org/apache/rocketmq/store/MessageStore.java    |   7 ++
 .../rocketmq/store/queue/BatchConsumeQueue.java    |  26 ++---
 .../store/queue/ConsumeQueueInterface.java         |  22 ++++
 .../rocketmq/store/queue/ConsumeQueueStore.java    |  10 +-
 .../rocketmq/store/queue/FileQueueLifeCycle.java   |  59 +++++++++-
 .../java/org/apache/rocketmq/store/HATest.java     |   2 +-
 .../store/dledger/MessageStoreTestBase.java        |   2 +
 9 files changed, 197 insertions(+), 104 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 740377dc4..1e59dd92e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -47,7 +47,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
     public static final int CQ_STORE_UNIT_SIZE = 20;
     private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
 
-    private final DefaultMessageStore defaultMessageStore;
+    private final MessageStore messageStore;
 
     private final MappedFileQueue mappedFileQueue;
     private final String topic;
@@ -65,10 +65,10 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
         final int queueId,
         final String storePath,
         final int mappedFileSize,
-        final DefaultMessageStore defaultMessageStore) {
+        final MessageStore messageStore) {
         this.storePath = storePath;
         this.mappedFileSize = mappedFileSize;
-        this.defaultMessageStore = defaultMessageStore;
+        this.messageStore = messageStore;
 
         this.topic = topic;
         this.queueId = queueId;
@@ -81,13 +81,13 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
 
         this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
 
-        if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
+        if (messageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
             this.consumeQueueExt = new ConsumeQueueExt(
                 topic,
                 queueId,
-                StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
-                defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
-                defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
+                StorePathConfigHelper.getStorePathConsumeQueueExt(messageStore.getMessageStoreConfig().getStorePathRootDir()),
+                messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
+                messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
             );
         }
     }
@@ -188,7 +188,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
             int high = 0;
             int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
             long leftIndexValue = -1L, rightIndexValue = -1L;
-            long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
+            long minPhysicOffset = this.messageStore.getMinPhyOffset();
             SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
             if (null != sbr) {
                 ByteBuffer byteBuffer = sbr.getByteBuffer();
@@ -206,7 +206,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
                         }
 
                         long storeTime =
-                            this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+                            this.messageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                         if (storeTime < 0) {
                             return 0;
                         } else if (storeTime == timestamp) {
@@ -427,7 +427,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
     @Override
     public void putMessagePositionInfoWrapper(DispatchRequest request) {
         final int maxRetries = 30;
-        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
+        boolean canWrite = this.messageStore.getRunningFlags().isCQWriteable();
         for (int i = 0; i < maxRetries && canWrite; i++) {
             long tagsCode = request.getTagsCode();
             if (isExtWriteEnable()) {
@@ -447,11 +447,11 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
             boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                 request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
             if (result) {
-                if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
-                    this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
-                    this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
+                if (this.messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
+                    this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
+                    this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
                 }
-                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
+                this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                 if (checkMultiDispatchQueue(request)) {
                     multiDispatchLmqQueue(request, maxRetries);
                 }
@@ -471,11 +471,11 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
 
         // XXX: warn and notify me
         log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
-        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
+        this.messageStore.getRunningFlags().makeLogicsQueueError();
     }
 
     private boolean checkMultiDispatchQueue(DispatchRequest dispatchRequest) {
-        if (!this.defaultMessageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
+        if (!this.messageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
             return false;
         }
         Map<String, String> prop = dispatchRequest.getPropertiesMap();
@@ -504,7 +504,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
             String queueName = queues[i];
             long queueOffset = Long.parseLong(queueOffsets[i]);
             int queueId = request.getQueueId();
-            if (this.defaultMessageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
+            if (this.messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
                 queueId = 0;
             }
             doDispatchLmqQueue(request, maxRetries, queueName, queueOffset, queueId);
@@ -515,8 +515,8 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
 
     private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String queueName, long queueOffset,
         int queueId) {
-        ConsumeQueueInterface cq = this.defaultMessageStore.findConsumeQueue(queueName, queueId);
-        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
+        ConsumeQueueInterface cq = this.messageStore.findConsumeQueue(queueName, queueId);
+        boolean canWrite = this.messageStore.getRunningFlags().isCQWriteable();
         for (int i = 0; i < maxRetries && canWrite; i++) {
             boolean result = ((ConsumeQueue) cq).putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(),
                 request.getTagsCode(),
@@ -543,7 +543,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
         long queueOffset = queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum);
         msg.setQueueOffset(queueOffset);
         // For LMQ
-        if (!defaultMessageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
+        if (!messageStore.getMessageStoreConfig().isEnableMultiDispatch()) {
             return;
         }
         String multiDispatchQueue = msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
@@ -554,7 +554,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
         Long[] queueOffsets = new Long[queues.length];
         for (int i = 0; i < queues.length; i++) {
             String key = queueKey(queues[i], msg);
-            if (defaultMessageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) {
+            if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(key)) {
                 queueOffsets[i] = queueOffsetAssigner.assignLmqOffset(key, (short) 1);
             }
         }
@@ -568,7 +568,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
         keyBuilder.append(queueName);
         keyBuilder.append('-');
         int queueId = msgInner.getQueueId();
-        if (defaultMessageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
+        if (messageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
             queueId = 0;
         }
         keyBuilder.append(queueId);
@@ -809,10 +809,10 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
     }
 
     @Override
-    public long rollNextFile(final long index) {
+    public long rollNextFile(final long nextBeginOffset) {
         int mappedFileSize = this.mappedFileSize;
         int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE;
-        return index + totalUnitsInFile - index % totalUnitsInFile;
+        return nextBeginOffset + totalUnitsInFile - nextBeginOffset % totalUnitsInFile;
     }
 
     @Override
@@ -873,7 +873,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
 
     protected boolean isExtWriteEnable() {
         return this.consumeQueueExt != null
-            && this.defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt();
+            && this.messageStore.getMessageStoreConfig().isEnableConsumeQueueExt();
     }
 
     /**
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index bcb826e53..0da12a232 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -193,21 +193,12 @@ public class DefaultMessageStore implements MessageStore {
                 this.haService = new DefaultHAService();
                 LOGGER.warn("Load default HA Service: {}", DefaultHAService.class.getSimpleName());
             }
-            this.haService.init(this);
         }
 
         this.reputMessageService = new ReputMessageService();
 
         this.transientStorePool = new TransientStorePool(messageStoreConfig);
 
-        if (messageStoreConfig.isTransientStorePoolEnable()) {
-            this.transientStorePool.init();
-        }
-
-        this.allocateMappedFileService.start();
-
-        this.indexService.start();
-
         this.scheduledExecutorService =
             Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
 
@@ -309,6 +300,17 @@ public class DefaultMessageStore implements MessageStore {
      */
     @Override
     public void start() throws Exception {
+        if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
+            this.haService.init(this);
+        }
+
+        if (messageStoreConfig.isTransientStorePoolEnable()) {
+            this.transientStorePool.init();
+        }
+
+        this.allocateMappedFileService.start();
+
+        this.indexService.start();
 
         lock = lockFile.getChannel().tryLock(0, 1, false);
         if (lock == null || lock.isShared() || !lock.isValid()) {
@@ -321,53 +323,14 @@ public class DefaultMessageStore implements MessageStore {
         if (this.getMessageStoreConfig().isDuplicationEnable()) {
             this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
         } else {
-            /**
-             * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
-             * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
-             * 3. Calculate the reput offset according to the consume queue;
-             * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
-             */
-            long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
-            for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.getConsumeQueueTable().values()) {
-                for (ConsumeQueueInterface logic : maps.values()) {
-                    if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
-                        maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
-                    }
-                }
-            }
-            if (maxPhysicalPosInLogicQueue < 0) {
-                maxPhysicalPosInLogicQueue = 0;
-            }
-            if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
-                maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
-                /**
-                 * This happens in following conditions:
-                 * 1. If someone removes all the consumequeue files or the disk get damaged.
-                 * 2. Launch a new broker, and copy the commitlog from other brokers.
-                 *
-                 * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
-                 * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
-                 */
-                LOGGER.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
-            }
-            LOGGER.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
-                maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
-            this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
+            // It is [recover]'s responsibility to fully dispatch the commit log data before the max offset of commit log.
+            this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
         }
         this.reputMessageService.start();
 
-        /**
-         *  1. Finish dispatching the messages fall behind, then to start other services.
-         *  2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
-         */
-        while (true) {
-            if (dispatchBehindBytes() <= 0) {
-                break;
-            }
-            Thread.sleep(1000);
-            LOGGER.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
-        }
-        this.recoverTopicQueueTable();
+        // Checking is not necessary, as long as the dLedger's implementation exactly follows the definition of Recover,
+        // which is eliminating the dispatch inconsistency between the commitLog and consumeQueue at the end of recovery.
+        this.doRecheckReputOffsetFromCq();
 
         this.flushConsumeQueueService.start();
         this.commitLog.start();
@@ -383,6 +346,59 @@ public class DefaultMessageStore implements MessageStore {
         this.shutdown = false;
     }
 
+    private void doRecheckReputOffsetFromCq() throws InterruptedException {
+        if (!messageStoreConfig.isRecheckReputOffsetFromCq()) {
+            return;
+        }
+
+        /**
+         * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
+         * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
+         * 3. Calculate the reput offset according to the consume queue;
+         * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
+         */
+        long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
+        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.getConsumeQueueTable().values()) {
+            for (ConsumeQueueInterface logic : maps.values()) {
+                if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
+                    maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
+                }
+            }
+        }
+        // If maxPhyPos(CQs) < minPhyPos(CommitLog), some newly deleted topics may be re-dispatched into cqs mistakenly.
+        if (maxPhysicalPosInLogicQueue < 0) {
+            maxPhysicalPosInLogicQueue = 0;
+        }
+        if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
+            maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
+            /**
+             * This happens in following conditions:
+             * 1. If someone removes all the consumequeue files or the disk get damaged.
+             * 2. Launch a new broker, and copy the commitlog from other brokers.
+             *
+             * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
+             * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
+             */
+            LOGGER.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
+        }
+        LOGGER.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
+                maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
+        this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
+
+        /**
+         *  1. Finish dispatching the messages fall behind, then to start other services.
+         *  2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
+         */
+        while (true) {
+            if (dispatchBehindBytes() <= 0) {
+                break;
+            }
+            Thread.sleep(1000);
+            LOGGER.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
+        }
+        this.recoverTopicQueueTable();
+    }
+
     @Override
     public void shutdown() {
         if (!this.shutdown) {
@@ -1459,6 +1475,7 @@ public class DefaultMessageStore implements MessageStore {
         return null;
     }
 
+    @Override
     public ConsumeQueueInterface findConsumeQueue(String topic, int queueId) {
         return this.consumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 9dc27faff..94b618696 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -466,6 +466,13 @@ public interface MessageStore {
      */
     ConsumeQueueInterface getConsumeQueue(String topic, int queueId);
 
+    /**
+     * Get the consume queue of the topic/queue. If the consume queue does not exist, one is created and then returned.
+     * @param topic Topic.
+     * @param queueId Queue ID.
+     * @return Consume queue.
+     */
+    ConsumeQueueInterface findConsumeQueue(String topic, int queueId);
 
     /**
      * Get BrokerStatsManager of the messageStore.
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 3db3c0e7e..40bc76303 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -51,7 +51,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
     private static final int MSG_COMPACT_OFFSET_LENGTH = 4;
     public static final int INVALID_POS = -1;
     final MappedFileQueue mappedFileQueue;
-    private MessageStore defaultMessageStore;
+    private final MessageStore messageStore;
     private final String topic;
     private final int queueId;
     private final ByteBuffer byteBufferItem;
@@ -74,11 +74,11 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
         final int queueId,
         final String storePath,
         final int mappedFileSize,
-        final MessageStore defaultMessageStore) {
+        final MessageStore messageStore) {
         this.storePath = storePath;
         this.mappedFileSize = mappedFileSize;
-        this.defaultMessageStore = defaultMessageStore;
-        this.commitLogSize = defaultMessageStore.getCommitLog().getCommitLogSize();
+        this.messageStore = messageStore;
+        this.commitLogSize = messageStore.getCommitLog().getCommitLogSize();
 
         this.topic = topic;
         this.queueId = queueId;
@@ -100,7 +100,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
     }
 
     private void refreshCache() {
-        if (!this.defaultMessageStore.getMessageStoreConfig().isSearchBcqByCacheEnable()) {
+        if (!this.messageStore.getMessageStoreConfig().isSearchBcqByCacheEnable()) {
             return ;
         }
         ConcurrentSkipListMap<Long, MappedFile> newOffsetCache = new ConcurrentSkipListMap<>();
@@ -447,7 +447,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
     @Override
     public void putMessagePositionInfoWrapper(DispatchRequest request) {
         final int maxRetries = 30;
-        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
+        boolean canWrite = this.messageStore.getRunningFlags().isCQWriteable();
         if (request.getMsgBaseOffset() < 0 || request.getBatchSize() < 0) {
             log.warn("[NOTIFYME]unexpected dispacth request in batch consume queue topic:{} queue:{} offset:{}", topic, queueId, request.getCommitLogOffset());
             return;
@@ -457,10 +457,10 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
                 request.getMsgSize(), request.getTagsCode(),
                 request.getStoreTimestamp(), request.getMsgBaseOffset(), request.getBatchSize());
             if (result) {
-                if (BrokerRole.SLAVE == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
-                    this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
+                if (BrokerRole.SLAVE == this.messageStore.getMessageStoreConfig().getBrokerRole()) {
+                    this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
                 }
-                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
+                this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                 return;
             } else {
                 // XXX: warn and notify me
@@ -476,7 +476,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
         }
         // XXX: warn and notify me
         log.error("[NOTIFYME]batch consume queue can not write, {} {}", this.topic, this.queueId);
-        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
+        this.messageStore.getRunningFlags().makeLogicsQueueError();
     }
 
     @Override
@@ -594,7 +594,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
             targetBcq = lastBcq;
             targetMinOffset = minForLastBcq;
         } else {
-            boolean searchBcqByCacheEnable = this.defaultMessageStore.getMessageStoreConfig().isSearchBcqByCacheEnable();
+            boolean searchBcqByCacheEnable = this.messageStore.getMessageStoreConfig().isSearchBcqByCacheEnable();
             if (searchBcqByCacheEnable) {
                 // it's not the last BCQ file, so search it through cache.
                 targetBcq = this.searchOffsetFromCache(msgOffset);
@@ -675,7 +675,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
             targetBcq = lastBcq;
             targetMinOffset = minForLastBcq;
         } else {
-            boolean searchBcqByCacheEnable = this.defaultMessageStore.getMessageStoreConfig().isSearchBcqByCacheEnable();
+            boolean searchBcqByCacheEnable = this.messageStore.getMessageStoreConfig().isSearchBcqByCacheEnable();
             if (searchBcqByCacheEnable) {
                 // it's not the last BCQ file, so search it through cache.
                 targetBcq = this.searchTimeFromCache(timestamp);
@@ -941,7 +941,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy
     }
 
     @Override
-    public long rollNextFile(long offset) {
+    public long rollNextFile(long nextBeginOffset) {
         return 0;
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index 6d1b68c45..c45592542 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -18,6 +18,8 @@
 package org.apache.rocketmq.store.queue;
 
 import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.DispatchRequest;
 
 public interface ConsumeQueueInterface {
     /**
@@ -117,4 +119,24 @@ public interface ConsumeQueueInterface {
      * @return total size
      */
     long getTotalSize();
+
+    /**
+     * Correct min offset by min commit log offset.
+     * @param minCommitLogOffset min commit log offset
+     */
+    void correctMinOffset(long minCommitLogOffset);
+
+    /**
+     * Do dispatch.
+     * @param request the request containing dispatch information.
+     */
+    void putMessagePositionInfoWrapper(DispatchRequest request);
+
+    /**
+     * Assign queue offset.
+     * @param queueOffsetAssigner the delegated queue offset assigner
+     * @param msg message itself
+     * @param messageNum message number
+     */
+    void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum);
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index beb053247..48cef5092 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -78,8 +78,7 @@ public class ConsumeQueueStore {
     }
 
     public void correctMinOffset(ConsumeQueueInterface consumeQueue, long minCommitLogOffset) {
-        FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        fileQueueLifeCycle.correctMinOffset(minCommitLogOffset);
+        consumeQueue.correctMinOffset(minCommitLogOffset);
     }
 
     /**
@@ -89,8 +88,7 @@ public class ConsumeQueueStore {
      * @param request dispatch request
      */
     public void putMessagePositionInfoWrapper(ConsumeQueueInterface consumeQueue, DispatchRequest request) {
-        FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        fileQueueLifeCycle.putMessagePositionInfoWrapper(request);
+        consumeQueue.putMessagePositionInfoWrapper(request);
     }
 
     public void putMessagePositionInfoWrapper(DispatchRequest dispatchRequest) {
@@ -315,8 +313,8 @@ public class ConsumeQueueStore {
     }
 
     public void assignQueueOffset(MessageExtBrokerInner msg, short messageNum) {
-        FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(msg.getTopic(), msg.getQueueId());
-        fileQueueLifeCycle.assignQueueOffset(this.queueOffsetAssigner, msg, messageNum);
+        ConsumeQueueInterface consumeQueue = findOrCreateConsumeQueue(msg.getTopic(), msg.getQueueId());
+        consumeQueue.assignQueueOffset(this.queueOffsetAssigner, msg, messageNum);
     }
 
     public void updateQueueOffset(String topic, int queueId, long offset) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
index b2211d8ad..95cc0887f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
@@ -16,22 +16,69 @@
  */
 package org.apache.rocketmq.store.queue;
 
-import org.apache.rocketmq.store.DispatchRequest;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.Swappable;
 
+/**
+ * FileQueueLifeCycle contains the life cycle methods of a ConsumeQueue that is directly implemented on top of files.
+ */
 public interface FileQueueLifeCycle extends Swappable {
+    /**
+     * Load from file.
+     * @return true if loaded successfully.
+     */
     boolean load();
+
+    /**
+     * Recover from file.
+     */
     void recover();
+
+    /**
+     * Check files.
+     */
     void checkSelf();
+
+    /**
+     * Flush cache to file.
+     * @param flushLeastPages  the minimum number of pages to be flushed
+     * @return true if any data has been flushed.
+     */
     boolean flush(int flushLeastPages);
+
+    /**
+     * Destroy files.
+     */
     void destroy();
+
+    /**
+     * Truncate dirty logic files starting at max commit log position.
+     * @param maxCommitLogPos max commit log position
+     */
     void truncateDirtyLogicFiles(long maxCommitLogPos);
+
+    /**
+     * Delete expired files ending at min commit log position.
+     * @param minCommitLogPos min commit log position
+     * @return deleted file numbers.
+     */
     int deleteExpiredFile(long minCommitLogPos);
-    long rollNextFile(final long offset);
+
+    /**
+     * Roll to next file.
+     * @param nextBeginOffset next begin offset
+     * @return the beginning offset of the next file
+     */
+    long rollNextFile(final long nextBeginOffset);
+
+    /**
+     * Is the first file available?
+     * @return true if it's available
+     */
     boolean isFirstFileAvailable();
+
+    /**
+     * Does the first file exist?
+     * @return true if it exists
+     */
     boolean isFirstFileExist();
-    void correctMinOffset(long minCommitLogOffset);
-    void putMessagePositionInfoWrapper(DispatchRequest request);
-    void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum);
 }
diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index 9ef960f50..bee9d6429 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -85,11 +85,11 @@ public class HATest {
         slaveMessageStore = buildMessageStore(slaveStoreConfig, 1L);
         boolean load = messageStore.load();
         boolean slaveLoad = slaveMessageStore.load();
-        slaveMessageStore.updateHaMasterAddress("127.0.0.1:10912");
         assertTrue(load);
         assertTrue(slaveLoad);
         messageStore.start();
         slaveMessageStore.start();
+        slaveMessageStore.updateHaMasterAddress("127.0.0.1:10912");
         Thread.sleep(6000L);//because the haClient will wait 5s after the first connectMaster failed,sleep 6s
     }
 
diff --git a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
index f1bc996a8..3ae0cb64e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
+++ b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
@@ -56,6 +56,8 @@ public class MessageStoreTestBase extends StoreTestBase {
         storeConfig.setdLegerGroup(group);
         storeConfig.setdLegerPeers(peers);
         storeConfig.setdLegerSelfId(selfId);
+
+        storeConfig.setRecheckReputOffsetFromCq(true);
         DefaultMessageStore defaultMessageStore = new DefaultMessageStore(storeConfig,  new BrokerStatsManager("DLedgerCommitlogTest", true), (topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties) -> {
 
         }, new BrokerConfig());