You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2023/05/04 01:39:58 UTC

[rocketmq] branch develop updated: [ISSUE #6609] Fix the issue that consume queue building exceeds confirmOffset when node restarts to recover (#6618)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b410557e8 [ISSUE #6609] Fix the issue that consume queue building exceeds confirmOffset when node restarts to recover (#6618)
b410557e8 is described below

commit b410557e873ee4bdfa56561eb719a26007c1a3bb
Author: rongtong <ji...@163.com>
AuthorDate: Thu May 4 09:39:51 2023 +0800

    [ISSUE #6609] Fix the issue that consume queue building exceeds confirmOffset when node restarts to recover (#6618)
    
    * Fix the issue that consume queue building exceeds confirmOffset when node restarts to recover
---
 .../apache/rocketmq/broker/BrokerController.java   |  9 ++--
 .../broker/controller/ReplicasManager.java         |  8 ++--
 .../controller/ReplicasManagerRegisterTest.java    |  5 +-
 .../broker/controller/ReplicasManagerTest.java     |  4 ++
 .../java/org/apache/rocketmq/store/CommitLog.java  | 55 ++++++++++++++++------
 .../apache/rocketmq/store/DefaultMessageStore.java |  9 ++--
 .../org/apache/rocketmq/store/RunningFlags.java    | 18 +++++--
 .../org/apache/rocketmq/store/StoreCheckpoint.java | 12 +++++
 .../store/ha/autoswitch/AutoSwitchHAClient.java    |  3 +-
 .../ha/autoswitch/AutoSwitchHAConnection.java      |  2 +-
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 35 +++-----------
 .../store/ha/autoswitch/AutoSwitchHATest.java      | 32 +++++++++++--
 12 files changed, 124 insertions(+), 68 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index a35618dc0..68c9d963b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -763,6 +763,11 @@ public class BrokerController {
                 LOG.error("BrokerController#initialize: unexpected error occurs", e);
             }
         }
+
+        if (this.brokerConfig.isEnableControllerMode()) {
+            this.replicasManager.setIsolatedAndBrokerPermission(false);
+        }
+
         if (messageStore != null) {
             registerMessageStoreHook();
             result = result && this.messageStore.load();
@@ -1556,10 +1561,6 @@ public class BrokerController {
             isIsolated = true;
         }
 
-        if (this.brokerConfig.isEnableControllerMode()) {
-            this.replicasManager.setIsolatedAndBrokerPermission(false);
-        }
-
         if (this.brokerOuterAPI != null) {
             this.brokerOuterAPI.start();
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 3c7e061a2..005d6b3cb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -122,10 +122,6 @@ public class ReplicasManager {
         this.tempBrokerMetadata = new TempBrokerMetadata(this.brokerController.getMessageStoreConfig().getStorePathBrokerIdentity() + "-temp");
     }
 
-    public long getConfirmOffset() {
-        return this.haService.getConfirmOffset();
-    }
-
     enum State {
         INITIAL,
         FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE,
@@ -419,7 +415,7 @@ public class ReplicasManager {
                     this.brokerConfig.getSendHeartbeatTimeoutMillis(),
                     this.brokerConfig.isInBrokerContainer(), this.getLastEpoch(),
                     this.brokerController.getMessageStore().getMaxPhyOffset(),
-                    this.getConfirmOffset(),
+                    this.brokerController.getMessageStore().getConfirmOffset(),
                     this.brokerConfig.getControllerHeartBeatTimeoutMills(),
                     this.brokerConfig.getBrokerElectionPriority()
                 );
@@ -881,11 +877,13 @@ public class ReplicasManager {
         if (isBrokerRoleConfirmed) {
             this.brokerController.setIsolated(false);
             this.brokerConfig.setBrokerPermission(this.originalBrokerPermission);
+            this.brokerController.getMessageStore().getRunningFlags().makeIsolated(false);
         } else {
             // prohibit writing and reading before confirming the broker role
             this.brokerController.setIsolated(true);
             this.originalBrokerPermission = this.brokerConfig.getBrokerPermission();
             this.brokerConfig.setBrokerPermission(0);
+            this.brokerController.getMessageStore().getRunningFlags().makeIsolated(true);
         }
     }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
index 7fb9d9aeb..d01a6f76f 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBro
 import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
 import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.RunningFlags;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 import org.apache.rocketmq.store.ha.autoswitch.BrokerMetadata;
@@ -103,6 +104,8 @@ public class ReplicasManagerRegisterTest {
 
     private AutoSwitchHAService mockedAutoSwitchHAService;
 
+    private RunningFlags runningFlags = new RunningFlags();
+
     @Before
     public void setUp() throws Exception {
         UtilAll.deleteFile(new File(STORE_BASE_PATH));
@@ -116,8 +119,8 @@ public class ReplicasManagerRegisterTest {
         when(mockedBrokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG);
         when(mockedBrokerController.getTopicConfigManager()).thenReturn(mockedTopicConfigManager);
         when(mockedMessageStore.getHaService()).thenReturn(mockedAutoSwitchHAService);
+        when(mockedMessageStore.getRunningFlags()).thenReturn(runningFlags);
         when(mockedBrokerController.getSlaveSynchronize()).thenReturn(new SlaveSynchronize(mockedBrokerController));
-
         when(mockedBrokerOuterAPI.getControllerMetaData(any())).thenReturn(
                 new GetMetaDataResponseHeader("default-group", "dledger-a", CONTROLLER_ADDR, true, CONTROLLER_ADDR));
         when(mockedBrokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
index e03828cff..c863f7ac9 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextB
 import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerResponseHeader;
 import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.RunningFlags;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 import org.assertj.core.api.Assertions;
@@ -95,6 +96,8 @@ public class ReplicasManagerTest {
 
     private SyncStateSet syncStateSet;
 
+    private RunningFlags runningFlags = new RunningFlags();
+
     private static final String OLD_MASTER_ADDRESS = "192.168.1.1";
 
     private static final String NEW_MASTER_ADDRESS = "192.168.1.2";
@@ -150,6 +153,7 @@ public class ReplicasManagerTest {
         when(defaultMessageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig);
         when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
         when(brokerController.getMessageStore().getHaService()).thenReturn(autoSwitchHAService);
+        when(brokerController.getMessageStore().getRunningFlags()).thenReturn(runningFlags);
         when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
         when(brokerController.getSlaveSynchronize()).thenReturn(slaveSynchronize);
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 75b4042dc..18cc32179 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -52,6 +52,7 @@ import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.ha.HAService;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 import org.apache.rocketmq.store.logfile.MappedFile;
 
 /**
@@ -337,10 +338,19 @@ public class CommitLog implements Swappable {
             }
 
             processOffset += mappedFileOffset;
-            // Set a candidate confirm offset.
-            // In most cases, this value will be overwritten by confirmLog.init.
-            // It works if some confirmed messages are lost.
-            this.setConfirmOffset(lastValidMsgPhyOffset);
+
+            if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
+                if (this.defaultMessageStore.getConfirmOffset() < this.defaultMessageStore.getMinPhyOffset()) {
+                    log.error("confirmOffset {} is less than minPhyOffset {}, correct confirmOffset to minPhyOffset", this.defaultMessageStore.getConfirmOffset(), this.defaultMessageStore.getMinPhyOffset());
+                    this.defaultMessageStore.setConfirmOffset(this.defaultMessageStore.getMinPhyOffset());
+                } else if (this.defaultMessageStore.getConfirmOffset() > processOffset) {
+                    log.error("confirmOffset {} is larger than lastValidMsgPhyOffset {}, correct confirmOffset to processOffset", this.defaultMessageStore.getConfirmOffset(), processOffset);
+                    this.defaultMessageStore.setConfirmOffset(processOffset);
+                }
+            } else {
+                this.setConfirmOffset(lastValidMsgPhyOffset);
+            }
+
             this.mappedFileQueue.setFlushedWhere(processOffset);
             this.mappedFileQueue.setCommittedWhere(processOffset);
             this.mappedFileQueue.truncateDirtyFiles(processOffset);
@@ -544,7 +554,18 @@ public class CommitLog implements Swappable {
     }
 
     public long getConfirmOffset() {
-        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+        if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
+            if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isIsolated()) {
+                if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
+                    return this.defaultMessageStore.getMaxPhyOffset();
+                }
+                // confirmOffset has not been computed yet (<= 0): compute it on first access.
+                if (this.confirmOffset <= 0) {
+                    setConfirmOffset(((AutoSwitchHAService) this.defaultMessageStore.getHaService()).computeConfirmOffset());
+                }
+            }
+            return this.confirmOffset;
+        } else if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
             return this.confirmOffset;
         } else {
             return getMaxOffset();
@@ -553,6 +574,7 @@ public class CommitLog implements Swappable {
 
     public void setConfirmOffset(long phyOffset) {
         this.confirmOffset = phyOffset;
+        this.defaultMessageStore.getStoreCheckpoint().setConfirmPhyOffset(confirmOffset);
     }
 
     public long getLastFileFromOffset() {
@@ -605,8 +627,8 @@ public class CommitLog implements Swappable {
                         lastValidMsgPhyOffset = processOffset + mappedFileOffset;
                         mappedFileOffset += size;
 
-                        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
-                            if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
+                        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable() || this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
+                            if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getCommitLog().getConfirmOffset()) {
                                 this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, false);
                             }
                         } else {
@@ -644,10 +666,17 @@ public class CommitLog implements Swappable {
             }
 
             processOffset += mappedFileOffset;
-            // Set a candidate confirm offset.
-            // In most cases, this value will be overwritten by confirmLog.init.
-            // It works if some confirmed messages are lost.
-            this.setConfirmOffset(lastValidMsgPhyOffset);
+            if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
+                if (this.defaultMessageStore.getConfirmOffset() < this.defaultMessageStore.getMinPhyOffset()) {
+                    log.error("confirmOffset {} is less than minPhyOffset {}, correct confirmOffset to minPhyOffset", this.defaultMessageStore.getConfirmOffset(), this.defaultMessageStore.getMinPhyOffset());
+                    this.defaultMessageStore.setConfirmOffset(this.defaultMessageStore.getMinPhyOffset());
+                } else if (this.defaultMessageStore.getConfirmOffset() > processOffset) {
+                    log.error("confirmOffset {} is larger than lastValidMsgPhyOffset {}, correct confirmOffset to processOffset", this.defaultMessageStore.getConfirmOffset(), processOffset);
+                    this.defaultMessageStore.setConfirmOffset(processOffset);
+                }
+            } else {
+                this.setConfirmOffset(lastValidMsgPhyOffset);
+            }
             this.mappedFileQueue.setFlushedWhere(processOffset);
             this.mappedFileQueue.setCommittedWhere(processOffset);
             this.mappedFileQueue.truncateDirtyFiles(processOffset);
@@ -744,7 +773,7 @@ public class CommitLog implements Swappable {
         // dynamically adjust maxMessageSize, but not support DLedger mode temporarily
         int newMaxMessageSize = this.defaultMessageStore.getMessageStoreConfig().getMaxMessageSize();
         if (newMaxMessageSize >= 10 &&
-                putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) {
+            putMessageThreadLocal.getEncoder().getMaxMessageBodySize() != newMaxMessageSize) {
             putMessageThreadLocal.getEncoder().updateEncoderBufferCapacity(newMaxMessageSize);
         }
     }
@@ -956,7 +985,6 @@ public class CommitLog implements Swappable {
         int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
         boolean needHandleHA = needHandleHA(messageExtBatch);
 
-
         if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
             if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) {
                 return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
@@ -1829,7 +1857,6 @@ public class CommitLog implements Swappable {
 
     }
 
-
     class DefaultFlushManager implements FlushManager {
 
         private final FlushCommitLogService flushCommitLogService;
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index e1bdc6e71..ca8f30684 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -335,11 +335,14 @@ public class DefaultMessageStore implements MessageStore {
                     new StoreCheckpoint(
                         StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
                 this.masterFlushedOffset = this.storeCheckpoint.getMasterFlushedOffset();
+                setConfirmOffset(this.storeCheckpoint.getConfirmPhyOffset());
+
                 result = this.indexService.load(lastExitOK);
                 this.recover(lastExitOK);
                 LOGGER.info("message store recover end, and the max phy offset = {}", this.getMaxPhyOffset());
             }
 
+
             long maxOffset = this.getMaxPhyOffset();
             this.setBrokerInitMaxOffset(maxOffset);
             LOGGER.info("load over, and the max phy offset = {}", maxOffset);
@@ -1589,9 +1592,6 @@ public class DefaultMessageStore implements MessageStore {
 
     @Override
     public long getConfirmOffset() {
-        if (this.brokerConfig.isEnableControllerMode()) {
-            return ((AutoSwitchHAService) this.haService).getConfirmOffset();
-        }
         return this.commitLog.getConfirmOffset();
     }
 
@@ -2747,9 +2747,6 @@ public class DefaultMessageStore implements MessageStore {
         }
 
         public boolean isCommitLogAvailable() {
-            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()) {
-                return this.reputFromOffset <= DefaultMessageStore.this.commitLog.getConfirmOffset();
-            }
             return this.reputFromOffset < DefaultMessageStore.this.getConfirmOffset();
         }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
index 7ff11a282..6a0ef5a5f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java
@@ -28,6 +28,8 @@ public class RunningFlags {
 
     private static final int DISK_FULL_BIT = 1 << 4;
 
+    private static final int ISOLATED_BIT = 1 << 5;
+
     private volatile int flagBits = 0;
 
     public RunningFlags() {
@@ -46,11 +48,11 @@ public class RunningFlags {
     }
 
     public boolean isReadable() {
-        if ((this.flagBits & NOT_READABLE_BIT) == 0) {
-            return true;
-        }
+        return (this.flagBits & NOT_READABLE_BIT) == 0;
+    }
 
-        return false;
+    public boolean isIsolated() {
+        return (this.flagBits & ISOLATED_BIT) != 0;
     }
 
     public boolean getAndMakeNotReadable() {
@@ -98,6 +100,14 @@ public class RunningFlags {
         this.flagBits |= WRITE_LOGICS_QUEUE_ERROR_BIT;
     }
 
+    public void makeIsolated(boolean isolated) {
+        if (isolated) {
+            this.flagBits |= ISOLATED_BIT;
+        } else {
+            this.flagBits &= ~ISOLATED_BIT;
+        }
+    }
+
     public boolean isLogicsQueueError() {
         if ((this.flagBits & WRITE_LOGICS_QUEUE_ERROR_BIT) == WRITE_LOGICS_QUEUE_ERROR_BIT) {
             return true;
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
index a06aa2853..1e2504a2b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
@@ -37,6 +37,7 @@ public class StoreCheckpoint {
     private volatile long logicsMsgTimestamp = 0;
     private volatile long indexMsgTimestamp = 0;
     private volatile long masterFlushedOffset = 0;
+    private volatile long confirmPhyOffset = 0;
 
     public StoreCheckpoint(final String scpPath) throws IOException {
         File file = new File(scpPath);
@@ -53,6 +54,7 @@ public class StoreCheckpoint {
             this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
             this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
             this.masterFlushedOffset = this.mappedByteBuffer.getLong(24);
+            this.confirmPhyOffset = this.mappedByteBuffer.getLong(32);
 
             log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
                 + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
@@ -61,6 +63,7 @@ public class StoreCheckpoint {
             log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
                 + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
             log.info("store checkpoint file masterFlushedOffset " + this.masterFlushedOffset);
+            log.info("store checkpoint file confirmPhyOffset " + this.confirmPhyOffset);
         } else {
             log.info("store checkpoint file not exists, " + scpPath);
         }
@@ -84,6 +87,7 @@ public class StoreCheckpoint {
         this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
         this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
         this.mappedByteBuffer.putLong(24, this.masterFlushedOffset);
+        this.mappedByteBuffer.putLong(32, this.confirmPhyOffset);
         this.mappedByteBuffer.force();
     }
 
@@ -103,6 +107,14 @@ public class StoreCheckpoint {
         this.logicsMsgTimestamp = logicsMsgTimestamp;
     }
 
+    public long getConfirmPhyOffset() {
+        return confirmPhyOffset;
+    }
+
+    public void setConfirmPhyOffset(long confirmPhyOffset) {
+        this.confirmPhyOffset = confirmPhyOffset;
+    }
+
     public long getMinTimestampIndex() {
         return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 2ef225e69..936db0c4c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -168,7 +168,6 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
         this.processPosition = 0;
         this.lastReadTimestamp = System.currentTimeMillis();
         this.lastWriteTimestamp = System.currentTimeMillis();
-        haService.updateConfirmOffset(-1);
     }
 
     public void reOpen() throws IOException {
@@ -565,7 +564,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
                                     AutoSwitchHAClient.this.messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length);
                                 }
 
-                                haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset()));
+                                haService.getDefaultMessageStore().setConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset()));
 
                                 if (!reportSlaveMaxOffset(HAConnectionState.TRANSFER)) {
                                     LOGGER.error("AutoSwitchHAClient report max offset to master failed");
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 60710f143..440cd3c7a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -547,7 +547,7 @@ public class AutoSwitchHAConnection implements HAConnection {
             // EpochStartOffset
             this.byteBufferHeader.putLong(entry.getStartOffset());
             // Additional info(confirm offset)
-            final long confirmOffset = AutoSwitchHAConnection.this.haService.getConfirmOffset();
+            final long confirmOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getConfirmOffset();
             this.byteBufferHeader.putLong(confirmOffset);
             this.byteBufferHeader.flip();
         }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index d1e623ca7..75ef622ec 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -69,7 +69,6 @@ public class AutoSwitchHAService extends DefaultHAService {
 
     //  Indicate whether the syncStateSet is currently in the process of being synchronized to controller.
     private volatile boolean isSynchronizingSyncStateSet = false;
-    private volatile long confirmOffset = -1;
 
     private EpochFileCache epochCache;
     private AutoSwitchHAClient haClient;
@@ -128,7 +127,7 @@ public class AutoSwitchHAService extends DefaultHAService {
         // Truncate dirty file
         final long truncateOffset = truncateInvalidMsg();
 
-        updateConfirmOffset(computeConfirmOffset());
+        this.defaultMessageStore.setConfirmOffset(computeConfirmOffset());
 
         if (truncateOffset >= 0) {
             this.epochCache.truncateSuffixByOffset(truncateOffset);
@@ -155,7 +154,7 @@ public class AutoSwitchHAService extends DefaultHAService {
             defaultMessageStore.getTransientStorePool().setRealCommit(true);
         }
 
-        LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());
+        LOGGER.info("TruncateOffset is {}, confirmOffset is {}, maxPhyOffset is {}", truncateOffset, this.defaultMessageStore.getConfirmOffset(), this.defaultMessageStore.getMaxPhyOffset());
         this.defaultMessageStore.recoverTopicQueueTable();
         this.defaultMessageStore.setStateMachineVersion(masterEpoch);
         LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
@@ -309,7 +308,7 @@ public class AutoSwitchHAService extends DefaultHAService {
         if (currentSyncStateSet.contains(slaveBrokerId)) {
             return;
         }
-        final long confirmOffset = getConfirmOffset();
+        final long confirmOffset = this.defaultMessageStore.getConfirmOffset();
         if (slaveMaxOffset >= confirmOffset) {
             final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry();
             if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) {
@@ -346,27 +345,11 @@ public class AutoSwitchHAService extends DefaultHAService {
         this.connectionCaughtUpTimeTable.put(slaveBrokerId, Math.max(prevTime, lastCaughtUpTimeMs));
     }
 
-    /**
-     * Get confirm offset (min slaveAckOffset of all syncStateSet members) for master
-     */
-    public long getConfirmOffset() {
-        if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
-            if (getLocalSyncStateSet().size() == 1) {
-                return this.defaultMessageStore.getMaxPhyOffset();
-            }
-            // First time compute confirmOffset.
-            if (this.confirmOffset <= 0) {
-                this.confirmOffset = computeConfirmOffset();
-            }
-        }
-        return confirmOffset;
-    }
-
     public void updateConfirmOffsetWhenSlaveAck(final Long slaveBrokerId) {
         this.readLock.lock();
         try {
             if (this.syncStateSet.contains(slaveBrokerId)) {
-                this.confirmOffset = computeConfirmOffset();
+                this.defaultMessageStore.setConfirmOffset(computeConfirmOffset());
             }
         } finally {
             this.readLock.unlock();
@@ -425,11 +408,7 @@ public class AutoSwitchHAService extends DefaultHAService {
         return info;
     }
 
-    public void updateConfirmOffset(long confirmOffset) {
-        this.confirmOffset = confirmOffset;
-    }
-
-    private long computeConfirmOffset() {
+    public long computeConfirmOffset() {
         final Set<Long> currentSyncStateSet = getSyncStateSet();
         long newConfirmOffset = this.defaultMessageStore.getMaxPhyOffset();
         List<Long> idList = this.connectionList.stream().map(connection -> ((AutoSwitchHAConnection)connection).getSlaveId()).collect(Collectors.toList());
@@ -439,7 +418,7 @@ public class AutoSwitchHAService extends DefaultHAService {
         for (Long syncId : currentSyncStateSet) {
             if (!idList.contains(syncId) && this.brokerControllerId != null && !Objects.equals(syncId, this.brokerControllerId)) {
                 LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId);
-                return this.confirmOffset;
+                return this.defaultMessageStore.getConfirmOffset();
             }
         }
 
@@ -458,7 +437,7 @@ public class AutoSwitchHAService extends DefaultHAService {
             markSynchronizingSyncStateSetDone();
             this.syncStateSet.clear();
             this.syncStateSet.addAll(syncStateSet);
-            this.confirmOffset = computeConfirmOffset();
+            this.defaultMessageStore.setConfirmOffset(computeConfirmOffset());
         } finally {
             this.writeLock.unlock();
         }
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 6d105289f..19b4c8eb7 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MappedFileQueue;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.StoreCheckpoint;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -214,7 +215,7 @@ public class AutoSwitchHATest {
         assertTrue(masterAndPutMessage);
         checkMessage(this.messageStore2, 10, 0);
 
-        final long confirmOffset = ((AutoSwitchHAService) this.messageStore1.getHaService()).getConfirmOffset();
+        final long confirmOffset = this.messageStore1.getConfirmOffset();
 
         // Step2, shutdown store2
         this.messageStore2.shutdown();
@@ -224,16 +225,18 @@ public class AutoSwitchHATest {
         assertEquals(putMessageResult.getPutMessageStatus(), PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
 
         // The confirmOffset still don't change, because syncStateSet contains broker2, but broker2 shutdown
-        assertEquals(confirmOffset, ((AutoSwitchHAService) this.messageStore1.getHaService()).getConfirmOffset());
+        assertEquals(confirmOffset, this.messageStore1.getConfirmOffset());
 
         // Step3, shutdown store1, start store2, change store2 to master, epoch = 2
         this.messageStore1.shutdown();
 
         storeConfig2.setBrokerRole(BrokerRole.SYNC_MASTER);
         messageStore2 = buildMessageStore(storeConfig2, 2L);
+        messageStore2.getRunningFlags().makeIsolated(true);
         assertTrue(messageStore2.load());
         messageStore2.start();
         messageStore2.getHaService().changeToMaster(2);
+        messageStore2.getRunningFlags().makeIsolated(false);
         ((AutoSwitchHAService) messageStore2.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(2L)));
 
         // Put message on master
@@ -461,7 +464,7 @@ public class AutoSwitchHATest {
 
         // Step2: check flag SynchronizingSyncStateSet
         Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet());
-        Assert.assertEquals(masterHAService.getConfirmOffset(), 1570);
+        Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1570);
         Set<Long> syncStateSet = masterHAService.getSyncStateSet();
         Assert.assertEquals(syncStateSet.size(), 2);
         Assert.assertTrue(syncStateSet.contains(1L));
@@ -475,6 +478,29 @@ public class AutoSwitchHATest {
         Assert.assertFalse(masterHAService.isSynchronizingSyncStateSet());
     }
 
+    @Test
+    public void testBuildConsumeQueueNotExceedConfirmOffset() throws Exception {
+        init(defaultMappedFileSize);
+        ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(new HashSet<>(Collections.singletonList(1L)));
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, 2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+
+        long tmpConfirmOffset = this.messageStore2.getConfirmOffset();
+        long setConfirmOffset = this.messageStore2.getConfirmOffset() - this.messageStore2.getConfirmOffset() / 2;
+        messageStore2.shutdown();
+        StoreCheckpoint storeCheckpoint = new StoreCheckpoint(storeConfig2.getStorePathRootDir() + File.separator + "checkpoint");
+        assertEquals(tmpConfirmOffset, storeCheckpoint.getConfirmPhyOffset());
+        storeCheckpoint.setConfirmPhyOffset(setConfirmOffset);
+        storeCheckpoint.shutdown();
+        messageStore2 = buildMessageStore(storeConfig2, 2L);
+        messageStore2.getRunningFlags().makeIsolated(true);
+        assertTrue(messageStore2.load());
+        messageStore2.start();
+        messageStore2.getRunningFlags().makeIsolated(false);
+        assertEquals(setConfirmOffset, messageStore2.getConfirmOffset());
+        checkMessage(this.messageStore2, 5, 0);
+    }
+
     @After
     public void destroy() throws Exception {
         if (this.messageStore2 != null) {