Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/06 08:34:49 UTC

[rocketmq] branch 5.0.0-beta-dledger-controller updated: [Summer of Code] Support switch role for ha service (#4236)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller by this push:
     new 911ee340c [Summer of Code] Support switch role for ha service (#4236)
911ee340c is described below

commit 911ee340cb502645cd75603f44ba72c198611434
Author: hzh0425 <58...@users.noreply.github.com>
AuthorDate: Fri May 6 16:34:27 2022 +0800

    [Summer of Code] Support switch role for ha service (#4236)
    
    * feature: support auto switch role ha service
    
    * feature:
    1.add EpochStartOffset in ha protocol
    2.notify AutoSwitchHAService when delete expired files
    3.add more tests for AutoSwitchHAService
    
    * feature:
    1.transfer syncFromLastFile from slave to master in handshake state
    2.return false if finding the consistent point fails
---
 .../org/apache/rocketmq/common/EpochEntry.java     |  68 +++
 .../rocketmq/common/utils/CheckpointFile.java      | 122 ++++
 .../namesrv/routeinfo/RouteInfoManager.java        |   1 -
 .../apache/rocketmq/store/DefaultMessageStore.java |  17 +-
 .../rocketmq/store/config/MessageStoreConfig.java  |  40 ++
 .../apache/rocketmq/store/ha/DefaultHAService.java |  70 ++-
 .../org/apache/rocketmq/store/ha/HAService.java    |  21 +
 .../store/ha/autoswitch/AutoSwitchHAClient.java    | 503 ++++++++++++++++
 .../ha/autoswitch/AutoSwitchHAConnection.java      | 630 +++++++++++++++++++++
 .../store/ha/autoswitch/AutoSwitchHAService.java   | 215 +++++++
 .../store/ha/autoswitch/EpochFileCache.java        | 328 +++++++++++
 .../rocketmq/store/ha/io/AbstractHAReader.java     |  80 +++
 .../apache/rocketmq/store/ha/io/HAReadHook.java    |  22 +
 .../apache/rocketmq/store/ha/io/HAWriteHook.java   |  22 +
 .../org/apache/rocketmq/store/ha/io/HAWriter.java  |  67 +++
 .../rocketmq/store/ha/EpochFileCacheTest.java      | 150 +++++
 .../store/ha/autoswitch/AutoSwitchHATest.java      | 332 +++++++++++
 17 files changed, 2662 insertions(+), 26 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/EpochEntry.java b/common/src/main/java/org/apache/rocketmq/common/EpochEntry.java
new file mode 100644
index 000000000..6cd3ceb23
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/EpochEntry.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+public class EpochEntry {
+
+    private int epoch;
+    private long startOffset;
+    private long endOffset = Long.MAX_VALUE;
+
+    public EpochEntry(EpochEntry entry) {
+        this.epoch = entry.getEpoch();
+        this.startOffset = entry.getStartOffset();
+        this.endOffset = entry.getEndOffset();
+    }
+
+    public EpochEntry(int epoch, long startOffset) {
+        this.epoch = epoch;
+        this.startOffset = startOffset;
+    }
+
+    public int getEpoch() {
+        return epoch;
+    }
+
+    public void setEpoch(int epoch) {
+        this.epoch = epoch;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
+    public long getEndOffset() {
+        return endOffset;
+    }
+
+    public void setEndOffset(long endOffset) {
+        this.endOffset = endOffset;
+    }
+
+    @Override public String toString() {
+        return "EpochEntry{" +
+            "epoch=" + epoch +
+            ", startOffset=" + startOffset +
+            ", endOffset=" + endOffset +
+            '}';
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/CheckpointFile.java b/common/src/main/java/org/apache/rocketmq/common/utils/CheckpointFile.java
new file mode 100644
index 000000000..e47a2cd5c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/CheckpointFile.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Entry Checkpoint file util
+ * Format:
+ *   First line: Entries size
+ *   Then: every Entry(String)
+ */
+public class CheckpointFile<T> {
+
+    public interface CheckpointSerializer<T> {
+        /**
+         * Serialize entry to line
+         */
+        String toLine(final T entry);
+
+        /**
+         * Deserialize line to entry
+         */
+        T fromLine(final String line);
+    }
+
+    private final String path;
+    private final CheckpointSerializer<T> serializer;
+
+    public CheckpointFile(final String path, final CheckpointSerializer<T> serializer) {
+        this.path = path;
+        this.serializer = serializer;
+    }
+
+    /**
+     * Write entries to file
+     */
+    public void write(final List<T> entries) throws IOException  {
+        if (entries.isEmpty()) {
+            return;
+        }
+        synchronized (this) {
+            final FileOutputStream fos = new FileOutputStream(this.path);
+            try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
+                // Write size
+                writer.write(entries.size() + "");
+                writer.newLine();
+
+                // Write entries
+                for (T entry : entries) {
+                    final String line = this.serializer.toLine(entry);
+                    if (line != null && !line.isEmpty()) {
+                        writer.write(line);
+                        writer.newLine();
+                    }
+                }
+
+                writer.flush();
+                fos.getFD().sync();
+            }
+        }
+    }
+
+    /**
+     * Read entries from file
+     */
+    public List<T> read() throws IOException {
+        final ArrayList<T> result = new ArrayList<>();
+        synchronized (this) {
+            final File file = new File(this.path);
+            if (!file.exists()) {
+                return result;
+            }
+            final BufferedReader reader = Files.newBufferedReader(file.toPath());
+            try {
+                // Read size
+                int expectedLines = Integer.parseInt(reader.readLine());
+
+                // Read entries
+                String line = reader.readLine();
+                while (line != null) {
+                    final T entry = this.serializer.fromLine(line);
+                    if (entry != null) {
+                        result.add(entry);
+                    }
+                    line = reader.readLine();
+                }
+                if (result.size() != expectedLines) {
+                    final String err = String.format("Expect %d entries, only found %d entries", expectedLines, result.size());
+                    throw new IOException(err);
+                }
+                return result;
+            } finally {
+                reader.close();
+            }
+        }
+    }
+}
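
For orientation, a minimal usage sketch of the new CheckpointFile utility. The serializer's line format below is made up for illustration; it is not necessarily the format EpochFileCache writes:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.rocketmq.common.EpochEntry;
    import org.apache.rocketmq.common.utils.CheckpointFile;

    public class EpochCheckpointExample {
        public static void main(String[] args) throws Exception {
            // Illustrative serializer: "epoch-startOffset" per line. This only demonstrates
            // the CheckpointFile contract, not the committed epoch file format.
            CheckpointFile.CheckpointSerializer<EpochEntry> serializer =
                new CheckpointFile.CheckpointSerializer<EpochEntry>() {
                    @Override public String toLine(final EpochEntry entry) {
                        return entry.getEpoch() + "-" + entry.getStartOffset();
                    }

                    @Override public String fromLine(final String line) {
                        final String[] parts = line.split("-");
                        return new EpochEntry(Integer.parseInt(parts[0]), Long.parseLong(parts[1]));
                    }
                };

            CheckpointFile<EpochEntry> checkpoint =
                new CheckpointFile<>(System.getProperty("java.io.tmpdir") + "/epochFileCheckpoint", serializer);

            // write() fsyncs the file; the first line holds the entry count, one entry per following line.
            checkpoint.write(Arrays.asList(new EpochEntry(1, 0L), new EpochEntry(2, 4096L)));

            // read() throws IOException if the parsed entry count disagrees with the first line.
            List<EpochEntry> restored = checkpoint.read();
            System.out.println(restored);
        }
    }
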
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index dd206bbb4..89b807744 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -692,7 +692,6 @@ public class RouteInfoManager {
                 this.lock.writeLock().unlock();
             }
         } catch (Exception e) {
-            e.printStackTrace();
             log.error("unregisterBroker Exception", e);
         }
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index dc6cbf591..b3d9e6556 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -77,6 +77,7 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
 import org.apache.rocketmq.store.ha.DefaultHAService;
 import org.apache.rocketmq.store.ha.HAService;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 import org.apache.rocketmq.store.hook.PutMessageHook;
 import org.apache.rocketmq.store.hook.SendMessageBackHook;
 import org.apache.rocketmq.store.index.IndexService;
@@ -189,8 +190,13 @@ public class DefaultMessageStore implements MessageStore {
         if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
             this.haService = ServiceProvider.loadClass(ServiceProvider.HA_SERVICE_ID, HAService.class);
             if (null == this.haService) {
-                this.haService = new DefaultHAService();
-                LOGGER.warn("Load default HA Service: {}", DefaultHAService.class.getSimpleName());
+                if (this.messageStoreConfig.isStartupControllerMode()) {
+                    this.haService = new AutoSwitchHAService();
+                    LOGGER.warn("Load AutoSwitch HA Service: {}", AutoSwitchHAService.class.getSimpleName());
+                } else {
+                    this.haService = new DefaultHAService();
+                    LOGGER.warn("Load default HA Service: {}", DefaultHAService.class.getSimpleName());
+                }
             }
             this.haService.init(this);
         }
@@ -1932,6 +1938,13 @@ public class DefaultMessageStore implements MessageStore {
                 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                     destroyMappedFileIntervalForcibly, cleanAtOnce, deleteFileBatchMax);
                 if (deleteCount > 0) {
+                    // In controller mode, notify the AutoSwitchHAService to truncate its epoch file prefix
+                    if (DefaultMessageStore.this.messageStoreConfig.isStartupControllerMode()) {
+                        if (DefaultMessageStore.this.haService instanceof AutoSwitchHAService) {
+                            final long minPhyOffset = getMinPhyOffset();
+                            ((AutoSwitchHAService) DefaultMessageStore.this.haService).truncateEpochFilePrefix(minPhyOffset - 1);
+                        }
+                    }
                 } else if (isUsageExceedsThreshold) {
                     LOGGER.warn("disk space will be full soon, but delete file failed.");
                 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 1a6d6ae48..99b88d521 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -33,6 +33,11 @@ public class MessageStoreConfig {
     private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
         + File.separator + "commitlog";
 
+    // The file path in which the epoch checkpoint file is kept
+    @ImportantField
+    private String storePathEpochFile = System.getProperty("user.home") + File.separator + "store"
+        + File.separator + "commitlog" + File.separator + "epochFileCheckpoint";
+
     private String readOnlyCommitLogStorePaths = null;
 
     // CommitLog file size,default is 1G
@@ -248,6 +253,12 @@ public class MessageStoreConfig {
     @ImportantField
     private boolean enableAutoInSyncReplicas = false;
 
+    /**
+     * Whether to start up in controller mode
+     */
+    @ImportantField
+    private boolean startupControllerMode = false;
+
     /**
      * Enable or not ha flow control
      */
@@ -285,6 +296,11 @@ public class MessageStoreConfig {
 
     private boolean syncFromMinOffset = false;
 
+    /**
+     * Whether to sync from the last file when a new broker replica joins the master.
+     */
+    private boolean syncFromLastFile = false;
+
     public boolean isDebugLockEnable() {
         return debugLockEnable;
     }
@@ -459,6 +475,14 @@ public class MessageStoreConfig {
         this.storePathCommitLog = storePathCommitLog;
     }
 
+    public String getStorePathEpochFile() {
+        return storePathEpochFile;
+    }
+
+    public void setStorePathEpochFile(String storePathEpochFile) {
+        this.storePathEpochFile = storePathEpochFile;
+    }
+
     public String getDeleteWhen() {
         return deleteWhen;
     }
@@ -1150,6 +1174,14 @@ public class MessageStoreConfig {
         this.enableAutoInSyncReplicas = enableAutoInSyncReplicas;
     }
 
+    public boolean isStartupControllerMode() {
+        return startupControllerMode;
+    }
+
+    public void setStartupControllerMode(boolean startupControllerMode) {
+        this.startupControllerMode = startupControllerMode;
+    }
+
     public boolean isHaFlowControlEnable() {
         return haFlowControlEnable;
     }
@@ -1222,6 +1254,14 @@ public class MessageStoreConfig {
         this.syncFromMinOffset = syncFromMinOffset;
     }
 
+    public boolean isSyncFromLastFile() {
+        return syncFromLastFile;
+    }
+
+    public void setSyncFromLastFile(boolean syncFromLastFile) {
+        this.syncFromLastFile = syncFromLastFile;
+    }
+
     public boolean isEnableLmq() {
         return enableLmq;
     }
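
A brief sketch of how the two new flags fit together; a broker would normally set them through its store configuration file, so the direct setter calls below are only illustrative:

    import org.apache.rocketmq.store.config.MessageStoreConfig;

    public class ControllerModeConfigExample {
        public static MessageStoreConfig build() {
            MessageStoreConfig storeConfig = new MessageStoreConfig();
            // With this flag on, DefaultMessageStore falls back to AutoSwitchHAService
            // (instead of DefaultHAService) when no HA service is loaded via SPI.
            storeConfig.setStartupControllerMode(true);
            // Ask the master to start replication from its last commitlog file rather than
            // from the very first file when this broker joins as a brand-new replica.
            storeConfig.setSyncFromLastFile(true);
            // Optional: relocate the epoch checkpoint
            // (default: <user.home>/store/commitlog/epochFileCheckpoint).
            storeConfig.setStorePathEpochFile("/data/store/commitlog/epochFileCheckpoint");
            return storeConfig;
        }
    }
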
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
index 8c480a017..20f7b2046 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAService.java
@@ -43,22 +43,22 @@ public class DefaultHAService implements HAService {
 
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
-    private final AtomicInteger connectionCount = new AtomicInteger(0);
+    protected final AtomicInteger connectionCount = new AtomicInteger(0);
 
-    private final List<HAConnection> connectionList = new LinkedList<>();
+    protected final List<HAConnection> connectionList = new LinkedList<>();
 
-    private AcceptSocketService acceptSocketService;
+    protected AcceptSocketService acceptSocketService;
 
-    private DefaultMessageStore defaultMessageStore;
+    protected DefaultMessageStore defaultMessageStore;
 
-    private WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
-    private AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
+    protected WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
+    protected AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
 
-    private GroupTransferService groupTransferService;
+    protected GroupTransferService groupTransferService;
 
-    private DefaultHAClient haClient;
+    protected HAClient haClient;
 
-    private HAConnectionStateNotificationService haConnectionStateNotificationService;
+    protected HAConnectionStateNotificationService haConnectionStateNotificationService;
 
     public DefaultHAService() {
     }
@@ -66,7 +66,7 @@ public class DefaultHAService implements HAService {
     public void init(final DefaultMessageStore defaultMessageStore) throws IOException {
         this.defaultMessageStore = defaultMessageStore;
         this.acceptSocketService =
-            new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+            new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
         this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
         if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
             this.haClient = new DefaultHAClient(this.defaultMessageStore);
@@ -125,13 +125,13 @@ public class DefaultHAService implements HAService {
         }
     }
 
-    public void addConnection(final DefaultHAConnection conn) {
+    public void addConnection(final HAConnection conn) {
         synchronized (this.connectionList) {
             this.connectionList.add(conn);
         }
     }
 
-    public void removeConnection(final DefaultHAConnection conn) {
+    public void removeConnection(final HAConnection conn) {
         this.haConnectionStateNotificationService.checkConnectionStateAndNotify(conn);
         synchronized (this.connectionList) {
             this.connectionList.remove(conn);
@@ -148,6 +148,18 @@ public class DefaultHAService implements HAService {
         this.haConnectionStateNotificationService.shutdown();
     }
 
+    @Override public boolean changeToMaster(int masterEpoch) {
+        return false;
+    }
+
+    @Override public boolean changeToSlave(String newMasterAddr, String newHaMasterAddr, int newMasterEpoch) {
+        return false;
+    }
+
+    @Override public Set<String> checkSyncStateSetChanged() {
+        return null;
+    }
+
     public void destroyConnections() {
         synchronized (this.connectionList) {
             for (HAConnection c : this.connectionList) {
@@ -180,7 +192,7 @@ public class DefaultHAService implements HAService {
         return inSyncNums;
     }
 
-    private boolean isInSyncSlave(final long masterPutWhere, HAConnection conn) {
+    protected boolean isInSyncSlave(final long masterPutWhere, HAConnection conn) {
         if (masterPutWhere - conn.getSlaveAckOffset() < this.defaultMessageStore.getMessageStoreConfig()
             .getHaMaxGapNotInSync()) {
             return true;
@@ -241,10 +253,28 @@ public class DefaultHAService implements HAService {
         return info;
     }
 
+    class DefaultAcceptSocketService extends AcceptSocketService {
+
+        public DefaultAcceptSocketService(int port) {
+            super(port);
+        }
+
+        @Override protected HAConnection createConnection(SocketChannel sc) throws IOException {
+            return new DefaultHAConnection(DefaultHAService.this, sc);
+        }
+
+        @Override public String getServiceName() {
+            if (defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
+                return defaultMessageStore.getBrokerConfig().getLoggerIdentifier() + AcceptSocketService.class.getSimpleName();
+            }
+            return DefaultAcceptSocketService.class.getSimpleName();
+        }
+    }
+
     /**
      * Listens to slave connections to create {@link HAConnection}.
      */
-    class AcceptSocketService extends ServiceThread {
+    protected abstract class AcceptSocketService extends ServiceThread {
         private final SocketAddress socketAddressListen;
         private ServerSocketChannel serverSocketChannel;
         private Selector selector;
@@ -302,7 +332,7 @@ public class DefaultHAService implements HAService {
                                     DefaultHAService.log.info("HAService receive new connection, "
                                         + sc.socket().getRemoteSocketAddress());
                                     try {
-                                        DefaultHAConnection conn = new DefaultHAConnection(DefaultHAService.this, sc);
+                                        HAConnection conn = createConnection(sc);
                                         conn.start();
                                         DefaultHAService.this.addConnection(conn);
                                     } catch (Exception e) {
@@ -326,14 +356,8 @@ public class DefaultHAService implements HAService {
         }
 
         /**
-         * {@inheritDoc}
+         * Create ha connection
          */
-        @Override
-        public String getServiceName() {
-            if (defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
-                return defaultMessageStore.getBrokerConfig().getLoggerIdentifier() + AcceptSocketService.class.getSimpleName();
-            }
-            return AcceptSocketService.class.getSimpleName();
-        }
+        protected abstract HAConnection createConnection(final SocketChannel sc) throws IOException;
     }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 5f714b90e..20d72c499 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store.ha;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
@@ -48,6 +49,26 @@ public interface HAService {
      */
     void shutdown();
 
+    /**
+     * Change to master state
+     * @param masterEpoch the new masterEpoch
+     */
+    boolean changeToMaster(int masterEpoch);
+
+    /**
+     * Change to slave state
+     * @param newMasterAddr new master addr
+     * @param newHaMasterAddr new master HA addr
+     * @param newMasterEpoch new masterEpoch
+     */
+    boolean changeToSlave(String newMasterAddr, String newHaMasterAddr, int newMasterEpoch);
+
+    /**
+     * Check whether the syncStateSet changed
+     * @return new syncStateSet
+     */
+    Set<String> checkSyncStateSetChanged();
+
     /**
      * Update master address
      *
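
The three new HAService methods are the role-switch entry points. A hypothetical caller on the broker side (names and wiring below are invented for illustration; they are not part of this commit) might use them like this:

    import java.util.Set;
    import org.apache.rocketmq.store.ha.HAService;

    public class RoleChangeHandlerSketch {
        private final HAService haService;

        public RoleChangeHandlerSketch(HAService haService) {
            this.haService = haService;
        }

        // Invoked when the controller elects this broker as master for the given epoch.
        public boolean onElectedMaster(int masterEpoch) {
            return haService.changeToMaster(masterEpoch);
        }

        // Invoked when the controller designates a new master for this broker to follow.
        public boolean onMasterChanged(String newMasterAddr, String newHaMasterAddr, int newMasterEpoch) {
            return haService.changeToSlave(newMasterAddr, newHaMasterAddr, newMasterEpoch);
        }

        // Polled to report the in-sync replica set back to the controller when it changes.
        public Set<String> pollSyncStateSet() {
            return haService.checkSyncStateSetChanged();
        }
    }
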
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
new file mode 100644
index 000000000..3ddd367a7
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAClient;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAClient extends ServiceThread implements HAClient {
+
+
+    /**
+     * Transfer header buffer size. Schema: state ordinal + additional info (maxOffset or flag).
+     * In handshake state, the additional info field is reused as the isSyncFromLastFile flag.
+     */
+    public static final int TRANSFER_HEADER_SIZE = 4 + 8;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+    private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
+    private final AtomicReference<String> masterAddress = new AtomicReference<>();
+    private final ByteBuffer transferHeaderBuffer = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
+    private final AutoSwitchHAService haService;
+    private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+    private final DefaultMessageStore messageStore;
+    private final EpochFileCache epochCache;
+
+    private SocketChannel socketChannel;
+    private Selector selector;
+    private AbstractHAReader haReader;
+    private HAWriter haWriter;
+    private FlowMonitor flowMonitor;
+    /**
+     * Last time the slave read data from the master.
+     */
+    private long lastReadTimestamp;
+    /**
+     * Last time the slave reported its offset to the master.
+     */
+    private long lastWriteTimestamp;
+
+    private long currentReportedOffset;
+    private int processPosition;
+    private volatile HAConnectionState currentState;
+    /**
+     * Current epoch
+     */
+    private volatile long currentReceivedEpoch;
+
+    /**
+     * Confirm offset = min(localMaxOffset, master confirm offset).
+     */
+    private volatile long confirmOffset;
+
+    public static final int SYNC_FROM_LAST_FILE = -1;
+
+    public static final int SYNC_FROM_FIRST_FILE = -2;
+
+    public AutoSwitchHAClient(AutoSwitchHAService haService, DefaultMessageStore defaultMessageStore,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.messageStore = defaultMessageStore;
+        this.epochCache = epochCache;
+        init();
+    }
+
+    public void init() throws IOException {
+        this.selector = RemotingUtil.openSelector();
+        this.flowMonitor = new FlowMonitor(this.messageStore.getMessageStoreConfig());
+        this.haReader = new HAClientReader();
+        haReader.registerHook(readSize -> {
+            if (readSize > 0) {
+                AutoSwitchHAClient.this.flowMonitor.addByteCountTransferred(readSize);
+                lastReadTimestamp = System.currentTimeMillis();
+            }
+        });
+        this.haWriter = new HAWriter();
+        haWriter.registerHook(writeSize -> {
+            if (writeSize > 0) {
+                lastWriteTimestamp = System.currentTimeMillis();
+            }
+        });
+        changeCurrentState(HAConnectionState.READY);
+        this.currentReceivedEpoch = -1;
+        this.currentReportedOffset = 0;
+        this.processPosition = 0;
+        this.confirmOffset = -1;
+        this.lastReadTimestamp = System.currentTimeMillis();
+        this.lastWriteTimestamp = System.currentTimeMillis();
+    }
+
+    public void reOpen() throws IOException {
+        shutdown();
+        init();
+    }
+
+    @Override public String getServiceName() {
+        return AutoSwitchHAClient.class.getSimpleName();
+    }
+
+    @Override public void updateMasterAddress(String newAddress) {
+        String currentAddr = this.masterAddress.get();
+        if (masterAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master address, OLD: " + currentAddr + " NEW: " + newAddress);
+        }
+    }
+
+    @Override public void updateHaMasterAddress(String newAddress) {
+        String currentAddr = this.masterHaAddress.get();
+        if (masterHaAddress.compareAndSet(currentAddr, newAddress)) {
+            LOGGER.info("update master ha address, OLD: " + currentAddr + " NEW: " + newAddress);
+        }
+    }
+
+    @Override public String getMasterAddress() {
+        return this.masterAddress.get();
+    }
+
+    @Override public String getHaMasterAddress() {
+        return this.masterHaAddress.get();
+    }
+
+    @Override public long getLastReadTimestamp() {
+        return this.lastReadTimestamp;
+    }
+
+    @Override public long getLastWriteTimestamp() {
+        return this.lastWriteTimestamp;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return this.currentState;
+    }
+
+    @Override public void changeCurrentState(HAConnectionState haConnectionState) {
+        LOGGER.info("change state to {}", haConnectionState);
+        this.currentState = haConnectionState;
+    }
+
+    public void closeMasterAndWait() {
+        this.closeMaster();
+        this.waitForRunning(1000 * 5);
+    }
+
+    @Override public void closeMaster() {
+        if (null != this.socketChannel) {
+            try {
+                SelectionKey sk = this.socketChannel.keyFor(this.selector);
+                if (sk != null) {
+                    sk.cancel();
+                }
+
+                this.socketChannel.close();
+                this.socketChannel = null;
+
+                LOGGER.info("AutoSwitchHAClient close connection with master {}", this.masterHaAddress.get());
+                this.changeCurrentState(HAConnectionState.READY);
+            } catch (IOException e) {
+                LOGGER.warn("CloseMaster exception. ", e);
+            }
+
+            this.lastReadTimestamp = 0;
+            this.processPosition = 0;
+
+            this.byteBufferRead.position(0);
+            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+        }
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return this.flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        // Shutdown thread firstly
+        this.flowMonitor.shutdown();
+        super.shutdown();
+
+        closeMaster();
+        try {
+            this.selector.close();
+        } catch (IOException e) {
+            LOGGER.warn("Close the selector of AutoSwitchHAClient error, ", e);
+        }
+    }
+
+    private boolean isTimeToReportOffset() {
+        long interval = this.messageStore.now() - this.lastWriteTimestamp;
+        return interval > this.messageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
+    }
+
+    private boolean sendHandshakeHeader() {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        this.transferHeaderBuffer.putInt(HAConnectionState.HANDSHAKE.ordinal());
+        if (this.haService.getDefaultMessageStore().getMessageStoreConfig().isSyncFromLastFile()) {
+            this.transferHeaderBuffer.putLong(SYNC_FROM_LAST_FILE);
+        } else {
+            this.transferHeaderBuffer.putLong(SYNC_FROM_FIRST_FILE);
+        }
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
+    }
+
+    private void handshakeWithMaster() throws IOException {
+        boolean result = this.sendHandshakeHeader();
+        if (!result) {
+            closeMasterAndWait();
+            return;
+        }
+
+        this.selector.select(5000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            closeMasterAndWait();
+            return;
+        }
+    }
+
+    private boolean reportSlaveOffset(final long offsetToReport) {
+        this.transferHeaderBuffer.position(0);
+        this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE);
+        this.transferHeaderBuffer.putInt(this.currentState.ordinal());
+        this.transferHeaderBuffer.putLong(offsetToReport);
+        this.transferHeaderBuffer.flip();
+        return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer);
+    }
+
+    private boolean reportSlaveMaxOffset() {
+        boolean result = true;
+        final long maxPhyOffset = this.messageStore.getMaxPhyOffset();
+        if (maxPhyOffset > this.currentReportedOffset) {
+            this.currentReportedOffset = maxPhyOffset;
+            result = reportSlaveOffset(this.currentReportedOffset);
+        }
+        return result;
+    }
+
+    public boolean connectMaster() throws ClosedChannelException {
+        if (null == this.socketChannel) {
+            String addr = this.masterHaAddress.get();
+            if (addr != null) {
+                SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
+                this.socketChannel = RemotingUtil.connect(socketAddress);
+                if (this.socketChannel != null) {
+                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+                    LOGGER.info("AutoSwitchHAClient connect to master {}", addr);
+                    changeCurrentState(HAConnectionState.HANDSHAKE);
+                }
+            }
+            this.currentReportedOffset = this.messageStore.getMaxPhyOffset();
+            this.lastReadTimestamp = System.currentTimeMillis();
+        }
+        return this.socketChannel != null;
+    }
+
+    private boolean transferFromMaster() throws IOException {
+        boolean result;
+        if (isTimeToReportOffset()) {
+            LOGGER.info("Slave report current offset {}", this.currentReportedOffset);
+            result = reportSlaveOffset(this.currentReportedOffset);
+            if (!result) {
+                return false;
+            }
+        }
+
+        this.selector.select(1000);
+
+        result = this.haReader.read(this.socketChannel, this.byteBufferRead);
+        if (!result) {
+            return false;
+        }
+
+        return this.reportSlaveMaxOffset();
+    }
+
+    @Override public void run() {
+        LOGGER.info(this.getServiceName() + " service started");
+
+        this.flowMonitor.start();
+        while (!this.isStopped()) {
+            try {
+                switch (this.currentState) {
+                    case SHUTDOWN:
+                        return;
+                    case READY:
+                        // Truncate invalid msg first
+                        final long truncateOffset = AutoSwitchHAClient.this.haService.truncateInvalidMsg();
+                        if (truncateOffset >= 0) {
+                            AutoSwitchHAClient.this.epochCache.truncateSuffixByOffset(truncateOffset);
+                        }
+                        if (!connectMaster()) {
+                            LOGGER.warn("AutoSwitchHAClient connect to master {} failed", this.masterHaAddress.get());
+                            waitForRunning(1000 * 5);
+                        }
+                        continue;
+                    case HANDSHAKE:
+                        handshakeWithMaster();
+                        continue;
+                    case TRANSFER:
+                        if (!transferFromMaster()) {
+                            closeMasterAndWait();
+                            continue;
+                        }
+                        break;
+                    case SUSPEND:
+                    default:
+                        waitForRunning(1000 * 5);
+                        continue;
+                }
+                long interval = this.messageStore.now() - this.lastReadTimestamp;
+                if (interval > this.messageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
+                    LOGGER.warn("AutoSwitchHAClient, housekeeping, found this connection[" + this.masterHaAddress
+                        + "] expired, " + interval);
+                    closeMaster();
+                    LOGGER.warn("AutoSwitchHAClient, master not response some time, so close connection");
+                }
+            } catch (Exception e) {
+                LOGGER.warn(this.getServiceName() + " service has exception. ", e);
+                closeMasterAndWait();
+            }
+        }
+
+    }
+
+    /**
+     * Compare the master and slave's epoch file, find consistent point, do truncate.
+     */
+    private boolean doTruncate(List<EpochEntry> masterEpochEntries, long masterEndOffset) {
+        if (this.epochCache.getEntrySize() == 0) {
+            // If the epochCache is empty, it means this broker is a new replica
+            LOGGER.info("Slave local epochCache is empty, skip truncate log");
+            changeCurrentState(HAConnectionState.TRANSFER);
+            this.currentReportedOffset = 0;
+        } else {
+            final EpochFileCache masterEpochCache = new EpochFileCache();
+            masterEpochCache.initCacheFromEntries(masterEpochEntries);
+            masterEpochCache.setLastEpochEntryEndOffset(masterEndOffset);
+            final List<EpochEntry> localEpochEntries = this.epochCache.getAllEntries();
+            final EpochFileCache localEpochCache = new EpochFileCache();
+            localEpochCache.initCacheFromEntries(localEpochEntries);
+            localEpochCache.setLastEpochEntryEndOffset(this.messageStore.getMaxPhyOffset());
+
+            final long truncateOffset = localEpochCache.findConsistentPoint(masterEpochCache);
+            if (truncateOffset < 0) {
+                // If truncateOffset < 0, it means we can't find a consistent point
+                LOGGER.error("Failed to find a consistent point between masterEpoch:{} and slaveEpoch:{}", masterEpochEntries, localEpochEntries);
+                return false;
+            }
+            if (!this.messageStore.truncateFiles(truncateOffset)) {
+                LOGGER.error("Failed to truncate slave log to {}", truncateOffset);
+                return false;
+            }
+            this.epochCache.truncateSuffixByOffset(truncateOffset);
+            LOGGER.info("Truncate slave log to {} success, change to transfer state", truncateOffset);
+            changeCurrentState(HAConnectionState.TRANSFER);
+            this.currentReportedOffset = truncateOffset;
+        }
+        if (!reportSlaveMaxOffset()) {
+            LOGGER.error("AutoSwitchHAClient report max offset to master failed");
+            return false;
+        }
+        return true;
+    }
+
+    class HAClientReader extends AbstractHAReader {
+
+        @Override
+        protected boolean processReadResult(ByteBuffer byteBufferRead) {
+            int readSocketPos = byteBufferRead.position();
+
+            while (true) {
+                int diff = byteBufferRead.position() - AutoSwitchHAClient.this.processPosition;
+                if (diff >= AutoSwitchHAConnection.MSG_HEADER_SIZE) {
+                    int processPosition =  AutoSwitchHAClient.this.processPosition;
+                    int masterState = byteBufferRead.getInt(processPosition);
+                    int bodySize = byteBufferRead.getInt(processPosition + 4);
+                    long masterOffset = byteBufferRead.getLong(processPosition + 4 + 4);
+                    int masterEpoch = byteBufferRead.getInt(processPosition + 4 + 4 + 8);
+                    long masterEpochStartOffset = byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4);
+                    long confirmOffset = byteBufferRead.getLong(processPosition + 4 + 4 + 8 + 4 + 8);
+
+                    if (masterState != AutoSwitchHAClient.this.currentState.ordinal()) {
+                        AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+                        AutoSwitchHAClient.this.waitForRunning(1);
+                        LOGGER.error("State not matched, masterState:{}, slaveState:{}, bodySize:{}, offset:{}, masterEpoch:{}, masterEpochStartOffset:{}, confirmOffset:{}",
+                            masterState, AutoSwitchHAClient.this.currentState, bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset);
+                        return true;
+                    }
+                    LOGGER.info("Receive master msg, masterState:{}, bodySize:{}, offset:{}, masterEpoch:{}, masterEpochStartOffset:{}, confirmOffset:{}",
+                        HAConnectionState.values()[masterState], bodySize, masterOffset, masterEpoch, masterEpochStartOffset, confirmOffset);
+
+                    if (diff >= (AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize)) {
+                        switch (AutoSwitchHAClient.this.currentState) {
+                            case HANDSHAKE:
+                                AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE;
+                                // Truncate log
+                                int entrySize = AutoSwitchHAConnection.EPOCH_ENTRY_SIZE;
+                                final int entryNums = bodySize / entrySize;
+                                final ArrayList<EpochEntry> epochEntries = new ArrayList<>(entryNums);
+                                for (int i = 0; i < entryNums; i++) {
+                                    int epoch = byteBufferRead.getInt(AutoSwitchHAClient.this.processPosition + i * entrySize);
+                                    long startOffset = byteBufferRead.getLong(AutoSwitchHAClient.this.processPosition + i * entrySize + 4);
+                                    epochEntries.add(new EpochEntry(epoch, startOffset));
+                                }
+                                byteBufferRead.position(readSocketPos);
+                                AutoSwitchHAClient.this.processPosition += bodySize;
+                                LOGGER.info("Receive handshake, masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, epochEntries);
+                                if (!doTruncate(epochEntries, masterOffset)) {
+                                    waitForRunning(1000 * 2);
+                                    LOGGER.error("AutoSwitchHAClient truncate log failed in handshake state");
+                                    return false;
+                                }
+                                break;
+                            case TRANSFER:
+                                byte[] bodyData = new byte[bodySize];
+                                byteBufferRead.position(AutoSwitchHAClient.this.processPosition + AutoSwitchHAConnection.MSG_HEADER_SIZE);
+                                byteBufferRead.get(bodyData);
+                                byteBufferRead.position(readSocketPos);
+                                AutoSwitchHAClient.this.processPosition += AutoSwitchHAConnection.MSG_HEADER_SIZE + bodySize;
+
+                                long slavePhyOffset = AutoSwitchHAClient.this.messageStore.getMaxPhyOffset();
+                                if (slavePhyOffset != 0) {
+                                    if (slavePhyOffset != masterOffset) {
+                                        LOGGER.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+                                            + slavePhyOffset + " MASTER: " + masterOffset);
+                                        return false;
+                                    }
+                                }
+
+                                // If epoch changed
+                                if (masterEpoch != AutoSwitchHAClient.this.currentReceivedEpoch) {
+                                    AutoSwitchHAClient.this.currentReceivedEpoch = masterEpoch;
+                                    AutoSwitchHAClient.this.epochCache.appendEntry(new EpochEntry(masterEpoch, masterEpochStartOffset));
+                                }
+                                AutoSwitchHAClient.this.confirmOffset = Math.min(confirmOffset, messageStore.getMaxPhyOffset());
+
+                                if (bodySize > 0) {
+                                    final DefaultMessageStore messageStore = AutoSwitchHAClient.this.messageStore;
+                                    if (messageStore.appendToCommitLog(masterOffset, bodyData, 0, bodyData.length)) {
+                                        LOGGER.info("Slave append master log success, from {}, size {}, epoch:{}", masterOffset, bodySize, masterEpoch);
+                                    }
+                                }
+
+                                if (!reportSlaveMaxOffset()) {
+                                    LOGGER.error("AutoSwitchHAClient report max offset to master failed");
+                                    return false;
+                                }
+                                break;
+                            default:
+                                break;
+                        }
+                        continue;
+                    }
+                }
+
+                if (!byteBufferRead.hasRemaining()) {
+                    byteBufferRead.position(AutoSwitchHAClient.this.processPosition);
+                    byteBufferRead.compact();
+                    AutoSwitchHAClient.this.processPosition = 0;
+                }
+
+                break;
+            }
+            return true;
+        }
+    }
+}
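
For reference, the slave-to-master header used above is only 12 bytes. A small sketch of how both sides pack and read it, mirroring sendHandshakeHeader/reportSlaveOffset on the slave and HAServerReader on the master:

    import java.nio.ByteBuffer;
    import org.apache.rocketmq.store.ha.HAConnectionState;
    import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAClient;

    public class SlaveTransferHeaderSketch {
        // 4-byte state ordinal + 8-byte additional info (maxOffset, or the sync-from flag in handshake).
        static final int TRANSFER_HEADER_SIZE = 4 + 8;

        // What AutoSwitchHAClient.sendHandshakeHeader() puts on the wire.
        static ByteBuffer handshakeHeader(boolean syncFromLastFile) {
            ByteBuffer buf = ByteBuffer.allocate(TRANSFER_HEADER_SIZE);
            buf.putInt(HAConnectionState.HANDSHAKE.ordinal());
            buf.putLong(syncFromLastFile ? AutoSwitchHAClient.SYNC_FROM_LAST_FILE : AutoSwitchHAClient.SYNC_FROM_FIRST_FILE);
            buf.flip();
            return buf;
        }

        // What the master-side HAServerReader reads back: the state ordinal sits at
        // readPosition, the 8-byte additional info starts 4 bytes later.
        static long additionalInfo(ByteBuffer header, int readPosition) {
            return header.getLong(readPosition + 4);
        }
    }
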
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
new file mode 100644
index 000000000..0b7ce6b39
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.FlowMonitor;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionState;
+import org.apache.rocketmq.store.ha.io.AbstractHAReader;
+import org.apache.rocketmq.store.ha.io.HAWriter;
+
+public class AutoSwitchHAConnection implements HAConnection {
+    /**
+     * Header protocol for messages synced from master to slave.
+     * Format: current state + body size + offset + epoch + epochStartOffset + additional info (confirmOffset).
+     * If the msg is a handshake msg, the body size = EpochEntrySize * EpochEntryNums, and the offset is the master's maxOffset.
+     */
+    public static final int MSG_HEADER_SIZE = 4 + 4 + 8 + 4 + 8 + 8;
+    public static final int EPOCH_ENTRY_SIZE = 12;
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final AutoSwitchHAService haService;
+    private final SocketChannel socketChannel;
+    private final String clientAddress;
+    private final EpochFileCache epochCache;
+    private final AbstractWriteSocketService writeSocketService;
+    private final ReadSocketService readSocketService;
+    private volatile HAConnectionState currentState = HAConnectionState.HANDSHAKE;
+    private volatile long slaveRequestOffset = -1;
+    private volatile long slaveAckOffset = -1;
+    /**
+     * Whether the slave has already sent a handshake message
+     */
+    private volatile boolean isSlaveSendHandshake = false;
+    private volatile int currentTransferEpoch = -1;
+    private volatile long currentTransferEpochEndOffset = 0;
+    private volatile boolean isSyncFromLastFile = false;
+    private final FlowMonitor flowMonitor;
+
+    public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel socketChannel,
+        EpochFileCache epochCache) throws IOException {
+        this.haService = haService;
+        this.socketChannel = socketChannel;
+        this.epochCache = epochCache;
+        this.clientAddress = this.socketChannel.socket().getRemoteSocketAddress().toString();
+        this.socketChannel.configureBlocking(false);
+        this.socketChannel.socket().setSoLinger(false, -1);
+        this.socketChannel.socket().setTcpNoDelay(true);
+        this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
+        this.socketChannel.socket().setSendBufferSize(1024 * 64);
+        this.writeSocketService = new WriteSocketService(this.socketChannel);
+        this.readSocketService = new ReadSocketService(this.socketChannel);
+        this.haService.getConnectionCount().incrementAndGet();
+        this.flowMonitor = new FlowMonitor(haService.getDefaultMessageStore().getMessageStoreConfig());
+    }
+
+    @Override public void start() {
+        changeCurrentState(HAConnectionState.HANDSHAKE);
+        this.flowMonitor.start();
+        this.readSocketService.start();
+        this.writeSocketService.start();
+    }
+
+    @Override public void shutdown() {
+        changeCurrentState(HAConnectionState.SHUTDOWN);
+        this.flowMonitor.shutdown(true);
+        this.writeSocketService.shutdown(true);
+        this.readSocketService.shutdown(true);
+        this.close();
+    }
+
+    @Override public void close() {
+        if (this.socketChannel != null) {
+            try {
+                this.socketChannel.close();
+            } catch (final IOException e) {
+                LOGGER.error("", e);
+            }
+        }
+    }
+
+    public void changeCurrentState(HAConnectionState connectionState) {
+        LOGGER.info("change state to {}", connectionState);
+        this.currentState = connectionState;
+    }
+
+    @Override public HAConnectionState getCurrentState() {
+        return currentState;
+    }
+
+    @Override public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    @Override public String getClientAddress() {
+        return clientAddress;
+    }
+
+    @Override public long getSlaveAckOffset() {
+        return slaveAckOffset;
+    }
+
+    @Override public long getTransferredByteInSecond() {
+        return flowMonitor.getTransferredByteInSecond();
+    }
+
+    @Override public long getTransferFromWhere() {
+        return this.writeSocketService.getNextTransferFromWhere();
+    }
+
+    private void changeTransferEpochToNext(final EpochEntry entry) {
+        this.currentTransferEpoch = entry.getEpoch();
+        this.currentTransferEpochEndOffset = entry.getEndOffset();
+        if (entry.getEpoch() == this.epochCache.lastEpoch()) {
+            // Use -1 to represent Long.MAX_VALUE
+            this.currentTransferEpochEndOffset = -1;
+        }
+    }
+
+    class ReadSocketService extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
+        private final Selector selector;
+        private final SocketChannel socketChannel;
+        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private final AbstractHAReader haReader;
+        private int processPosition = 0;
+        private volatile long lastReadTimestamp = System.currentTimeMillis();
+
+        public ReadSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+            this.setDaemon(true);
+            haReader = new HAServerReader();
+            haReader.registerHook(readSize -> {
+                if (readSize > 0) {
+                    ReadSocketService.this.lastReadTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    boolean ok = this.haReader.read(this.socketChannel, this.byteBufferRead);
+                    if (!ok) {
+                        AutoSwitchHAConnection.LOGGER.error("processReadEvent error");
+                        break;
+                    }
+
+                    long interval = haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
+                    if (interval > haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
+                        LOGGER.warn("ha housekeeping, found this connection[" + clientAddress + "] expired, " + interval);
+                        break;
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.makeStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            writeSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            haService.getConnectionCount().decrementAndGet();
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        @Override
+        public String getServiceName() {
+            return ReadSocketService.class.getSimpleName();
+        }
+
+        class HAServerReader extends AbstractHAReader {
+            @Override
+            protected boolean processReadResult(ByteBuffer byteBufferRead) {
+                while (true) {
+                    int diff = byteBufferRead.position() - ReadSocketService.this.processPosition;
+                    if (diff >= AutoSwitchHAClient.TRANSFER_HEADER_SIZE) {
+                        int readPosition = ReadSocketService.this.processPosition;
+                        HAConnectionState slaveState = HAConnectionState.values()[byteBufferRead.getInt(readPosition)];
+
+                        switch (slaveState) {
+                            case HANDSHAKE:
+                                isSlaveSendHandshake = true;
+                                long syncFromLastFileFlag = byteBufferRead.getLong(readPosition + 4);
+                                if (syncFromLastFileFlag == AutoSwitchHAClient.SYNC_FROM_LAST_FILE) {
+                                    AutoSwitchHAConnection.this.isSyncFromLastFile = true;
+                                    LOGGER.info("Slave request sync from lastFile");
+                                }
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+                                break;
+                            case TRANSFER:
+                                long slaveMaxOffset = byteBufferRead.getLong(readPosition + 4);
+                                ReadSocketService.this.processPosition += AutoSwitchHAClient.TRANSFER_HEADER_SIZE;
+
+                                slaveAckOffset = slaveMaxOffset;
+                                if (slaveRequestOffset < 0) {
+                                    slaveRequestOffset = slaveMaxOffset;
+                                }
+                                LOGGER.info("slave[" + clientAddress + "] request offset " + slaveMaxOffset);
+                                AutoSwitchHAConnection.this.haService.notifyTransferSome(AutoSwitchHAConnection.this.slaveAckOffset);
+                                break;
+                            default:
+                                LOGGER.error("Current state illegal {}", currentState);
+                                break;
+                        }
+
+                        if (!slaveState.equals(currentState)) {
+                            LOGGER.warn("Master change state from {} to {}", currentState, slaveState);
+                            changeCurrentState(slaveState);
+                        }
+                        continue;
+                    }
+
+                    if (!byteBufferRead.hasRemaining()) {
+                        byteBufferRead.position(ReadSocketService.this.processPosition);
+                        byteBufferRead.compact();
+                        ReadSocketService.this.processPosition = 0;
+                    }
+                    break;
+                }
+
+                return true;
+            }
+        }
+    }
+
+    class WriteSocketService extends AbstractWriteSocketService {
+        private SelectMappedBufferResult selectMappedBufferResult;
+
+        public WriteSocketService(final SocketChannel socketChannel) throws IOException {
+            super(socketChannel);
+        }
+
+        @Override
+        protected int getNextTransferDataSize() {
+            SelectMappedBufferResult selectResult = haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
+            if (selectResult == null || selectResult.getSize() <= 0) {
+                return 0;
+            }
+            this.selectMappedBufferResult = selectResult;
+            return selectResult.getSize();
+        }
+
+        @Override
+        protected void releaseData() {
+            this.selectMappedBufferResult.release();
+            this.selectMappedBufferResult = null;
+        }
+
+        @Override
+        protected boolean transferData(int maxTransferSize) {
+
+            if (null != this.selectMappedBufferResult && maxTransferSize >= 0) {
+                this.selectMappedBufferResult.getByteBuffer().limit(maxTransferSize);
+            }
+
+            // Write Header
+            boolean result = haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            if (null == this.selectMappedBufferResult) {
+                return true;
+            }
+
+            // Write Body
+            result = haWriter.write(this.socketChannel, this.selectMappedBufferResult.getByteBuffer());
+
+            if (result) {
+                releaseData();
+            }
+            return result;
+        }
+
+        @Override
+        protected void onStop() {
+            if (this.selectMappedBufferResult != null) {
+                this.selectMappedBufferResult.release();
+            }
+        }
+
+        @Override
+        public String getServiceName() {
+            if (haService.getDefaultMessageStore().getBrokerConfig().isInBrokerContainer()) {
+                return haService.getDefaultMessageStore().getBrokerConfig().getLoggerIdentifier() + WriteSocketService.class.getSimpleName();
+            }
+            return WriteSocketService.class.getSimpleName();
+        }
+    }
+
+    abstract class AbstractWriteSocketService extends ServiceThread {
+        protected final Selector selector;
+        protected final SocketChannel socketChannel;
+        protected final HAWriter haWriter;
+
+        protected final ByteBuffer byteBufferHeader = ByteBuffer.allocate(MSG_HEADER_SIZE);
+        // Holds the master's epochFileCache: up to 1000 (epoch, startOffset) entries
+        private final ByteBuffer handShakeBuffer = ByteBuffer.allocate(EPOCH_ENTRY_SIZE * 1000);
+        protected long nextTransferFromWhere = -1;
+        protected boolean lastWriteOver = true;
+        protected long lastWriteTimestamp = System.currentTimeMillis();
+        protected long lastPrintTimestamp = System.currentTimeMillis();
+        protected long transferOffset = 0;
+
+        public AbstractWriteSocketService(final SocketChannel socketChannel) throws IOException {
+            this.selector = RemotingUtil.openSelector();
+            this.socketChannel = socketChannel;
+            this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
+            this.setDaemon(true);
+            haWriter = new HAWriter();
+            haWriter.registerHook(writeSize -> {
+                flowMonitor.addByteCountTransferred(writeSize);
+                if (writeSize > 0) {
+                    AbstractWriteSocketService.this.lastWriteTimestamp =
+                        haService.getDefaultMessageStore().getSystemClock().now();
+                }
+            });
+        }
+
+        public long getNextTransferFromWhere() {
+            return this.nextTransferFromWhere;
+        }
+
+        // Handshake methods
+        private boolean buildHandshakeBuffer() {
+            final List<EpochEntry> epochEntries = AutoSwitchHAConnection.this.epochCache.getAllEntries();
+            final int lastEpoch = AutoSwitchHAConnection.this.epochCache.lastEpoch();
+            final long maxPhyOffset = AutoSwitchHAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset();
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(epochEntries.size() * EPOCH_ENTRY_SIZE);
+            // Offset
+            this.byteBufferHeader.putLong(maxPhyOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(lastEpoch);
+            // EpochStartOffset (not needed in handshake)
+            this.byteBufferHeader.putLong(0L);
+            // Additional info (not needed in handshake)
+            this.byteBufferHeader.putLong(0L);
+            this.byteBufferHeader.flip();
+
+            // EpochEntries
+            this.handShakeBuffer.position(0);
+            this.handShakeBuffer.limit(EPOCH_ENTRY_SIZE * epochEntries.size());
+            for (final EpochEntry entry : epochEntries) {
+                if (entry != null) {
+                    this.handShakeBuffer.putInt(entry.getEpoch());
+                    this.handShakeBuffer.putLong(entry.getStartOffset());
+                }
+            }
+            this.handShakeBuffer.flip();
+            LOGGER.info("Master build handshake header: maxEpoch:{}, maxOffset:{}, epochEntries:{}", lastEpoch, maxPhyOffset, epochEntries);
+            return true;
+        }
+
+        private boolean handshakeWithSlave() {
+            // Write Header
+            boolean result = this.haWriter.write(this.socketChannel, this.byteBufferHeader);
+
+            if (!result) {
+                return false;
+            }
+
+            // Write Body
+            return this.haWriter.write(this.socketChannel, this.handShakeBuffer);
+        }
+
+        // Normal transfer method
+        private void buildTransferHeaderBuffer(long nextOffset, int bodySize) {
+            final EpochEntry entry = AutoSwitchHAConnection.this.epochCache.getEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+            if (entry == null) {
+                LOGGER.error("Failed to find epochEntry with epoch {} when build msg header", AutoSwitchHAConnection.this.currentTransferEpoch);
+                return;
+            }
+            // Build Header
+            this.byteBufferHeader.position(0);
+            this.byteBufferHeader.limit(MSG_HEADER_SIZE);
+            // State
+            this.byteBufferHeader.putInt(currentState.ordinal());
+            // Body size
+            this.byteBufferHeader.putInt(bodySize);
+            // Offset
+            this.byteBufferHeader.putLong(nextOffset);
+            // Epoch
+            this.byteBufferHeader.putInt(entry.getEpoch());
+            // EpochStartOffset
+            this.byteBufferHeader.putLong(entry.getStartOffset());
+            // Additional info (confirm offset)
+            final long confirmOffset = AutoSwitchHAConnection.this.haService.getConfirmOffset();
+            this.byteBufferHeader.putLong(confirmOffset);
+            this.byteBufferHeader.flip();
+            LOGGER.info("Master send msg, state:{}, size:{}, offset:{}, epoch:{}, epochStartOffset:{}, confirmOffset:{}",
+                currentState, bodySize, nextOffset, entry.getEpoch(), entry.getStartOffset(), confirmOffset);
+        }
+
+        private void transferToSlave() throws Exception {
+            if (this.lastWriteOver) {
+                long interval =
+                    haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
+
+                if (interval > haService.getDefaultMessageStore().getMessageStoreConfig()
+                    .getHaSendHeartbeatInterval()) {
+
+                    buildTransferHeaderBuffer(this.nextTransferFromWhere, 0);
+
+                    this.lastWriteOver = this.transferData(0);
+                    if (!this.lastWriteOver) {
+                        return;
+                    }
+                }
+            } else {
+                // maxTransferSize == -1 means continue transferring the remaining data.
+                this.lastWriteOver = this.transferData(-1);
+                if (!this.lastWriteOver) {
+                    return;
+                }
+            }
+
+            int size = this.getNextTransferDataSize();
+            if (size > 0) {
+                if (size > haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
+                    size = haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
+                }
+                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
+                if (size > canTransferMaxBytes) {
+                    if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
+                        LOGGER.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
+                            String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
+                            String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
+                        lastPrintTimestamp = System.currentTimeMillis();
+                    }
+                    size = canTransferMaxBytes;
+                }
+                if (size <= 0) {
+                    this.releaseData();
+                    this.waitForRunning(100);
+                    return;
+                }
+
+                // We must ensure that the transmitted logs are within the same epoch.
+                // If currentEpochEndOffset == -1, currentTransferEpoch is the last epoch, so its endOffset is effectively Long.MAX_VALUE.
+                final long currentEpochEndOffset = AutoSwitchHAConnection.this.currentTransferEpochEndOffset;
+                if (currentEpochEndOffset != -1 && this.nextTransferFromWhere + size > currentEpochEndOffset) {
+                    final EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.nextEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+                    if (epochEntry == null) {
+                        LOGGER.error("Can't find a bigger epochEntry than epoch {}", AutoSwitchHAConnection.this.currentTransferEpoch);
+                        waitForRunning(100);
+                        return;
+                    }
+                    size = (int) (currentEpochEndOffset - this.nextTransferFromWhere);
+                    changeTransferEpochToNext(epochEntry);
+                }
+
+                this.transferOffset = this.nextTransferFromWhere;
+                this.nextTransferFromWhere += size;
+
+                // Build Header
+                buildTransferHeaderBuffer(this.transferOffset, size);
+
+                this.lastWriteOver = this.transferData(size);
+            } else {
+                haService.getWaitNotifyObject().allWaitForRunning(100);
+            }
+        }
+
+        @Override public void run() {
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+
+                    switch (currentState) {
+                        case HANDSHAKE:
+                            // Wait until the slave sends its handshake msg to the master.
+                            if (!isSlaveSendHandshake) {
+                                this.waitForRunning(10);
+                                continue;
+                            }
+
+                            if (this.lastWriteOver) {
+                                if (!buildHandshakeBuffer()) {
+                                    LOGGER.error("AutoSwitchHAConnection build handshake buffer failed");
+                                    this.waitForRunning(5000);
+                                    continue;
+                                }
+                            }
+
+                            this.lastWriteOver = handshakeWithSlave();
+                            if (this.lastWriteOver) {
+                                // Change the flag to false and wait for the slave's next notification
+                                isSlaveSendHandshake = false;
+                            }
+                            break;
+                        case TRANSFER:
+                            if (-1 == slaveRequestOffset) {
+                                this.waitForRunning(10);
+                                continue;
+                            }
+
+                            if (-1 == this.nextTransferFromWhere) {
+                                if (0 == slaveRequestOffset) {
+                                    // We must ensure that the starting point of log syncing
+                                    // is the startOffset of a mapped file (either the last file or the file at minOffset)
+                                    final MessageStoreConfig config = haService.getDefaultMessageStore().getMessageStoreConfig();
+                                    if (AutoSwitchHAConnection.this.isSyncFromLastFile) {
+                                        long masterOffset = haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
+                                        masterOffset = masterOffset - (masterOffset % config.getMappedFileSizeCommitLog());
+                                        if (masterOffset < 0) {
+                                            masterOffset = 0;
+                                        }
+                                        this.nextTransferFromWhere = masterOffset;
+                                    } else {
+                                        this.nextTransferFromWhere = haService.getDefaultMessageStore().getCommitLog().getMinOffset();
+                                    }
+                                } else {
+                                    this.nextTransferFromWhere = slaveRequestOffset;
+                                }
+                                // Set up the initial transferEpoch
+                                EpochEntry epochEntry = AutoSwitchHAConnection.this.epochCache.findEpochEntryByOffset(this.nextTransferFromWhere);
+                                if (epochEntry == null) {
+                                    LOGGER.error("Failed to find an epochEntry to match slaveRequestOffset {}", this.nextTransferFromWhere);
+                                    waitForRunning(500);
+                                    break;
+                                }
+                                changeTransferEpochToNext(epochEntry);
+                                LOGGER.info("Master transfer data to slave {}, from offset:{}, currentEpoch:{}",
+                                    AutoSwitchHAConnection.this.clientAddress, this.nextTransferFromWhere, epochEntry);
+                            }
+                            transferToSlave();
+                            break;
+                        default:
+                            throw new Exception("unexpected state " + currentState);
+                    }
+                } catch (Exception e) {
+                    AutoSwitchHAConnection.LOGGER.error(this.getServiceName() + " service has exception.", e);
+                    break;
+                }
+            }
+
+            this.onStop();
+
+            changeCurrentState(HAConnectionState.SHUTDOWN);
+
+            this.makeStop();
+
+            readSocketService.makeStop();
+
+            haService.removeConnection(AutoSwitchHAConnection.this);
+
+            SelectionKey sk = this.socketChannel.keyFor(this.selector);
+            if (sk != null) {
+                sk.cancel();
+            }
+
+            try {
+                this.selector.close();
+                this.socketChannel.close();
+            } catch (IOException e) {
+                AutoSwitchHAConnection.LOGGER.error("", e);
+            }
+            AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
+        }
+
+        abstract protected int getNextTransferDataSize();
+
+        abstract protected void releaseData();
+
+        abstract protected boolean transferData(int maxTransferSize) throws Exception;
+
+        abstract protected void onStop();
+    }
+}
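
For reference, the message header written by buildTransferHeaderBuffer (and the handshake header, which uses the same layout) packs six fixed-width fields in order: state (int), bodySize (int), offset (long), epoch (int), epochStartOffset (long) and confirmOffset (long), 36 bytes in total, which should correspond to the MSG_HEADER_SIZE constant referenced above. A minimal decoding sketch for the receiving side follows; the class name and printout are illustrative only and are not part of this patch.

    import java.nio.ByteBuffer;

    // Illustrative sketch only: mirrors the field order written by buildTransferHeaderBuffer.
    final class TransferHeaderSketch {
        static final int HEADER_SIZE = 4 + 4 + 8 + 4 + 8 + 8; // 36 bytes

        static void decode(ByteBuffer buffer, int readPosition) {
            int state = buffer.getInt(readPosition);                   // HAConnectionState ordinal
            int bodySize = buffer.getInt(readPosition + 4);            // size of the commitlog chunk that follows
            long offset = buffer.getLong(readPosition + 8);            // start offset of the chunk
            int epoch = buffer.getInt(readPosition + 16);              // epoch the chunk belongs to
            long epochStartOffset = buffer.getLong(readPosition + 20); // start offset of that epoch
            long confirmOffset = buffer.getLong(readPosition + 28);    // master's current confirm offset
            System.out.printf("state=%d bodySize=%d offset=%d epoch=%d epochStart=%d confirm=%d%n",
                state, bodySize, offset, epoch, epochStartOffset, confirmOffset);
        }
    }
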
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
new file mode 100644
index 000000000..4ccddf215
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.ha.DefaultHAService;
+import org.apache.rocketmq.store.ha.GroupTransferService;
+import org.apache.rocketmq.store.ha.HAConnection;
+import org.apache.rocketmq.store.ha.HAConnectionStateNotificationService;
+
+/**
+ * Switchable HA service that supports switching the broker role between master and slave.
+ */
+public class AutoSwitchHAService extends DefaultHAService {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private EpochFileCache epochCache;
+    private AutoSwitchHAClient haClient;
+
+    public AutoSwitchHAService() {
+    }
+
+    @Override
+    public void init(final DefaultMessageStore defaultMessageStore) throws IOException {
+        this.epochCache = new EpochFileCache(defaultMessageStore.getMessageStoreConfig().getStorePathEpochFile());
+        this.epochCache.initCacheFromFile();
+        this.defaultMessageStore = defaultMessageStore;
+        this.acceptSocketService =
+            new AutoSwitchAcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+        this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
+        if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+            this.haClient = new AutoSwitchHAClient(this, defaultMessageStore, this.epochCache);
+        }
+        this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
+    }
+
+    @Override
+    public boolean changeToMaster(int masterEpoch) {
+        final int lastEpoch = this.epochCache.lastEpoch();
+        if (masterEpoch <= lastEpoch) {
+            return false;
+        }
+        destroyConnections();
+        // Stop ha client if needed
+        if (this.haClient != null) {
+            this.haClient.shutdown();
+        }
+
+        // Truncate dirty file
+        final long truncateOffset = truncateInvalidMsg();
+        if (truncateOffset >= 0) {
+            this.epochCache.truncateSuffixByOffset(truncateOffset);
+        }
+
+        // Append new epoch to epochFile
+        final EpochEntry newEpochEntry = new EpochEntry(masterEpoch, this.defaultMessageStore.getMaxPhyOffset());
+        if (this.epochCache.lastEpoch() >= masterEpoch) {
+            this.epochCache.truncateSuffixByEpoch(masterEpoch);
+        }
+        this.epochCache.appendEntry(newEpochEntry);
+
+        this.defaultMessageStore.recoverTopicQueueTable();
+        LOGGER.info("Change ha to master success, newMasterEpoch:{}, startOffset:{}", masterEpoch, newEpochEntry.getStartOffset());
+        return true;
+    }
+
+    @Override
+    public boolean changeToSlave(String newMasterAddr, String newHaMasterAddr, int newMasterEpoch) {
+        final int lastEpoch = this.epochCache.lastEpoch();
+        if (newMasterEpoch <= lastEpoch) {
+            return false;
+        }
+        try {
+            destroyConnections();
+            if (this.haClient == null) {
+                this.haClient = new AutoSwitchHAClient(this, defaultMessageStore, this.epochCache);
+            } else {
+                this.haClient.reOpen();
+            }
+            this.haClient.updateHaMasterAddress(newHaMasterAddr);
+            this.haClient.updateMasterAddress(newMasterAddr);
+            this.haClient.start();
+            LOGGER.info("Change ha to slave success, newMasterAddress:{}, newMasterEpoch:{}", newMasterAddr, newMasterEpoch);
+            return true;
+        } catch (final Exception e) {
+            LOGGER.error("Error happen when change ha to slave", e);
+            return false;
+        }
+    }
+
+    @Override
+    public Set<String> checkSyncStateSetChanged() {
+        final HashSet<String> newSyncStateSet = new HashSet<>(this.connectionList.size());
+        final long masterOffset = this.defaultMessageStore.getMaxPhyOffset();
+        for (HAConnection connection : this.connectionList) {
+            if (isInSyncSlave(masterOffset, connection)) {
+                newSyncStateSet.add(connection.getClientAddress());
+            }
+        }
+        return newSyncStateSet;
+    }
+
+    public void truncateEpochFilePrefix(final long offset) {
+        this.epochCache.truncatePrefixByOffset(offset);
+    }
+
+    public void truncateEpochFileSuffix(final long offset) {
+        this.epochCache.truncateSuffixByOffset(offset);
+    }
+
+    /**
+     * Try to truncate incomplete messages transferred from the master.
+     */
+    public long truncateInvalidMsg() {
+        long dispatchBehind = this.defaultMessageStore.dispatchBehindBytes();
+        if (dispatchBehind <= 0) {
+            LOGGER.info("Dispatch complete, skip truncate");
+            return -1;
+        }
+
+        long reputFromOffset = this.defaultMessageStore.getMaxPhyOffset() - dispatchBehind;
+
+        boolean doNext = true;
+        while (reputFromOffset < this.defaultMessageStore.getMaxPhyOffset() && doNext) {
+            SelectMappedBufferResult result = this.defaultMessageStore.getCommitLog().getData(reputFromOffset);
+            if (result == null) {
+                break;
+            }
+
+            try {
+                reputFromOffset = result.getStartOffset();
+
+                int readSize = 0;
+                while (readSize < result.getSize()) {
+                    DispatchRequest dispatchRequest = this.defaultMessageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false);
+
+                    int size = dispatchRequest.getMsgSize();
+
+                    if (dispatchRequest.isSuccess()) {
+                        if (size > 0) {
+                            reputFromOffset += size;
+                            readSize += size;
+                        } else {
+                            reputFromOffset = this.defaultMessageStore.getCommitLog().rollNextFile(reputFromOffset);
+                            break;
+                        }
+                    } else {
+                        doNext = false;
+                        break;
+                    }
+                }
+            } finally {
+                result.release();
+            }
+        }
+
+        LOGGER.info("AutoRecoverHAClient truncate commitLog to {}", reputFromOffset);
+        this.defaultMessageStore.truncateDirtyFiles(reputFromOffset);
+        return reputFromOffset;
+    }
+
+    /**
+     * Get the confirm offset (the minimum slaveAckOffset across all slave connections).
+     */
+    public long getConfirmOffset() {
+        long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
+        for (HAConnection connection : this.connectionList) {
+            confirmOffset = Math.min(confirmOffset, connection.getSlaveAckOffset());
+        }
+        return confirmOffset;
+    }
+
+    class AutoSwitchAcceptSocketService extends AcceptSocketService {
+
+        public AutoSwitchAcceptSocketService(int port) {
+            super(port);
+        }
+
+        @Override public String getServiceName() {
+            if (defaultMessageStore.getBrokerConfig().isInBrokerContainer()) {
+                return defaultMessageStore.getBrokerConfig().getLoggerIdentifier() + AcceptSocketService.class.getSimpleName();
+            }
+            return AutoSwitchAcceptSocketService.class.getSimpleName();
+        }
+
+        @Override protected HAConnection createConnection(SocketChannel sc) throws IOException {
+            return new AutoSwitchHAConnection(AutoSwitchHAService.this, sc, AutoSwitchHAService.this.epochCache);
+        }
+    }
+}
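
The role-switch entry points of the service above are changeToMaster(int masterEpoch) and changeToSlave(String newMasterAddr, String newHaMasterAddr, int newMasterEpoch); both reject an epoch that is not strictly larger than the last recorded epoch. A minimal sketch of how a caller might drive them; the surrounding method is hypothetical and only illustrates the contract.

    // Illustrative sketch only; how the caller obtains these parameters is out of scope here.
    static boolean applyRoleChange(AutoSwitchHAService haService, boolean becomeMaster,
        String masterAddr, String masterHaAddr, int newEpoch) {
        if (becomeMaster) {
            // Truncates dirty data, appends a new epoch entry and rebuilds the topic queue table.
            return haService.changeToMaster(newEpoch);
        }
        // (Re)starts the AutoSwitchHAClient against the new master's HA address.
        return haService.changeToSlave(masterAddr, masterHaAddr, newEpoch);
    }
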
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java
new file mode 100644
index 000000000..f2b8b7d85
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/EpochFileCache.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.CheckpointFile;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+/**
+ * Cache for epochFile.
+ * Mapping (Epoch -> StartOffset)
+ */
+public class EpochFileCache {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private final Lock readLock = this.readWriteLock.readLock();
+    private final Lock writeLock = this.readWriteLock.writeLock();
+    private final TreeMap<Integer, EpochEntry> epochMap;
+    private CheckpointFile<EpochEntry> checkpoint;
+
+    public EpochFileCache() {
+        this.epochMap = new TreeMap<>();
+    }
+
+    public EpochFileCache(final String path) {
+        this.epochMap = new TreeMap<>();
+        this.checkpoint = new CheckpointFile<>(path, new EpochEntrySerializer());
+    }
+
+    public boolean initCacheFromFile() {
+        this.writeLock.lock();
+        try {
+            final List<EpochEntry> entries = this.checkpoint.read();
+            initEntries(entries);
+            return true;
+        } catch (final IOException e) {
+            log.error("Error happen when init epoch entries from epochFile", e);
+            return false;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public void initCacheFromEntries(final List<EpochEntry> entries) {
+        this.writeLock.lock();
+        try {
+            initEntries(entries);
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void initEntries(final List<EpochEntry> entries) {
+        this.epochMap.clear();
+        EpochEntry preEntry = null;
+        for (final EpochEntry entry : entries) {
+            this.epochMap.put(entry.getEpoch(), entry);
+            if (preEntry != null) {
+                preEntry.setEndOffset(entry.getStartOffset());
+            }
+            preEntry = entry;
+        }
+    }
+
+    public int getEntrySize() {
+        this.readLock.lock();
+        try {
+            return this.epochMap.size();
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public boolean appendEntry(final EpochEntry entry) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = this.epochMap.lastEntry().getValue();
+                if (lastEntry.getEpoch() >= entry.getEpoch() || lastEntry.getStartOffset() >= entry.getStartOffset()) {
+                    log.error("The appending entry's lastEpoch or endOffset {} is not bigger than lastEntry {}, append failed", entry, lastEntry);
+                    return false;
+                }
+                lastEntry.setEndOffset(entry.getStartOffset());
+            }
+            this.epochMap.put(entry.getEpoch(), new EpochEntry(entry));
+            flush();
+            return true;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Set endOffset for lastEpochEntry.
+     */
+    public void setLastEpochEntryEndOffset(final long endOffset) {
+        this.writeLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                final EpochEntry lastEntry = this.epochMap.lastEntry().getValue();
+                if (lastEntry.getStartOffset() <= endOffset) {
+                    lastEntry.setEndOffset(endOffset);
+                }
+            }
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public EpochEntry firstEntry() {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.isEmpty()) {
+                return null;
+            }
+            return new EpochEntry(this.epochMap.firstEntry().getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry lastEntry() {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.isEmpty()) {
+                return null;
+            }
+            return new EpochEntry(this.epochMap.lastEntry().getValue());
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public int lastEpoch() {
+        final EpochEntry entry = lastEntry();
+        if (entry != null) {
+            return entry.getEpoch();
+        }
+        return -1;
+    }
+
+    public EpochEntry getEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            if (this.epochMap.containsKey(epoch)) {
+                final EpochEntry entry = this.epochMap.get(epoch);
+                return new EpochEntry(entry);
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry findEpochEntryByOffset(final long offset) {
+        this.readLock.lock();
+        try {
+            if (!this.epochMap.isEmpty()) {
+                for (Map.Entry<Integer, EpochEntry> entry : this.epochMap.entrySet()) {
+                    if (entry.getValue().getStartOffset() <= offset && entry.getValue().getEndOffset() > offset) {
+                        return new EpochEntry(entry.getValue());
+                    }
+                }
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public EpochEntry nextEntry(final int epoch) {
+        this.readLock.lock();
+        try {
+            final Map.Entry<Integer, EpochEntry> entry = this.epochMap.ceilingEntry(epoch + 1);
+            if (entry != null) {
+                return new EpochEntry(entry.getValue());
+            }
+            return null;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public List<EpochEntry> getAllEntries() {
+        this.readLock.lock();
+        try {
+            final ArrayList<EpochEntry> result = new ArrayList<>(this.epochMap.size());
+            this.epochMap.forEach((key, value) -> result.add(new EpochEntry(value)));
+            return result;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Find the consistentPoint between compareCache and local.
+     *
+     * @return the consistent offset
+     */
+    public long findConsistentPoint(final EpochFileCache compareCache) {
+        this.readLock.lock();
+        try {
+            long consistentOffset = -1;
+            final Map<Integer, EpochEntry> descendingMap = new TreeMap<>(this.epochMap).descendingMap();
+            final Iterator<Map.Entry<Integer, EpochEntry>> iter = descendingMap.entrySet().iterator();
+            while (iter.hasNext()) {
+                final Map.Entry<Integer, EpochEntry> curLocalEntry = iter.next();
+                final EpochEntry compareEntry = compareCache.getEntry(curLocalEntry.getKey());
+                if (compareEntry != null && compareEntry.getStartOffset() == curLocalEntry.getValue().getStartOffset()) {
+                    consistentOffset = Math.min(curLocalEntry.getValue().getEndOffset(), compareEntry.getEndOffset());
+                    break;
+                }
+            }
+            return consistentOffset;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Remove epochEntries with epoch >= truncateEpoch.
+     */
+    public void truncateSuffixByEpoch(final int truncateEpoch) {
+        Predicate<EpochEntry> predict = (entry) -> entry.getEpoch() >= truncateEpoch;
+        doTruncateSuffix(predict);
+    }
+
+    /**
+     * Remove epochEntries with startOffset >= truncateOffset.
+     */
+    public void truncateSuffixByOffset(final long truncateOffset) {
+        Predicate<EpochEntry> predict = (entry) -> entry.getStartOffset() >= truncateOffset;
+        doTruncateSuffix(predict);
+    }
+
+    private void doTruncateSuffix(Predicate<EpochEntry> predict) {
+        this.writeLock.lock();
+        try {
+            this.epochMap.entrySet().removeIf(entry -> predict.test(entry.getValue()));
+            final EpochEntry entry = lastEntry();
+            if (entry != null) {
+                entry.setEndOffset(Long.MAX_VALUE);
+            }
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Remove epochEntries with endOffset <= truncateOffset.
+     */
+    public void truncatePrefixByOffset(final long truncateOffset) {
+        Predicate<EpochEntry> predict = (entry) -> entry.getEndOffset() <= truncateOffset;
+        this.writeLock.lock();
+        try {
+            this.epochMap.entrySet().removeIf(entry -> predict.test(entry.getValue()));
+            flush();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void flush() {
+        this.writeLock.lock();
+        try {
+            if (this.checkpoint != null) {
+                final ArrayList<EpochEntry> entries = new ArrayList<>(this.epochMap.values());
+                this.checkpoint.write(entries);
+            }
+        } catch (final IOException e) {
+            log.error("Error happen when flush epochEntries to epochCheckpointFile", e);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    static class EpochEntrySerializer implements CheckpointFile.CheckpointSerializer<EpochEntry> {
+
+        @Override
+        public String toLine(EpochEntry entry) {
+            if (entry != null) {
+                return String.format("%d-%d", entry.getEpoch(), entry.getStartOffset());
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public EpochEntry fromLine(String line) {
+            final String[] arr = line.split("-");
+            if (arr.length == 2) {
+                final int epoch = Integer.parseInt(arr[0]);
+                final long startOffset = Long.parseLong(arr[1]);
+                return new EpochEntry(epoch, startOffset);
+            }
+            return null;
+        }
+    }
+}
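
The EpochEntrySerializer above persists one entry per line in the form "epoch-startOffset" (for example "2-300"); endOffset is never written, because it is recomputed from the next entry's startOffset when the file is re-read (initEntries). A tiny sketch of the round-trip this implies; the class wrapper is illustrative only.

    // Illustrative sketch only: mirrors the checkpoint line format used by EpochEntrySerializer.
    public class EpochLineFormatSketch {
        public static void main(String[] args) {
            String line = String.format("%d-%d", 2, 300L);          // what toLine() produces -> "2-300"
            String[] parts = line.split("-");
            System.out.println("epoch=" + Integer.parseInt(parts[0])      // 2
                + ", startOffset=" + Long.parseLong(parts[1]));            // 300
        }
    }
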
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/io/AbstractHAReader.java b/store/src/main/java/org/apache/rocketmq/store/ha/io/AbstractHAReader.java
new file mode 100644
index 000000000..105295076
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/io/AbstractHAReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public abstract class AbstractHAReader {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    protected final List<HAReadHook> readHookList = new ArrayList<>();
+
+    public boolean read(SocketChannel socketChannel, ByteBuffer byteBufferRead) {
+        int readSizeZeroTimes = 0;
+        while (byteBufferRead.hasRemaining()) {
+            try {
+                int readSize = socketChannel.read(byteBufferRead);
+                for (HAReadHook readHook : readHookList) {
+                    readHook.afterRead(readSize);
+                }
+                if (readSize > 0) {
+                    readSizeZeroTimes = 0;
+                    boolean result = processReadResult(byteBufferRead);
+                    if (!result) {
+                        LOGGER.error("Process read result failed");
+                        return false;
+                    }
+                } else if (readSize == 0) {
+                    if (++readSizeZeroTimes >= 3) {
+                        break;
+                    }
+                } else {
+                    LOGGER.info("Read socket < 0");
+                    return false;
+                }
+            } catch (IOException e) {
+                LOGGER.info("Read socket exception", e);
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public void registerHook(HAReadHook readHook) {
+        readHookList.add(readHook);
+    }
+
+    public void clearHook() {
+        readHookList.clear();
+    }
+
+    /**
+     * Process read result.
+     *
+     * @param byteBufferRead read result
+     * @return true if process succeed, false otherwise
+     */
+    protected abstract boolean processReadResult(ByteBuffer byteBufferRead);
+}
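
Concrete readers only implement processReadResult; the template method read() loops until the channel would block, returning false on a broken channel or a failed process step. A minimal, self-contained sketch of a subclass that consumes hypothetical 4-byte length-prefixed frames; the framing here is illustrative, not the actual HA wire format.

    import java.nio.ByteBuffer;
    import org.apache.rocketmq.store.ha.io.AbstractHAReader;

    // Illustrative sketch only.
    public class LengthPrefixedReaderSketch extends AbstractHAReader {
        private int processPosition = 0;

        public LengthPrefixedReaderSketch() {
            // The hook fires after every socket read with the number of bytes read (may be 0 or -1).
            registerHook(readSize -> {
                if (readSize > 0) {
                    // e.g. refresh a lastReadTimestamp here, as the HA services do
                }
            });
        }

        @Override
        protected boolean processReadResult(ByteBuffer byteBufferRead) {
            while (byteBufferRead.position() - processPosition >= 4) {
                int frameSize = byteBufferRead.getInt(processPosition);
                if (byteBufferRead.position() - processPosition < 4 + frameSize) {
                    break; // frame not fully buffered yet, wait for more bytes
                }
                processPosition += 4 + frameSize; // consume one complete frame
            }
            return true;
        }
    }
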
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/io/HAReadHook.java b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAReadHook.java
new file mode 100644
index 000000000..4efcadd55
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAReadHook.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.io;
+
+public interface HAReadHook {
+    void afterRead(int readSize);
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriteHook.java b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriteHook.java
new file mode 100644
index 000000000..959432888
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriteHook.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.io;
+
+public interface HAWriteHook {
+    void afterWrite(int writeSize);
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java
new file mode 100644
index 000000000..21daaa6f7
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/io/HAWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class HAWriter {
+    private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    protected final List<HAWriteHook> writeHookList = new ArrayList<>();
+
+    public boolean write(SocketChannel socketChannel, ByteBuffer byteBufferWrite) {
+        int writeSizeZeroTimes = 0;
+        while (byteBufferWrite.hasRemaining()) {
+            try {
+                int writeSize = socketChannel.write(byteBufferWrite);
+                for (HAWriteHook writeHook : writeHookList) {
+                    writeHook.afterWrite(writeSize);
+                }
+                if (writeSize > 0) {
+                    writeSizeZeroTimes = 0;
+                } else if (writeSize == 0) {
+                    if (++writeSizeZeroTimes >= 3) {
+                        break;
+                    }
+                } else {
+                    LOGGER.info("Write socket < 0");
+                }
+            } catch (IOException e) {
+                LOGGER.info("Write socket exception", e);
+                return false;
+            }
+
+        }
+
+        return !byteBufferWrite.hasRemaining();
+    }
+
+    public void registerHook(HAWriteHook writeHook) {
+        writeHookList.add(writeHook);
+    }
+
+    public void clearHook() {
+        writeHookList.clear();
+    }
+}
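
HAWriter is the write-side counterpart; AutoSwitchHAConnection registers a hook on it to feed the flow monitor and refresh the last-write timestamp. A minimal sketch of driving it directly, assuming an already-connected, non-blocking SocketChannel; the wrapper class and payload are illustrative only.

    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import org.apache.rocketmq.store.ha.io.HAWriter;

    // Illustrative sketch only.
    public class HAWriterUsageSketch {
        static boolean sendLong(SocketChannel channel, long value) {
            HAWriter writer = new HAWriter();
            writer.registerHook(writeSize -> {
                // e.g. account the transferred bytes here, as the flow-monitor hook does
            });
            ByteBuffer buffer = ByteBuffer.allocate(8);
            buffer.putLong(value);
            buffer.flip();
            // Returns false if the buffer could not be fully written (socket backlog or IO error).
            return writer.write(channel, buffer);
        }
    }
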
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java b/store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java
new file mode 100644
index 000000000..aadf683a2
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/EpochFileCacheTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.ha;
+
+import java.io.File;
+import java.nio.file.Paths;
+import org.apache.rocketmq.common.EpochEntry;
+import org.apache.rocketmq.store.ha.autoswitch.EpochFileCache;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class EpochFileCacheTest {
+    private EpochFileCache epochCache;
+    private EpochFileCache epochCache2;
+    private String path;
+    private String path2;
+
+    @Before
+    public void setup() {
+        this.path = Paths.get(File.separator + "tmp", "EpochCheckpoint").toString();
+        this.epochCache = new EpochFileCache(path);
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache.appendEntry(new EpochEntry(3, 500)));
+        final EpochEntry entry = this.epochCache.getEntry(2);
+        assertEquals(entry.getEpoch(), 2);
+        assertEquals(entry.getStartOffset(), 300);
+        assertEquals(entry.getEndOffset(), 500);
+    }
+
+    @After
+    public void shutdown() {
+        new File(this.path).delete();
+        if (this.path2 != null) {
+            new File(this.path2).delete();
+        }
+    }
+
+    @Test
+    public void testInitFromFile() {
+        // Remove entries, init from file
+        assertTrue(this.epochCache.initCacheFromFile());
+        final EpochEntry entry = this.epochCache.getEntry(2);
+        assertEquals(entry.getEpoch(), 2);
+        assertEquals(entry.getStartOffset(), 300);
+        assertEquals(entry.getEndOffset(), 500);
+    }
+
+    @Test
+    public void testTruncate() {
+        this.epochCache.truncateSuffixByOffset(150);
+        assertNotNull(this.epochCache.getEntry(1));
+        assertNull(this.epochCache.getEntry(2));
+    }
+
+    @Test
+    public void testFindEpochEntryByOffset() {
+        final EpochEntry entry = this.epochCache.findEpochEntryByOffset(350);
+        assertEquals(entry.getEpoch(), 2);
+        assertEquals(entry.getStartOffset(), 300);
+        assertEquals(entry.getEndOffset(), 500);
+    }
+
+    @Test
+    public void testFindConsistentPointSample1() {
+        this.path2 = Paths.get(File.separator + "tmp", "EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(3, 450)));
+        /**
+         *  cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500>
+         *  cache2: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 450>
+         *  The consistent point should be 450
+         */
+        final long consistentPoint = this.epochCache.findConsistentPoint(this.epochCache2);
+        assertEquals(consistentPoint, 450);
+    }
+
+    @Test
+    public void testFindConsistentPointSample2() {
+        this.path2 = Paths.get(File.separator + "tmp", "EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(3, 500)));
+        /**
+         *  cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500, 700>
+         *  cache2: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500, 600>
+         *  The consistent point should be 600
+         */
+        this.epochCache.setLastEpochEntryEndOffset(700);
+        this.epochCache2.setLastEpochEntryEndOffset(600);
+        final long consistentPoint = this.epochCache.findConsistentPoint(this.epochCache2);
+        assertEquals(consistentPoint, 600);
+    }
+
+    @Test
+    public void testFindConsistentPointSample3() {
+        this.path2 = Paths.get(File.separator + "tmp", "EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 200)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 500)));
+        /**
+         *  cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500, 700>
+         *  cache2: <Epoch1, 200>, <Epoch2, 500>
+         *  The consistent point should be -1
+         */
+        final long consistentPoint = this.epochCache.findConsistentPoint(this.epochCache2);
+        assertEquals(consistentPoint, -1);
+    }
+
+    @Test
+    public void testFindConsistentPointSample4() {
+        this.path2 = Paths.get(File.separator + "tmp", "EpochCheckpoint2").toString();
+        this.epochCache2 = new EpochFileCache(path2);
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(1, 100)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(2, 300)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(3, 500)));
+        assertTrue(this.epochCache2.appendEntry(new EpochEntry(4, 800)));
+        /**
+         *  cache1: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500, 700>
+         *  cache2: <Epoch1, 100>, <Epoch2, 300>, <Epoch3, 500>, <Epoch4, 800>
+         *  The consistent point should be 700
+         */
+        this.epochCache.setLastEpochEntryEndOffset(700);
+        final long consistentPoint = this.epochCache2.findConsistentPoint(this.epochCache);
+        assertEquals(consistentPoint, 700);
+    }
+}
\ No newline at end of file
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
new file mode 100644
index 000000000..1f6360295
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.ha.autoswitch;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MappedFileQueue;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AutoSwitchHATest {
+    private final String StoreMessage = "Once, there was a chance for me!";
+    private final int defaultMappedFileSize = 1024 * 1024;
+    private int QUEUE_TOTAL = 100;
+    private AtomicInteger QueueId = new AtomicInteger(0);
+    private SocketAddress BornHost;
+    private SocketAddress StoreHost;
+    private byte[] MessageBody;
+
+    private DefaultMessageStore messageStore1;
+    private DefaultMessageStore messageStore2;
+    private DefaultMessageStore messageStore3;
+    private MessageStoreConfig storeConfig1;
+    private MessageStoreConfig storeConfig2;
+    private MessageStoreConfig storeConfig3;
+    private String store1HaAddress;
+    private String store2HaAddress;
+    private String store3HaAddress;
+
+    private BrokerStatsManager brokerStatsManager = new BrokerStatsManager("simpleTest", true);
+    private String storePathRootParentDir = System.getProperty("user.home") + File.separator +
+        UUID.randomUUID().toString().replace("-", "");
+    private String storePathRootDir = storePathRootParentDir + File.separator + "store";
+
+    public void init(int mappedFileSize) throws Exception {
+        QUEUE_TOTAL = 1;
+        MessageBody = StoreMessage.getBytes();
+        StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+        storeConfig1 = new MessageStoreConfig();
+        storeConfig1.setBrokerRole(BrokerRole.SYNC_MASTER);
+        storeConfig1.setStorePathRootDir(storePathRootDir + File.separator + "broker1");
+        storeConfig1.setStorePathCommitLog(storePathRootDir + File.separator + "broker1" + File.separator + "commitlog");
+        storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + "broker1" + File.separator + "EpochFileCache");
+        storeConfig1.setTotalReplicas(3);
+        storeConfig1.setInSyncReplicas(2);
+        storeConfig1.setStartupControllerMode(true);
+        buildMessageStoreConfig(storeConfig1, mappedFileSize);
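+        // broker1 keeps the default HA listen port (10912), which matches store1HaAddress; brokers 2 and 3 set their own ports below.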
+        this.store1HaAddress = "127.0.0.1:10912";
+
+        storeConfig2 = new MessageStoreConfig();
+        storeConfig2.setBrokerRole(BrokerRole.SLAVE);
+        storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + "broker2");
+        storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + "broker2" + File.separator + "commitlog");
+        storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + "broker2" + File.separator + "EpochFileCache");
+        storeConfig2.setHaListenPort(10943);
+        storeConfig2.setTotalReplicas(3);
+        storeConfig2.setInSyncReplicas(2);
+        storeConfig2.setStartupControllerMode(true);
+        buildMessageStoreConfig(storeConfig2, mappedFileSize);
+        this.store2HaAddress = "127.0.0.1:10943";
+
+        messageStore1 = buildMessageStore(storeConfig1, 0L);
+        messageStore2 = buildMessageStore(storeConfig2, 1L);
+
+        storeConfig3 = new MessageStoreConfig();
+        storeConfig3.setBrokerRole(BrokerRole.SLAVE);
+        storeConfig3.setStorePathRootDir(storePathRootDir + File.separator + "broker3");
+        storeConfig3.setStorePathCommitLog(storePathRootDir + File.separator + "broker3" + File.separator + "commitlog");
+        storeConfig3.setStorePathEpochFile(storePathRootDir + File.separator + "broker3" + File.separator + "EpochFileCache");
+        storeConfig3.setHaListenPort(10980);
+        storeConfig3.setTotalReplicas(3);
+        storeConfig3.setInSyncReplicas(2);
+        storeConfig3.setStartupControllerMode(true);
+        buildMessageStoreConfig(storeConfig3, mappedFileSize);
+        messageStore3 = buildMessageStore(storeConfig3, 3L);
+        this.store3HaAddress = "127.0.0.1:10980";
+
+        assertTrue(messageStore1.load());
+        assertTrue(messageStore2.load());
+        assertTrue(messageStore3.load());
+        messageStore1.start();
+        messageStore2.start();
+        messageStore3.start();
+    }
+
+    private void changeMasterAndPutMessage(DefaultMessageStore master, MessageStoreConfig masterConfig,
+        DefaultMessageStore slave, MessageStoreConfig slaveConfig, int epoch, String masterHaAddress,
+        int totalPutMessageNums) throws Exception {
+
+        // Change role
+        slaveConfig.setBrokerRole(BrokerRole.SLAVE);
+        masterConfig.setBrokerRole(BrokerRole.SYNC_MASTER);
+        slave.getHaService().changeToSlave("", masterHaAddress, epoch);
+        master.getHaService().changeToMaster(epoch);
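+        // The sleep below gives the slave's HA client time to finish the handshake with the new master and catch up before messages are written.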
+        Thread.sleep(6000);
+
+        // Put message on master
+        for (int i = 0; i < totalPutMessageNums; i++) {
+            master.putMessage(buildMessage());
+        }
+        Thread.sleep(200);
+    }
+
+    private void checkMessage(final DefaultMessageStore messageStore, int totalMsgs, int startOffset) {
+        for (long i = 0; i < totalMsgs; i++) {
+            GetMessageResult result = messageStore.getMessage("GROUP_A", "FooBar", 0, startOffset + i, 1024 * 1024, null);
+            assertThat(result).isNotNull();
+            if (!GetMessageStatus.FOUND.equals(result.getStatus())) {
+                System.out.println("Failed i :" + i);
+            }
+            assertTrue(GetMessageStatus.FOUND.equals(result.getStatus()));
+            result.release();
+        }
+    }
+
+    @Test
+    public void testChangeRoleManyTimes() throws Exception {
+        // Step1, change store1 to master, store2 to follower
+        init(defaultMappedFileSize);
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+
+        // Step2, change store1 to follower, store2 to master, epoch = 2
+        changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, this.messageStore1, this.storeConfig1, 2, store2HaAddress, 10);
+        checkMessage(this.messageStore1, 20, 0);
+
+        // Step3, change store2 to follower, store1 to master, epoch = 3
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 3, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 30, 0);
+    }
+
+    @Test
+    public void testAddBroker() throws Exception {
+        // Step1: broker1 as leader, broker2 as follower
+        init(defaultMappedFileSize);
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+
+        // Step2: add new broker3 and link it to broker1
+        messageStore3.getHaService().changeToSlave("", "127.0.0.1:10912", 1);
+        Thread.sleep(6000);
+        checkMessage(messageStore3, 10, 0);
+    }
+
+    @Test
+    public void testTruncateEpochLogAndAddBroker() throws Exception {
+        // Note that 10 messages total 1570 bytes, so with mappedFileSize = 1700 each mapped file can hold only 10 messages.
+        init(1700);
+
+        // Step1: broker1 as leader, broker2 as follower; append 2 epochs, each stored in its own file (fileSize = 1700 holds only 10 msgs).
+        // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
+
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 2, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 20, 0);
+
+        // Step2: check file layout. Each epoch is stored in its own file (10 msgs total 1570 bytes, just under the 1700-byte file size),
+        // so epoch1 sits in the first file, epoch2 in the second, and the tail of each file stays blank.
+        final MappedFileQueue fileQueue = this.messageStore1.getCommitLog().getMappedFileQueue();
+        assertEquals(2, fileQueue.getTotalFileSize() / 1700);
+
+        // Step3: truncate epoch1's log (truncateEndOffset = 1570), which means the first file can be deleted directly.
+        final MappedFile firstFile = this.messageStore1.getCommitLog().getMappedFileQueue().getFirstMappedFile();
+        firstFile.shutdown(1000);
+        fileQueue.retryDeleteFirstFile(1000);
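+        // Deleting the first mapped file moves the commit log's minimum offset to the start of the second file (1700).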
+        assertEquals(this.messageStore1.getCommitLog().getMinOffset(), 1700);
+        checkMessage(this.messageStore1, 10, 10);
+
+        final AutoSwitchHAService haService = (AutoSwitchHAService) this.messageStore1.getHaService();
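+        // Trim the epoch file as well, so its entries stay consistent with the commit log after the first file was deleted.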
+        haService.truncateEpochFilePrefix(1570);
+
+        // Step4: add broker3 as slave; it should only receive the 10 msgs starting from logical offset 10.
+        messageStore3.getHaService().changeToSlave("", store1HaAddress, 2);
+        Thread.sleep(6000);
+
+        checkMessage(messageStore3, 10, 10);
+    }
+
+    @Test
+    public void testTruncateEpochLogAndChangeMaster() throws Exception {
+        // Note that 10 messages total 1570 bytes, so with mappedFileSize = 1700 each mapped file can hold only 10 messages.
+        init(1700);
+
+        // Step1: broker1 as leader, broker2 as follower; append 2 epochs, each stored in its own file (fileSize = 1700 holds only 10 msgs).
+        // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
+
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 2, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 20, 0);
+
+        // Step2: check file layout. Each epoch is stored in its own file (10 msgs total 1570 bytes, just under the 1700-byte file size),
+        // so epoch1 sits in the first file, epoch2 in the second, and the tail of each file stays blank.
+        final MappedFileQueue fileQueue = this.messageStore1.getCommitLog().getMappedFileQueue();
+        assertEquals(2, fileQueue.getTotalFileSize() / 1700);
+
+        // Step3: truncate epoch1's log (truncateEndOffset = 1570), which means the first file can be deleted directly.
+        final MappedFile firstFile = this.messageStore1.getCommitLog().getMappedFileQueue().getFirstMappedFile();
+        firstFile.shutdown(1000);
+        fileQueue.retryDeleteFirstFile(1000);
+        assertEquals(this.messageStore1.getCommitLog().getMinOffset(), 1700);
+
+        final AutoSwitchHAService haService = (AutoSwitchHAService) this.messageStore1.getHaService();
+        haService.truncateEpochFilePrefix(1570);
+        checkMessage(this.messageStore1, 10, 10);
+
+        // Step4: add broker3 as slave
+        messageStore3.getHaService().changeToSlave("", store1HaAddress, 2);
+        Thread.sleep(6000);
+        checkMessage(messageStore3, 10, 10);
+
+        // Step5: make broker2 the leader and broker3 the follower
+        changeMasterAndPutMessage(this.messageStore2, this.storeConfig2, this.messageStore3, this.storeConfig3, 3, this.store2HaAddress, 10);
+        checkMessage(messageStore3, 20, 10);
+
+        // Step6: let broker1 connect to broker2; it should sync the log starting from epoch3.
+        this.storeConfig1.setBrokerRole(BrokerRole.SLAVE);
+        this.messageStore1.getHaService().changeToSlave("", this.store2HaAddress, 3);
+        Thread.sleep(6000);
+        checkMessage(messageStore1, 20, 0);
+    }
+
+    @Test
+    public void testAddBrokerAndSyncFromLastFile() throws Exception {
+        init(1700);
+
+        // Step1: broker1 as leader, broker2 as follower; append 2 epochs, each stored in its own file (fileSize = 1700 holds only 10 msgs).
+        // Master: <Epoch1, 0, 1570> <Epoch2, 1570, 3270>
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 1, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 10, 0);
+        changeMasterAndPutMessage(this.messageStore1, this.storeConfig1, this.messageStore2, this.storeConfig2, 2, store1HaAddress, 10);
+        checkMessage(this.messageStore2, 20, 0);
+
+        // Step2: restart broker3
+        messageStore3.shutdown();
+        messageStore3.destroy();
+
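+        // Rebuild broker3 with syncFromLastFile enabled, so during handshake it asks the master to replicate only from the master's last mapped file.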
+        storeConfig3.setSyncFromLastFile(true);
+        messageStore3 = buildMessageStore(storeConfig3, 3L);
+        assertTrue(messageStore3.load());
+        messageStore3.start();
+
+        // Step3: add broker3 and link it to broker1. Because broker3 requests sync from the last file, it only syncs the 10 msgs starting from offset 10.
+        messageStore3.getHaService().changeToSlave("", "127.0.0.1:10912", 2);
+        Thread.sleep(6000);
+        checkMessage(messageStore3, 10, 10);
+    }
+
+    @After
+    public void destroy() throws Exception {
+        Thread.sleep(5000L);
+        messageStore2.shutdown();
+        messageStore2.destroy();
+        messageStore1.shutdown();
+        messageStore1.destroy();
+        messageStore3.shutdown();
+        messageStore3.destroy();
+        File file = new File(storePathRootParentDir);
+        UtilAll.deleteFile(file);
+    }
+
+    private DefaultMessageStore buildMessageStore(MessageStoreConfig messageStoreConfig,
+        long brokerId) throws Exception {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setBrokerId(brokerId);
+        return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, null, brokerConfig);
+    }
+
+    private void buildMessageStoreConfig(MessageStoreConfig messageStoreConfig, int mappedFileSize) {
+        messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize);
+        messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024);
+        messageStoreConfig.setMaxHashSlotNum(10000);
+        messageStoreConfig.setMaxIndexNum(100 * 100);
+        messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
+        messageStoreConfig.setFlushIntervalConsumeQueue(1);
+    }
+
+    private MessageExtBrokerInner buildMessage() {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic("FooBar");
+        msg.setTags("TAG1");
+        msg.setBody(MessageBody);
+        msg.setKeys(String.valueOf(System.currentTimeMillis()));
+        msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+        msg.setSysFlag(0);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(StoreHost);
+        msg.setBornHost(BornHost);
+        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+        return msg;
+    }
+}