You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/09/07 09:02:21 UTC

[rocketmq] branch 5.0.0-beta-compact updated: Support replicate between master and slave (#4998)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch 5.0.0-beta-compact
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta-compact by this push:
     new 07185a6ff Support replicate between master and slave (#4998)
07185a6ff is described below

commit 07185a6ff6588a5ddeec91d2d43bef2d0c95dcf5
Author: ltamber <lt...@gmail.com>
AuthorDate: Wed Sep 7 17:02:00 2022 +0800

    Support replicate between master and slave (#4998)
    
    * replicate from master
    
    * fix unit test
    
    * fix unit test
    
    * LogAndCq
    
    * combine log and CQ
    
    * replace file
    
    * clean after replicating
    
    * unit test
    
    * bug fix
    
    * dump command
    
    * compaction cycle and dump log tool
    
    * fix the first compaction not being triggered
    
    * optimize stop pull logic
    
    * fix checkstyle
    
    * fix replace error when compaction replicating and current run concurrently
    
    * force clean cq in replicating
    
    * force clean cq in replicating
    
    * mapped file queue empty judgement fix
    
    * clean compacting folder
    
    * validate cqUnit when getMessage
    
    * clear files when replace completed
    
    * optimize sparse consume queue recover
    
    * disable allocate mapped file & separate compaction load and start
    
    * code optimize
    
    * fix compaction error
    
    * checkstyle
    
    * checkstyle
    
    * compactionLog size base on sparse consume queue size
    
    * bugfix
    
    * fix
    
    * fix unit test
---
 .../apache/rocketmq/broker/BrokerController.java   |   1 +
 .../java/org/apache/rocketmq/store/CommitLog.java  |   2 +-
 .../apache/rocketmq/store/DefaultMessageStore.java |  10 +-
 .../org/apache/rocketmq/store/MappedFileQueue.java |   4 +
 .../rocketmq/store/config/MessageStoreConfig.java  |   2 +-
 .../apache/rocketmq/store/kv/CompactionLog.java    | 564 +++++++++++++++------
 .../rocketmq/store/kv/CompactionPositionMgr.java   |   4 +
 .../rocketmq/store/kv/CompactionService.java       |  22 +-
 .../apache/rocketmq/store/kv/CompactionStore.java  |  64 ++-
 .../apache/rocketmq/store/kv/MessageFetcher.java   | 200 ++++++++
 .../rocketmq/store/queue/SparseConsumeQueue.java   | 113 ++++-
 .../rocketmq/store/kv/CompactionLogTest.java       |  72 ++-
 .../rocketmq/tools/command/MQAdminStartup.java     |   2 +
 .../command/message/DumpCompactionLogCommand.java  | 109 ++++
 14 files changed, 953 insertions(+), 216 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 2fdb6f83d..b49ed5dd5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1650,6 +1650,7 @@ public class BrokerController {
             if (registerBrokerResult != null) {
                 if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                     this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+                    this.messageStore.updateMasterAddress(registerBrokerResult.getMasterAddr());
                 }
 
                 this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 82ceaae9b..cf1a7f857 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -525,7 +525,7 @@ public class CommitLog implements Swappable {
 
             return dispatchRequest;
         } catch (Exception e) {
-            log.error("CheckMessageAndReturnSizeOld", e);
+
         }
 
         return new DispatchRequest(-1, false /* success */);
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 81aa99c53..b1a5247ab 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -211,8 +211,6 @@ public class DefaultMessageStore implements MessageStore {
 
         this.transientStorePool = new TransientStorePool(messageStoreConfig);
 
-        this.compactionService.start();
-
         this.scheduledExecutorService =
             Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
 
@@ -284,6 +282,8 @@ public class DefaultMessageStore implements MessageStore {
             // load Consume Queue
             result = result && this.consumeQueueStore.load();
 
+            result = result && this.compactionService.load(lastExitOK);
+
             if (result) {
                 this.storeCheckpoint =
                     new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
@@ -349,6 +349,7 @@ public class DefaultMessageStore implements MessageStore {
 
         this.flushConsumeQueueService.start();
         this.commitLog.start();
+        this.compactionService.start();
         this.storeStatsService.start();
 
         if (this.haService != null) {
@@ -1213,6 +1214,9 @@ public class DefaultMessageStore implements MessageStore {
         if (this.haService != null) {
             this.haService.updateMasterAddress(newAddr);
         }
+        if (this.compactionService != null) {
+            this.compactionService.updateMasterAddress(newAddr);
+        }
     }
 
     @Override
@@ -2355,7 +2359,7 @@ public class DefaultMessageStore implements MessageStore {
                 }
             }
 
-            compactionStore.flushCq(flushConsumeQueueLeastPages);
+            compactionStore.flushCQ(flushConsumeQueueLeastPages);
 
             if (0 == flushConsumeQueueLeastPages) {
                 if (logicsMsgTimestamp > 0) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index b9b7a1fdb..5b7c84ad0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -776,4 +776,8 @@ public class MappedFileQueue implements Swappable {
     public long getTotalFileSize() {
         return (long) mappedFileSize * mappedFiles.size();
     }
+
+    public String getStorePath() {
+        return storePath;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index eb34fbdb6..da27b1dd6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -44,7 +44,7 @@ public class MessageStoreConfig {
     // CompactionLog consumeQueue file size, default is 10M
     private int compactionCqMappedFileSize = 10 * 1024 * 1024;
 
-    private int compactionScheduleInternal = 15 * 60 * 60;
+    private int compactionScheduleInternal = 15 * 60 * 1000;
 
     private int maxOffsetMapSize = 100 * 1024 * 1024;
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
index 1ed6ea1a5..e5fdae714 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionLog.java
@@ -57,13 +57,12 @@ import java.security.DigestException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.apache.rocketmq.store.CommitLog.BLANK_MAGIC_CODE;
@@ -73,101 +72,197 @@ public class CompactionLog {
 
     private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
     private static final int MAX_PULL_MSG_SIZE = 128 * 1024 * 1024;
-    public static final String SUB_FOLDER = "compacting";
+    public static final String COMPACTING_SUB_FOLDER = "compacting";
+    public static final String REPLICATING_SUB_FOLDER = "replicating";
 
     private final int compactionLogMappedFileSize;
     private final int compactionCqMappedFileSize;
     private final String compactionLogFilePath;
     private final String compactionCqFilePath;
     private final MessageStore defaultMessageStore;
+    private final CompactionStore compactionStore;
     private final MessageStoreConfig messageStoreConfig;
-
     private final CompactionAppendMsgCallback endMsgCallback;
-
     private final String topic;
     private final int queueId;
     private final int offsetMapMemorySize;
-
     private final PutMessageLock putMessageLock;
     private final PutMessageLock readMessageLock;
-    private MappedFileQueue currentMappedFileQueue;
-    private MappedFileQueue newMappedFileQueue;
-    private SparseConsumeQueue currentBcq;
-    private SparseConsumeQueue compactionBcq;
+    private TopicPartitionLog current;
+    private TopicPartitionLog compacting;
+    private TopicPartitionLog replicating;
     private CompactionPositionMgr positionMgr;
-    private AtomicBoolean compacting = new AtomicBoolean(false);
+    private AtomicReference<State> state;
 
-    public CompactionLog(final MessageStore messageStore, final String topic, final int queueId,
-        final int offsetMapMemorySize, CompactionPositionMgr positionMgr, String compactionLogStoreRootPath,
-        String compactionCqStoreRootPath)
+    public CompactionLog(final MessageStore messageStore, final CompactionStore compactionStore, final String topic, final int queueId)
         throws IOException {
         this.topic = topic;
         this.queueId = queueId;
-        this.offsetMapMemorySize = offsetMapMemorySize;
         this.defaultMessageStore = messageStore;
+        this.compactionStore = compactionStore;
         this.messageStoreConfig = messageStore.getMessageStoreConfig();
-        this.compactionLogMappedFileSize = messageStoreConfig.getCompactionMappedFileSize();
+        this.offsetMapMemorySize = compactionStore.getOffsetMapSize();
         this.compactionCqMappedFileSize =
             messageStoreConfig.getCompactionCqMappedFileSize() / BatchConsumeQueue.CQ_STORE_UNIT_SIZE
                 * BatchConsumeQueue.CQ_STORE_UNIT_SIZE;
-        this.compactionLogFilePath = Paths.get(compactionLogStoreRootPath, topic, String.valueOf(queueId)).toString();
-        this.compactionCqFilePath = compactionCqStoreRootPath;        // batch consume queue already separated
+        this.compactionLogMappedFileSize = getCompactionLogSize(compactionCqMappedFileSize,
+            messageStoreConfig.getCompactionMappedFileSize());
+        this.compactionLogFilePath = Paths.get(compactionStore.getCompactionLogPath(),
+            topic, String.valueOf(queueId)).toString();
+        this.compactionCqFilePath = compactionStore.getCompactionCqPath();        // batch consume queue already separated
+        this.positionMgr = compactionStore.getPositionMgr();
 
-        this.currentMappedFileQueue = new MappedFileQueue(compactionLogFilePath, compactionLogMappedFileSize,
-            defaultMessageStore.getAllocateMappedFileService());
         this.putMessageLock =
             messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() :
                 new PutMessageSpinLock();
         this.readMessageLock =
             messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() :
                 new PutMessageSpinLock();
-        this.positionMgr = positionMgr;
         this.endMsgCallback = new CompactionAppendEndMsgCallback();
-        this.load();
-        log.info("CompactionLog init completed.");
+        this.state = new AtomicReference<>(State.INITIALIZING);
+//        this.load();
+        log.info("CompactionLog {}:{} init completed.", topic, queueId);
     }
 
-    private void load() throws IOException, RuntimeException {
-        if (!this.currentMappedFileQueue.load()) {
-            throw new IOException("load compactionLog exception");
+    private int getCompactionLogSize(int cqSize, int origLogSize) {
+        int n = origLogSize / cqSize;
+        if (n < 5) {
+            return cqSize * 5;
+        }
+        int m = origLogSize % cqSize;
+        if (m > 0 && m < (cqSize >> 1)) {
+            return n * cqSize;
+        } else {
+            return (n + 1) * cqSize;
         }
-        loadCq();
-        recover();
-        sanityCheck();
     }
 
-    private void loadCq() {
-        SparseConsumeQueue bcq = new SparseConsumeQueue(topic, queueId, compactionCqFilePath,
-            compactionCqMappedFileSize, defaultMessageStore);
-        bcq.load();
-        bcq.recover();
-        currentBcq = bcq;
+    public void load(boolean exitOk) throws IOException, RuntimeException {
+        initLogAndCq(exitOk);
+        if (defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE
+            && getLog().isMappedFilesEmpty()) {
+            log.info("{}:{} load compactionLog from remote master", topic, queueId);
+            loadFromRemoteAsync();
+        } else {
+            state.compareAndSet(State.INITIALIZING, State.NORMAL);
+        }
     }
 
-    private void recover() {
-        long maxCqPhysicOffset = currentBcq.getMaxPhysicOffset();
-        if (maxCqPhysicOffset > 0) {
-            log.info("max cq physical offset is {}", maxCqPhysicOffset);
-            this.currentMappedFileQueue.setFlushedWhere(maxCqPhysicOffset);
-            this.currentMappedFileQueue.setCommittedWhere(maxCqPhysicOffset);
-            this.currentMappedFileQueue.truncateDirtyFiles(maxCqPhysicOffset);
-        }
+    private void initLogAndCq(boolean exitOk) throws IOException, RuntimeException {
+        current = new TopicPartitionLog(this);
+        current.init(exitOk);
     }
 
-    void sanityCheck() throws RuntimeException {
-        List<MappedFile> mappedFileList = currentMappedFileQueue.getMappedFiles();
-        for (MappedFile file : mappedFileList) {
-            if (!currentBcq.containsOffsetFile(Long.parseLong(file.getFile().getName()))) {
-                throw new RuntimeException("log file mismatch with consumeQueue file " + file.getFileName());
+
+    private boolean putMessageFromRemote(byte[] bytes) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+        // split bytebuffer to avoid encode message again
+        while (byteBuffer.hasRemaining()) {
+            int mark = byteBuffer.position();
+            ByteBuffer bb = byteBuffer.slice();
+            int size = bb.getInt();
+            if (size < 0 || size > byteBuffer.capacity()) {
+                break;
+            } else {
+                bb.limit(size);
+                bb.rewind();
+            }
+
+            MessageExt messageExt = MessageDecoder.decode(bb, false, false);
+            long messageOffset = messageExt.getQueueOffset();
+            long minOffsetInQueue = getCQ().getMinOffsetInQueue();
+            if (getLog().isMappedFilesEmpty() || messageOffset < minOffsetInQueue) {
+                asyncPutMessage(bb, messageExt, replicating);
+            } else {
+                log.info("{}:{} message offset {} >= minOffsetInQueue {}, stop pull...",
+                    topic, queueId, messageOffset, minOffsetInQueue);
+                return false;
             }
+
+            byteBuffer.position(mark + size);
+        }
+
+        return true;
+
+    }
+
+    private void pullMessageFromMaster() throws Exception {
+
+        if (StringUtils.isBlank(compactionStore.getMasterAddr())) {
+            compactionStore.getCompactionSchedule().schedule(() -> {
+                try {
+                    pullMessageFromMaster();
+                } catch (Exception e) {
+                    log.error("pullMessageFromMaster exception: ", e);
+                }
+            }, 5, TimeUnit.SECONDS);
+            return;
+        }
+
+        replicating = new TopicPartitionLog(this, REPLICATING_SUB_FOLDER);
+        try (MessageFetcher messageFetcher = new MessageFetcher()) {
+            messageFetcher.pullMessageFromMaster(topic, queueId, getCQ().getMinOffsetInQueue(),
+                compactionStore.getMasterAddr(), (currOffset, response) -> {
+                    if (currOffset < 0) {
+                        log.info("{}:{} current offset {}, stop pull...", topic, queueId, currOffset);
+                        return false;
+                    }
+                    return putMessageFromRemote(response.getBody());
+//                    positionMgr.setOffset(topic, queueId, currOffset);
+                });
         }
 
-        List<MappedFile> cqMappedFileList = currentBcq.getMappedFileQueue().getMappedFiles();
-        for (MappedFile file: cqMappedFileList) {
-            if (mappedFileList.stream().noneMatch(m -> Objects.equals(m.getFile().getName(), file.getFile().getName()))) {
-                throw new RuntimeException("consumeQueue file mismatch with log file " + file.getFileName());
+        // merge files
+        if (getLog().isMappedFilesEmpty()) {
+            replaceFiles(getLog().getMappedFiles(), current, replicating);
+        } else if (replicating.getLog().isMappedFilesEmpty()) {
+            log.info("replicating message is empty");   //break
+        } else {
+            List<MappedFile> newFiles = Lists.newArrayList();
+            List<MappedFile> toCompactFiles = Lists.newArrayList(replicating.getLog().getMappedFiles());
+            putMessageLock.lock();
+            try {
+                // combine current and replicating to mappedFileList
+                newFiles = Lists.newArrayList(getLog().getMappedFiles());
+                toCompactFiles.addAll(newFiles);  //all from current
+                current.roll(toCompactFiles.size() * compactionLogMappedFileSize);
+            } catch (Throwable e) {
+                log.error("roll log and cq exception: ", e);
+            } finally {
+                putMessageLock.unlock();
+            }
+
+            try {
+                // doCompaction with current and replicating
+                compactAndReplace(new FileList(toCompactFiles, newFiles));
+            } catch (Throwable e) {
+                log.error("do merge replicating and current exception: ", e);
             }
         }
+
+        // cleanReplicatingResource, force clean cq
+        replicating.clean(false, true);
+
+//        positionMgr.setOffset(topic, queueId, currentPullOffset);
+        state.compareAndSet(State.INITIALIZING, State.NORMAL);
+    }
+    private void loadFromRemoteAsync() {
+        compactionStore.getCompactionSchedule().submit(() -> {
+            try {
+                pullMessageFromMaster();
+            } catch (Exception e) {
+                log.error("fetch message from master exception: ", e);
+            }
+        });
+
+        // update (currentStatus) = LOADING
+
+        // request => get (start, end)
+        // pull message => current message offset > end
+        // done
+        // positionMgr.persist();
+
+        // update (currentStatus) = RUNNING
     }
 
     private long nextOffsetCorrection(long oldOffset, long newOffset) {
@@ -220,28 +315,6 @@ public class CompactionLog {
         return false;
     }
 
-    private boolean shouldRoll(MappedFileQueue mappedFileQueue, SparseConsumeQueue bcq, final int msgSize) {
-        return mappedFileQueue.shouldRoll(msgSize) || bcq.shouldRoll();
-    }
-
-    private boolean isEmptyOrCurrentFileFull(MappedFileQueue mappedFileQueue, SparseConsumeQueue bcq) {
-        return mappedFileQueue.isEmptyOrCurrentFileFull() || bcq.getMappedFileQueue().isEmptyOrCurrentFileFull();
-    }
-
-    private void rollLogAndCq(MappedFileQueue mappedFileQueue, SparseConsumeQueue bcq) throws IOException {
-        MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
-        if (mappedFile == null) {
-            throw new IOException("create new file error");
-        }
-        long baseOffset = mappedFile.getFileFromOffset();
-        MappedFile cqFile = bcq.createFile(baseOffset);
-        if (cqFile == null) {
-            mappedFile.destroy(1000);
-            mappedFileQueue.getMappedFiles().remove(mappedFile);
-            throw new IOException("create new consumeQueue file error");
-        }
-    }
-
     public long rollNextFile(final long offset) {
         return offset + compactionLogMappedFileSize - offset % compactionLogMappedFileSize;
     }
@@ -263,28 +336,28 @@ public class CompactionLog {
     }
 
     public void checkAndPutMessage(final SelectMappedBufferResult selectMappedBufferResult, final MessageExt msgExt,
-        final OffsetMap offsetMap, final MappedFileQueue mappedFileQueue, final SparseConsumeQueue bcq)
+        final OffsetMap offsetMap, final TopicPartitionLog tpLog)
         throws DigestException {
         if (shouldRetainMsg(msgExt, offsetMap)) {
-            asyncPutMessage(selectMappedBufferResult, msgExt, mappedFileQueue, bcq);
+            asyncPutMessage(selectMappedBufferResult.getByteBuffer(), msgExt, tpLog);
         }
     }
 
     public CompletableFuture<PutMessageResult> asyncPutMessage(final SelectMappedBufferResult selectMappedBufferResult) {
-        return asyncPutMessage(selectMappedBufferResult, currentMappedFileQueue, currentBcq);
+        return asyncPutMessage(selectMappedBufferResult, current);
     }
 
     public CompletableFuture<PutMessageResult> asyncPutMessage(final SelectMappedBufferResult selectMappedBufferResult,
-        MappedFileQueue mappedFileQueue, SparseConsumeQueue bcq) {
+        final TopicPartitionLog tpLog) {
         MessageExt msgExt = MessageDecoder.decode(selectMappedBufferResult.getByteBuffer(), false, false);
-        return asyncPutMessage(selectMappedBufferResult, msgExt, mappedFileQueue, bcq);
+        return asyncPutMessage(selectMappedBufferResult.getByteBuffer(), msgExt, tpLog);
     }
 
-    public CompletableFuture<PutMessageResult> asyncPutMessage(final SelectMappedBufferResult selectMappedBufferResult,
-        final MessageExt msgExt, MappedFileQueue mappedFileQueue, SparseConsumeQueue bcq) {
+    public CompletableFuture<PutMessageResult> asyncPutMessage(final ByteBuffer msgBuffer,
+        final MessageExt msgExt, final TopicPartitionLog tpLog) {
 
         // fix duplicate
-        if (bcq.getMaxOffsetInQueue() >= msgExt.getQueueOffset()) {
+        if (tpLog.getCQ().getMaxOffsetInQueue() - 1 >= msgExt.getQueueOffset()) {
             return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
         }
 
@@ -298,32 +371,32 @@ public class CompactionLog {
         try {
             long beginTime = System.nanoTime();
 
-            if (isEmptyOrCurrentFileFull(mappedFileQueue, bcq)) {
+            if (tpLog.isEmptyOrCurrentFileFull()) {
                 try {
-                    rollLogAndCq(mappedFileQueue, bcq);
+                    tpLog.roll();
                 } catch (IOException e) {
                     log.error("create mapped file or consumerQueue exception: ", e);
                     return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
                 }
             }
 
-            MappedFile mappedFile = mappedFileQueue.getLastMappedFile();
+            MappedFile mappedFile = tpLog.getLog().getLastMappedFile();
 
-            CompactionAppendMsgCallback callback = new CompactionAppendMessageCallback(msgExt, bcq);
-            AppendMessageResult result = mappedFile.appendMessage(selectMappedBufferResult.getByteBuffer(), callback);
+            CompactionAppendMsgCallback callback = new CompactionAppendMessageCallback(msgExt, tpLog.getCQ());
+            AppendMessageResult result = mappedFile.appendMessage(msgBuffer, callback);
 
             switch (result.getStatus()) {
                 case PUT_OK:
                     break;
                 case END_OF_FILE:
                     try {
-                        rollLogAndCq(mappedFileQueue, bcq);
+                        tpLog.roll();
                     } catch (IOException e) {
                         log.error("create mapped file2 error, topic: {}, clientAddr: {}", msgExt.getTopic(), msgExt.getBornHostString());
                         return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                     }
-                    mappedFile = mappedFileQueue.getLastMappedFile();
-                    result = mappedFile.appendMessage(selectMappedBufferResult.getByteBuffer(), callback);
+                    mappedFile = tpLog.getLog().getLastMappedFile();
+                    result = mappedFile.appendMessage(msgBuffer, callback);
                     break;
                 default:
                     return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
@@ -337,7 +410,7 @@ public class CompactionLog {
 
     private SelectMappedBufferResult getMessage(final long offset, final int size) {
 
-        MappedFile mappedFile = this.currentMappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+        MappedFile mappedFile = this.getLog().findMappedFileByOffset(offset, offset == 0);
         if (mappedFile != null) {
             int pos = (int) (offset % compactionLogMappedFileSize);
             return mappedFile.selectMappedBuffer(pos, size);
@@ -345,6 +418,13 @@ public class CompactionLog {
         return null;
     }
 
+    private boolean validateCqUnit(CqUnit cqUnit) {
+        return cqUnit.getPos() >= 0
+            && cqUnit.getSize() > 0
+            && cqUnit.getQueueOffset() >= 0
+            && cqUnit.getBatchNum() > 0;
+    }
+
     public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
         final int maxMsgNums, final int maxTotalMsgSize) {
         readMessageLock.lock();
@@ -358,9 +438,9 @@ public class CompactionLog {
 
             GetMessageResult getResult = new GetMessageResult();
 
-            final long maxOffsetPy = currentMappedFileQueue.getMaxOffset();
+            final long maxOffsetPy = getLog().getMaxOffset();
 
-            SparseConsumeQueue consumeQueue = currentBcq;
+            SparseConsumeQueue consumeQueue = getCQ();
             if (consumeQueue != null) {
                 minOffset = consumeQueue.getMinOffsetInQueue();
                 maxOffset = consumeQueue.getMaxOffsetInQueue();
@@ -407,13 +487,11 @@ public class CompactionLog {
                             long nextPhyFileStartOffset = Long.MIN_VALUE;
                             while (bufferConsumeQueue.hasNext() && nextBeginOffset < maxOffset) {
                                 CqUnit cqUnit = bufferConsumeQueue.next();
-                                long offsetPy = cqUnit.getPos();
-                                int sizePy = cqUnit.getSize();
-
-                                if (offsetPy < 0) {
-                                    // indicate that the mapped file ended
+                                if (!validateCqUnit(cqUnit)) {
                                     break;
                                 }
+                                long offsetPy = cqUnit.getPos();
+                                int sizePy = cqUnit.getSize();
 
                                 boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
 
@@ -476,50 +554,60 @@ public class CompactionLog {
         }
     }
 
-    List<MappedFile> getCompactionFile() {
-        List<MappedFile> mappedFileList = Lists.newArrayList(currentMappedFileQueue.getMappedFiles());
+    FileList getCompactionFile() {
+        List<MappedFile> mappedFileList = Lists.newArrayList(getLog().getMappedFiles());
         if (mappedFileList.size() < 2) {
-            return Collections.emptyList();
+            return null;
         }
 
+        List<MappedFile> toCompactFiles = mappedFileList.subList(0, mappedFileList.size() - 1);
+
         //exclude the last writing file
-        for (int i = mappedFileList.size() - 2; i >= 0; i--) {
+        List<MappedFile> newFiles = Lists.newArrayList();
+        for (int i = 0; i < mappedFileList.size() - 1; i++) {
             MappedFile mf = mappedFileList.get(i);
-            long maxQueueOffset = currentBcq.getMaxMsgOffsetFromFile(mf.getFile().getName());
-            if (maxQueueOffset <= positionMgr.getOffset(topic, queueId)) {
-                if (i < mappedFileList.size() - 2) {
-                    // next to end
-                    return mappedFileList.subList(i + 1, mappedFileList.size() - 2);
-                } else {
-                    return Collections.emptyList();
-                }
+            long maxQueueOffsetInFile = getCQ().getMaxMsgOffsetFromFile(mf.getFile().getName());
+            if (maxQueueOffsetInFile > positionMgr.getOffset(topic, queueId)) {
+                newFiles.add(mf);
             }
         }
 
-        return Collections.emptyList();
+        if (newFiles.isEmpty()) {
+            return null;
+        }
+
+        return new FileList(toCompactFiles, newFiles);
+    }
+
+    void compactAndReplace(FileList compactFiles) throws Throwable {
+        if (compactFiles == null || compactFiles.isEmpty()) {
+            return;
+        }
+
+        long startTime = System.nanoTime();
+        OffsetMap offsetMap = getOffsetMap(compactFiles.newFiles);
+        compaction(compactFiles.toCompactFiles, offsetMap);
+        replaceFiles(compactFiles.toCompactFiles, current, compacting);
+        positionMgr.setOffset(topic, queueId, offsetMap.lastOffset);
+        positionMgr.persist();
+        compacting.clean(false, false);
+        log.info("this compaction elapsed {} milliseconds",
+            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
+
     }
 
     void doCompaction() {
-        if (!compacting.compareAndSet(false, true)) {
-            log.warn("compaction is running, skip this time.");
+        if (!state.compareAndSet(State.NORMAL, State.COMPACTING)) {
+            log.warn("compactionLog state is {}, skip this time", state.get());
             return;
         }
 
         try {
-            List<MappedFile> mappedFileList = getCompactionFile();
-            if (CollectionUtils.isNotEmpty(mappedFileList)) {
-                long startTime = System.nanoTime();
-                OffsetMap offsetMap = getOffsetMap(mappedFileList);
-                compaction(mappedFileList, offsetMap);
-                replaceFiles(mappedFileList, currentMappedFileQueue, newMappedFileQueue, offsetMap);
-                positionMgr.setOffset(topic, queueId, offsetMap.lastOffset);
-                log.info("this compaction elapsed {} milliseconds",
-                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
-            }
+            compactAndReplace(getCompactionFile());
         } catch (Throwable e) {
             log.error("do compaction exception: ", e);
         }
-        compacting.set(false);
+        state.compareAndSet(State.COMPACTING, State.NORMAL);
     }
 
     protected OffsetMap getOffsetMap(List<MappedFile> mappedFileList) throws NoSuchAlgorithmException, DigestException {
@@ -535,7 +623,9 @@ public class CompactionLog {
                     MessageExt msg = MessageDecoder.decode(smb.getByteBuffer(), true, false);
                     if (msg != null) {
                         ////get key & offset and put to offsetMap
-                        offsetMap.put(msg.getKeys(), msg.getQueueOffset());
+                        if (msg.getQueueOffset() > positionMgr.getOffset(topic, queueId)) {
+                            offsetMap.put(msg.getKeys(), msg.getQueueOffset());
+                        }
                     } else {
                         // msg is null indicate that file is end
                         break;
@@ -561,10 +651,7 @@ public class CompactionLog {
     }
 
     protected void compaction(List<MappedFile> mappedFileList, OffsetMap offsetMap) throws DigestException {
-        newMappedFileQueue = new MappedFileQueue(compactionLogFilePath + File.separator + SUB_FOLDER,
-            compactionLogMappedFileSize, null);
-        compactionBcq = new SparseConsumeQueue(topic, queueId, compactionCqFilePath,
-            compactionCqMappedFileSize, defaultMessageStore, SUB_FOLDER);
+        compacting = new TopicPartitionLog(this, COMPACTING_SUB_FOLDER);
 
         for (MappedFile mappedFile : mappedFileList) {
             Iterator<SelectMappedBufferResult> iterator = mappedFile.iterator(0);
@@ -577,7 +664,7 @@ public class CompactionLog {
                         // file end
                         break;
                     } else {
-                        checkAndPutMessage(smb, msgExt, offsetMap, newMappedFileQueue, compactionBcq);
+                        checkAndPutMessage(smb, msgExt, offsetMap, compacting);
                     }
                 } finally {
                     if (smb != null) {
@@ -586,20 +673,28 @@ public class CompactionLog {
                 }
             }
         }
-        putEndMessage(newMappedFileQueue);
+        putEndMessage(compacting.getLog());
     }
 
-    private void replaceFiles(List<MappedFile> mappedFileList, MappedFileQueue current,
-        MappedFileQueue compacted, OffsetMap offsetMap) {
+    protected void replaceFiles(List<MappedFile> mappedFileList, TopicPartitionLog current,
+        TopicPartitionLog newLog) {
+
+        MappedFileQueue dest = current.getLog();
+        MappedFileQueue src = newLog.getLog();
 
         long beginTime = System.nanoTime();
-        List<String> fileNameToReplace = mappedFileList.stream()
-            .map(m -> m.getFile().getName())
+//        List<String> fileNameToReplace = mappedFileList.stream()
+//            .map(m -> m.getFile().getName())
+//            .collect(Collectors.toList());
+
+        List<String> fileNameToReplace = dest.getMappedFiles().stream()
+            .filter(mappedFileList::contains)
+            .map(mf -> mf.getFile().getName())
             .collect(Collectors.toList());
 
         mappedFileList.forEach(MappedFile::renameToDelete);
 
-        compacted.getMappedFiles().forEach(mappedFile -> {
+        src.getMappedFiles().forEach(mappedFile -> {
             try {
                 mappedFile.moveToParent();
             } catch (IOException e) {
@@ -607,27 +702,28 @@ public class CompactionLog {
             }
         });
 
-        current.getMappedFiles().stream()
+        dest.getMappedFiles().stream()
             .filter(m -> !mappedFileList.contains(m))
-            .forEach(m -> compacted.getMappedFiles().add(m));
+            .forEach(m -> src.getMappedFiles().add(m));
 
         readMessageLock.lock();
         try {
             mappedFileList.forEach(mappedFile -> mappedFile.destroy(1000));
 
-            current.getMappedFiles().clear();
-            current.getMappedFiles().addAll(compacted.getMappedFiles());
+            dest.getMappedFiles().clear();
+            dest.getMappedFiles().addAll(src.getMappedFiles());
+            src.getMappedFiles().clear();
 
-            replaceCqFiles(currentBcq, compactionBcq, fileNameToReplace);
+            replaceCqFiles(getCQ(), newLog.getCQ(), fileNameToReplace);
 
-            log.info("replace file elapsed {} millisecs",
+            log.info("replace file elapsed {} milliseconds",
                 TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime));
         } finally {
             readMessageLock.unlock();
         }
     }
 
-    private void replaceCqFiles(SparseConsumeQueue currentBcq, SparseConsumeQueue compactionBcq,
+    protected void replaceCqFiles(SparseConsumeQueue currentBcq, SparseConsumeQueue compactionBcq,
         List<String> fileNameToReplace) {
         long beginTime = System.nanoTime();
 
@@ -653,21 +749,27 @@ public class CompactionLog {
 
         currentMq.getMappedFiles().clear();
         currentMq.getMappedFiles().addAll(compactMq.getMappedFiles());
+        compactMq.getMappedFiles().clear();
+
         currentBcq.refresh();
         log.info("replace consume queue file elapsed {} millsecs.",
             TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginTime));
     }
 
-    public SparseConsumeQueue getCurrentBcq() {
-        return currentBcq;
+    // Accessors now route through the combined TopicPartitionLog "current"
+    // instead of the old separate currentBcq / compactionBcq fields.
+    public MappedFileQueue getLog() {
+        return current.mappedFileQueue;
     }
 
-    public SparseConsumeQueue getCompactionBcq() {
-        return compactionBcq;
+    public SparseConsumeQueue getCQ() {
+        return current.consumeQueue;
     }
 
-    public void flushCq(int flushLeastPages) {
-        currentBcq.flush(flushLeastPages);
+//    public SparseConsumeQueue getCompactionScq() {
+//        return compactionScq;
+//    }
+
+    // NOTE(review): the commented-out getCompactionScq above is dead code —
+    // prefer deleting it instead of keeping it commented out.
+    public void flushCQ(int flushLeastPages) {
+        getCQ().flush(flushLeastPages);
     }
 
     static class CompactionAppendEndMsgCallback implements CompactionAppendMsgCallback {
@@ -831,4 +933,168 @@ public class CompactionLog {
         }
     }
 
+    /**
+     * Bundles the pair of physical structures backing one compacted topic/queue:
+     * the compaction log ({@code mappedFileQueue}) and its sparse consume queue
+     * ({@code consumeQueue}). With a blank subFolder the instance addresses the
+     * live ("current") files; with a subFolder it addresses a staging copy
+     * (e.g. the compacting sub-directory).
+     */
+    static class TopicPartitionLog {
+        MappedFileQueue mappedFileQueue;
+        SparseConsumeQueue consumeQueue;
+
+        public TopicPartitionLog(CompactionLog compactionLog) {
+            this(compactionLog, null);
+        }
+        public TopicPartitionLog(CompactionLog compactionLog, String subFolder) {
+            if (StringUtils.isBlank(subFolder)) {
+                // live files directly under the compaction log / cq paths
+                mappedFileQueue = new MappedFileQueue(compactionLog.compactionLogFilePath,
+                    compactionLog.compactionLogMappedFileSize, null);
+                consumeQueue = new SparseConsumeQueue(compactionLog.topic, compactionLog.queueId,
+                    compactionLog.compactionCqFilePath, compactionLog.compactionCqMappedFileSize,
+                    compactionLog.defaultMessageStore);
+            } else {
+                // staging files under <path>/<subFolder>
+                mappedFileQueue = new MappedFileQueue(compactionLog.compactionLogFilePath + File.separator + subFolder,
+                    compactionLog.compactionLogMappedFileSize, null);
+                consumeQueue = new SparseConsumeQueue(compactionLog.topic, compactionLog.queueId,
+                    compactionLog.compactionCqFilePath, compactionLog.compactionCqMappedFileSize,
+                    compactionLog.defaultMessageStore, subFolder);
+            }
+        }
+
+        public void shutdown() {
+            // 30s grace period for both queues
+            mappedFileQueue.shutdown(1000 * 30);
+            consumeQueue.getMappedFileQueue().shutdown(1000 * 30);
+        }
+
+        /**
+         * Load both queues from disk, recover the log to the CQ's max physical
+         * offset, and cross-check the log/CQ file pairing. On any failure both
+         * queues are shut down before the exception is rethrown.
+         * NOTE(review): the exitOk parameter is unused in this body — confirm intent.
+         */
+        public void init(boolean exitOk) throws IOException, RuntimeException {
+            if (!mappedFileQueue.load()) {
+                shutdown();
+                throw new IOException("load log exception");
+            }
+
+            if (!consumeQueue.load()) {
+                shutdown();
+                throw new IOException("load consume queue exception");
+            }
+
+            try {
+                consumeQueue.recover();
+                recover();
+                sanityCheck();
+            } catch (Exception e) {
+                shutdown();
+                throw e;
+            }
+        }
+
+        // Truncate the log so it ends exactly at the last physical offset the
+        // (already recovered) consume queue knows about.
+        private void recover() {
+            long maxCqPhysicOffset = consumeQueue.getMaxPhyOffsetInLog();
+            log.info("{}:{} max physical offset in compaction log is {}",
+                consumeQueue.getTopic(), consumeQueue.getQueueId(), maxCqPhysicOffset);
+            if (maxCqPhysicOffset > 0) {
+                this.mappedFileQueue.setFlushedWhere(maxCqPhysicOffset);
+                this.mappedFileQueue.setCommittedWhere(maxCqPhysicOffset);
+                this.mappedFileQueue.truncateDirtyFiles(maxCqPhysicOffset);
+            }
+        }
+
+        // Every log file must have a CQ file of the same (offset) name, and vice versa.
+        void sanityCheck() throws RuntimeException {
+            List<MappedFile> mappedFileList = mappedFileQueue.getMappedFiles();
+            for (MappedFile file : mappedFileList) {
+                if (!consumeQueue.containsOffsetFile(Long.parseLong(file.getFile().getName()))) {
+                    throw new RuntimeException("log file mismatch with consumeQueue file " + file.getFileName());
+                }
+            }
+
+            List<MappedFile> cqMappedFileList = consumeQueue.getMappedFileQueue().getMappedFiles();
+            for (MappedFile file: cqMappedFileList) {
+                if (mappedFileList.stream().noneMatch(m -> Objects.equals(m.getFile().getName(), file.getFile().getName()))) {
+                    throw new RuntimeException("consumeQueue file mismatch with log file " + file.getFileName());
+                }
+            }
+        }
+
+        // Roll to a fresh log file and create the matching CQ file; if the CQ
+        // file cannot be created the just-created log file is destroyed again
+        // so the two queues stay paired.
+        public synchronized void roll() throws IOException {
+            MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
+            if (mappedFile == null) {
+                throw new IOException("create new file error");
+            }
+            long baseOffset = mappedFile.getFileFromOffset();
+            MappedFile cqFile = consumeQueue.createFile(baseOffset);
+            if (cqFile == null) {
+                mappedFile.destroy(1000);
+                mappedFileQueue.getMappedFiles().remove(mappedFile);
+                throw new IOException("create new consumeQueue file error");
+            }
+        }
+
+        // Same as roll() but at an explicit base offset.
+        // NOTE(review): int baseOffset silently truncates offsets beyond
+        // Integer.MAX_VALUE — confirm callers never pass larger values,
+        // otherwise widen to long.
+        public synchronized void roll(int baseOffset) throws IOException {
+
+            MappedFile mappedFile = mappedFileQueue.tryCreateMappedFile(baseOffset);
+            if (mappedFile == null) {
+                throw new IOException("create new file error");
+            }
+
+            MappedFile cqFile = consumeQueue.createFile(baseOffset);
+            if (cqFile == null) {
+                mappedFile.destroy(1000);
+                mappedFileQueue.getMappedFiles().remove(mappedFile);
+                throw new IOException("create new consumeQueue file error");
+            }
+        }
+
+        public boolean isEmptyOrCurrentFileFull() {
+            return mappedFileQueue.isEmptyOrCurrentFileFull() ||
+                consumeQueue.getMappedFileQueue().isEmptyOrCurrentFileFull();
+        }
+
+        // Destroy the queue only if all of its files are already gone from
+        // disk; refuses (throws) when the directory still holds data.
+        public void clean(MappedFileQueue mappedFileQueue) throws IOException {
+            for (MappedFile mf : mappedFileQueue.getMappedFiles()) {
+                if (mf.getFile().exists()) {
+                    log.error("directory {} with {} not empty.", mappedFileQueue.getStorePath(), mf.getFileName());
+                    throw new IOException("directory " + mappedFileQueue.getStorePath() + " not empty.");
+                }
+            }
+
+            mappedFileQueue.destroy();
+        }
+
+        // Clean log and CQ; the force flags skip the "directory must be empty"
+        // check and destroy the files unconditionally.
+        public void clean(boolean forceCleanLog, boolean forceCleanCq) throws IOException {
+            //clean and delete sub_folder
+            if (forceCleanLog) {
+                mappedFileQueue.destroy();
+            } else {
+                clean(mappedFileQueue);
+            }
+
+            if (forceCleanCq) {
+                consumeQueue.getMappedFileQueue().destroy();
+            } else {
+                clean(consumeQueue.getMappedFileQueue());
+            }
+        }
+
+        public MappedFileQueue getLog() {
+            return mappedFileQueue;
+        }
+
+        public SparseConsumeQueue getCQ() {
+            return consumeQueue;
+        }
+    }
+
+    // Lifecycle states of a CompactionLog (see the CAS in doCompaction).
+    // NOTE(review): 'static' is redundant on a nested enum — enums are implicitly static.
+    static enum State {
+        NORMAL,         // idle: doCompaction may transition to COMPACTING
+        INITIALIZING,   // presumably set during load/replication; blocks compaction (state != NORMAL) — confirm
+        COMPACTING,     // a compaction pass is in progress
+    }
+
+    // Result of selecting files for one compaction pass: toCompactFiles are the
+    // files to be rewritten; newFiles are fed to getOffsetMap (see compactAndReplace).
+    static class FileList {
+        List<MappedFile> newFiles;
+        List<MappedFile> toCompactFiles;
+        public FileList(List<MappedFile> toCompactFiles, List<MappedFile> newFiles) {
+            this.toCompactFiles = toCompactFiles;
+            this.newFiles = newFiles;
+        }
+
+        boolean isEmpty() {
+            // "empty" when EITHER list is empty: a compaction pass needs both
+            // the files to rewrite and the files backing the offset map.
+            return CollectionUtils.isEmpty(newFiles) || CollectionUtils.isEmpty(toCompactFiles);
+        }
+    }
+
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionPositionMgr.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionPositionMgr.java
index c32bb1d3d..4181b34b8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionPositionMgr.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionPositionMgr.java
@@ -49,6 +49,10 @@ public class CompactionPositionMgr extends ConfigManager {
         return queueOffsetMap.getOrDefault(topic + "_" + queueId, -1L);
     }
 
+    // True while no compaction position has been recorded for any queue.
+    public boolean isEmpty() {
+        return queueOffsetMap.isEmpty();
+    }
+
     public boolean isCompaction(String topic, int queueId, long offset) {
         return getOffset(topic, queueId) > offset;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
index 891aa1f0f..0aff597eb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java
@@ -107,18 +107,32 @@ public class CompactionService extends ServiceThread {
         }
     }
 
-    @Override
-    public void start() {
-        compactionStore.load();
-        super.start();
+    /**
+     * Load the compaction store from disk. Replaces the old start() override so
+     * that loading and thread start can happen in separate broker startup phases.
+     *
+     * @param exitOK forwarded to CompactionStore.load
+     * @return true if loading succeeded, false if it threw
+     */
+    public boolean load(boolean exitOK) {
+        try {
+            compactionStore.load(exitOK);
+            return true;
+        } catch (Exception e) {
+            log.error("load compaction store error ", e);
+            return false;
+        }
     }
 
+//    @Override
+//    public void start() {
+//        compactionStore.load();
+//        super.start();
+//    }
+
     @Override
     public void shutdown() {
         super.shutdown();
         compactionStore.shutdown();
     }
 
+    // Forward the master broker's address to the store; presumably consumed by
+    // the replicate-from-master path (MessageFetcher) — confirm against callers.
+    public void updateMasterAddress(String addr) {
+        compactionStore.updateMasterAddress(addr);
+    }
+
     static class TopicPartitionOffset {
         String topic;
         int queueId;
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
index 805ee6d93..9e69505e4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/CompactionStore.java
@@ -51,6 +51,7 @@ public class CompactionStore {
     private final int compactionInterval;
     private final int compactionThreadNum;
     private final int offsetMapSize;
+    private String masterAddr;
 
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
@@ -75,7 +76,7 @@ public class CompactionStore {
         this.compactionInterval = defaultMessageStore.getMessageStoreConfig().getCompactionScheduleInternal();
     }
 
-    public void load() {
+    public void load(boolean exitOk) throws Exception {
         File logRoot = new File(compactionLogPath);
         File[] fileTopicList = logRoot.listFiles();
         if (fileTopicList != null) {
@@ -95,16 +96,18 @@ public class CompactionStore {
                             int queueId = Integer.parseInt(fileQueueId.getName());
 
                             if (Files.isDirectory(Paths.get(compactionCqPath, topic, String.valueOf(queueId)))) {
-                                CompactionLog log = new CompactionLog(defaultMessageStore, topic, queueId,
-                                    offsetMapSize, positionMgr, compactionLogPath, compactionCqPath);
+                                CompactionLog log = new CompactionLog(defaultMessageStore, this, topic, queueId);
+                                log.load(exitOk);
                                 compactionLogTable.put(topic + "_" + queueId, log);
-                                compactionSchedule.scheduleWithFixedDelay(log::doCompaction, compactionInterval, compactionInterval, TimeUnit.SECONDS);
+                                compactionSchedule.scheduleWithFixedDelay(log::doCompaction, compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
                             } else {
                                 log.error("{}:{} compactionLog mismatch with compactionCq", topic, queueId);
                             }
                         } catch (Exception e) {
-                            log.warn("build bcq {}:{} exception: ",fileTopic.getName(), fileQueueId.getName(), e);
-                            continue;
+                            log.error("load compactionLog {}:{} exception: ",
+                                fileTopic.getName(), fileQueueId.getName(), e);
+                            throw new Exception("load compactionLog " + fileTopic.getName()
+                                + ":" + fileQueueId.getName() + " exception: " + e.getMessage());
                         }
                     }
                 }
@@ -114,21 +117,23 @@ public class CompactionStore {
     }
 
     public void putMessage(String topic, int queueId, SelectMappedBufferResult smr) throws Exception {
-        compactionLogTable.compute(topic + "_" + queueId, (k, v) -> {
+        // compute() now only creates/loads the per-queue log; the actual put
+        // happens after the map operation completes (see below).
+        CompactionLog clog = compactionLogTable.compute(topic + "_" + queueId, (k, v) -> {
             if (v == null) {
                 try {
-                    v = new CompactionLog(defaultMessageStore, topic, queueId, offsetMapSize, positionMgr,
-                        compactionLogPath, compactionCqPath);
-                    compactionSchedule.scheduleWithFixedDelay(v::doCompaction, compactionInterval, compactionInterval, TimeUnit.SECONDS);
+                    v = new CompactionLog(defaultMessageStore,this, topic, queueId);
+                    v.load(true);
+                    // NOTE(review): interval unit changed SECONDS -> MILLISECONDS;
+                    // confirm compactionScheduleInternal is now expressed in ms.
+                    compactionSchedule.scheduleWithFixedDelay(v::doCompaction, compactionInterval, compactionInterval, TimeUnit.MILLISECONDS);
                 } catch (IOException e) {
                     log.error("create compactionLog exception: ", e);
                     return null;
                 }
             }
-
-            v.asyncPutMessage(smr);
             return v;
         });
+
+        // Put outside compute(): avoids running asyncPutMessage while holding
+        // the ConcurrentHashMap bin lock. clog is null only if creation failed.
+        if (clog != null) {
+            clog.asyncPutMessage(smr);
+        }
     }
 
     public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
@@ -142,10 +147,12 @@ public class CompactionStore {
 
     }
 
-    public void flushCq(int flushLeastPages) {
-        compactionLogTable.values().forEach(log -> {
-            log.flushCq(flushLeastPages);
-        });
+    // Renamed flushCq -> flushCQ to match CompactionLog.flushCQ.
+    public void flushCQ(int flushLeastPages) {
+        compactionLogTable.values().forEach(log -> log.flushCQ(flushLeastPages));
+    }
+
+    // Record the current master address; exposed via getMasterAddr(), presumably
+    // consumed by the replication path — confirm against callers.
+    public void updateMasterAddress(String addr) {
+        this.masterAddr = addr;
     }
 
     public void shutdown() {
@@ -160,4 +167,29 @@ public class CompactionStore {
             log.warn("wait compaction schedule shutdown interrupted. ");
         }
     }
+
+    // Accessors, presumably used by CompactionLog which is now constructed with
+    // a back-reference to this store (new CompactionLog(..., this, topic, queueId)).
+    public ScheduledExecutorService getCompactionSchedule() {
+        return compactionSchedule;
+    }
+
+    public String getCompactionLogPath() {
+        return compactionLogPath;
+    }
+
+    public String getCompactionCqPath() {
+        return compactionCqPath;
+    }
+
+    public CompactionPositionMgr getPositionMgr() {
+        return positionMgr;
+    }
+
+    public int getOffsetMapSize() {
+        return offsetMapSize;
+    }
+
+    public String getMasterAddr() {
+        return masterAddr;
+    }
+
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java b/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java
new file mode 100644
index 000000000..5b6ac992e
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.kv;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Sets;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.io.IOException;
+import java.util.function.BiFunction;
+
+/**
+ * Pulls messages from a master broker over the remoting protocol
+ * (LITE_PULL_MESSAGE); used to replicate compaction data from master.
+ * Closing the fetcher shuts down the underlying netty client.
+ */
+public class MessageFetcher implements AutoCloseable {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final RemotingClient client;
+    public MessageFetcher() {
+        // plain (non-TLS) client, started eagerly on construction
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        nettyClientConfig.setUseTLS(false);
+        this.client = new NettyRemotingClient(nettyClientConfig);
+        this.client.start();
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.client.shutdown();
+    }
+
+    // Build one pull-request header; the consumer group name is derived from
+    // topic/queueId ("<topic>-<queueId>-pull-group") and must match prepare().
+    private PullMessageRequestHeader createPullMessageRequest(String topic, int queueId, long queueOffset, long subVersion) {
+        // NOTE(review): assumes the last flag selects lite pull — confirm
+        // argument order against PullSysFlag.buildSysFlag.
+        int sysFlag = PullSysFlag.buildSysFlag(false, false, false, false, true);
+
+        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
+        requestHeader.setConsumerGroup(String.join("-", topic, String.valueOf(queueId), "pull", "group"));
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        requestHeader.setQueueOffset(queueOffset);
+        requestHeader.setMaxMsgNums(10);
+        requestHeader.setSysFlag(sysFlag);
+        requestHeader.setCommitOffset(0L);
+        requestHeader.setSuspendTimeoutMillis(1000 * 20L);
+//        requestHeader.setSubscription(subExpression);
+        requestHeader.setSubVersion(subVersion);
+        requestHeader.setMaxMsgBytes(Integer.MAX_VALUE);
+//        requestHeader.setExpressionType(expressionType);
+        return requestHeader;
+    }
+
+    // Register a throw-away consumer on the master via HEART_BEAT so the
+    // subsequent pulls are accepted. Returns true on ResponseCode.SUCCESS.
+    private boolean prepare(String masterAddr, String topic, String groupName, long subVersion)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        HeartbeatData heartbeatData = new HeartbeatData();
+
+        heartbeatData.setClientID(String.join("@",
+            RemotingUtil.getLocalAddress(), "compactionIns", "compactionUnit"));
+
+        ConsumerData consumerData = new ConsumerData();
+        consumerData.setGroupName(groupName);
+        consumerData.setConsumeType(ConsumeType.CONSUME_ACTIVELY);
+        consumerData.setMessageModel(MessageModel.CLUSTERING);
+        consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+//        consumerData.setSubscriptionDataSet();
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTopic(topic);
+        subscriptionData.setSubString(SubscriptionData.SUB_ALL);
+        subscriptionData.setSubVersion(subVersion);
+        consumerData.setSubscriptionDataSet(Sets.newHashSet(subscriptionData));
+
+        heartbeatData.getConsumerDataSet().add(consumerData);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
+        request.setLanguage(LanguageCode.JAVA);
+        request.setBody(heartbeatData.encode());
+
+        RemotingCommand response = client.invokeSync(masterAddr, request, 1000 * 30L);
+        if (response != null && response.getCode() == ResponseCode.SUCCESS) {
+            return true;
+        }
+        return false;
+    }
+
+    // Unregister the throw-away consumer; the client id must match prepare().
+    private boolean pullDone(String masterAddr, String groupName)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
+        requestHeader.setClientID(String.join("@",
+            RemotingUtil.getLocalAddress(), "compactionIns", "compactionUnit"));
+        requestHeader.setProducerGroup("");
+        requestHeader.setConsumerGroup(groupName);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
+
+        RemotingCommand response = client.invokeSync(masterAddr, request, 1000 * 30L);
+        if (response != null && response.getCode() == ResponseCode.SUCCESS) {
+            return true;
+        }
+        return false;
+    }
+
+    // endOffset == -1 means "no upper bound": keep pulling until no new message.
+    private boolean stopPull(long currPullOffset, long endOffset) {
+        return currPullOffset >= endOffset && endOffset != -1;
+    }
+
+    /**
+     * Pull messages from offset 0 up to endOffset (or until PULL_NOT_FOUND when
+     * endOffset is -1), handing each SUCCESS response to responseHandler
+     * (args: offset derived from nextBeginOffset-1, raw response; returns
+     * whether to keep pulling). Always closes the channel to masterAddr.
+     */
+    public void pullMessageFromMaster(String topic, int queueId, long endOffset, String masterAddr,
+        BiFunction<Long, RemotingCommand, Boolean> responseHandler) throws Exception {
+        long currentPullOffset = 0;
+
+        try {
+            long subVersion = System.currentTimeMillis();
+            String groupName = String.join("-", topic, String.valueOf(queueId), "pull", "group");
+            // NOTE(review): prepare()'s boolean result is ignored — a failed
+            // registration only surfaces later as pull errors; consider checking it.
+            prepare(masterAddr, topic, groupName, subVersion);
+
+
+            boolean noNewMsg = false;
+            boolean keepPull = true;
+//            PullMessageRequestHeader requestHeader = createPullMessageRequest(topic, queueId, subVersion, currentPullOffset);
+            while (!stopPull(currentPullOffset, endOffset)) {
+//                requestHeader.setQueueOffset(currentPullOffset);
+                PullMessageRequestHeader requestHeader = createPullMessageRequest(topic, queueId, currentPullOffset, subVersion);
+
+                RemotingCommand
+                    request = RemotingCommand.createRequestCommand(RequestCode.LITE_PULL_MESSAGE, requestHeader);
+                RemotingCommand response = client.invokeSync(masterAddr, request, 1000 * 30L);
+
+                // NOTE(review): response is dereferenced without a null check —
+                // assumes invokeSync never returns null here; confirm.
+                PullMessageResponseHeader responseHeader = response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+                if (responseHeader == null) {
+                    log.error("{}:{} pull message responseHeader is null", topic, queueId);
+                    throw new RemotingCommandException(topic + ":" + queueId + " pull message responseHeader is null");
+                }
+
+                switch (response.getCode()) {
+                    case ResponseCode.SUCCESS:
+                        // nextBeginOffset - 1: presumably the last offset of this
+                        // batch; the handler decides whether to continue pulling
+                        long curOffset = responseHeader.getNextBeginOffset() - 1;
+                        keepPull = responseHandler.apply(curOffset, response);
+                        currentPullOffset = responseHeader.getNextBeginOffset();
+                        break;
+                    case ResponseCode.PULL_NOT_FOUND:       // NO_NEW_MSG, need break loop
+                        log.info("PULL_NOT_FOUND, topic:{}, queueId:{}, pullOffset:{},",
+                            topic, queueId, currentPullOffset);
+                        noNewMsg = true;
+                        break;
+                    case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                        log.info("PULL_RETRY_IMMEDIATE, topic:{}, queueId:{}, pullOffset:{},",
+                            topic, queueId, currentPullOffset);
+                        break;
+                    case ResponseCode.PULL_OFFSET_MOVED:
+                        log.info("PULL_OFFSET_MOVED, topic:{}, queueId:{}, pullOffset:{},",
+                            topic, queueId, currentPullOffset);
+                        break;
+                    default:
+                        log.warn("Pull Message error, response code: {}, remark: {}",
+                            response.getCode(), response.getRemark());
+                }
+
+                if (noNewMsg || !keepPull) {
+                    break;
+                }
+            }
+            pullDone(masterAddr, groupName);
+        } finally {
+            if (client != null) {
+                // always drop the channel to the master, even on failure
+                client.closeChannels(Lists.newArrayList(masterAddr));
+            }
+        }
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
index fd2c37e2d..79b745d89 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.store.logfile.MappedFile;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
@@ -50,33 +51,62 @@ public class SparseConsumeQueue extends BatchConsumeQueue {
 
     @Override
     public void recover() {
-        MappedFile lastMappedFile = this.mappedFileQueue.getLastMappedFile();
-        if (lastMappedFile == null) {
-            return;
-        }
-        ByteBuffer byteBuffer = lastMappedFile.sliceByteBuffer();
-        int mappedFileOffset = 0;
-        for (int i = 0; i < mappedFileSize; i += CQ_STORE_UNIT_SIZE) {
-            byteBuffer.position(i);
-            long offset = byteBuffer.getLong();
-            int size = byteBuffer.getInt();
-            byteBuffer.getLong();   //tagscode
-            byteBuffer.getLong();   //timestamp
-            long msgBaseOffset = byteBuffer.getLong();
-            short batchSize = byteBuffer.getShort();
-            if (offset >= 0 && size > 0 && msgBaseOffset >= 0 && batchSize > 0) {
-                mappedFileOffset += CQ_STORE_UNIT_SIZE;
-            } else {
-                log.info("Recover current batch consume queue file over, file:{} offset:{} size:{} msgBaseOffset:{} batchSize:{} mappedFileOffset:{}",
-                    lastMappedFile.getFileName(), offset, size, msgBaseOffset, batchSize, mappedFileOffset);
-                break;
+        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+        if (!mappedFiles.isEmpty()) {
+            int index = mappedFiles.size() - 3;
+            if (index < 0) {
+                index = 0;
             }
-        }
 
-        lastMappedFile.setWrotePosition(mappedFileOffset);
-        lastMappedFile.setFlushedPosition(mappedFileOffset);
-        lastMappedFile.setCommittedPosition(mappedFileOffset);
-        reviseMaxAndMinOffsetInQueue();
+            MappedFile mappedFile = mappedFiles.get(index);
+            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+            int mappedFileOffset = 0;
+            long processOffset = mappedFile.getFileFromOffset();
+            while (true) {
+                for (int i = 0; i < mappedFileSize; i += CQ_STORE_UNIT_SIZE) {
+                    byteBuffer.position(i);
+                    long offset = byteBuffer.getLong();
+                    int size = byteBuffer.getInt();
+                    byteBuffer.getLong();   //tagscode
+                    byteBuffer.getLong();   //timestamp
+                    long msgBaseOffset = byteBuffer.getLong();
+                    short batchSize = byteBuffer.getShort();
+                    if (offset >= 0 && size > 0 && msgBaseOffset >= 0 && batchSize > 0) {
+                        mappedFileOffset += CQ_STORE_UNIT_SIZE;
+                        this.maxMsgPhyOffsetInCommitLog = offset;
+                    } else {
+                        log.info("Recover current batch consume queue file over, " + "file:{} offset:{} size:{} msgBaseOffset:{} batchSize:{} mappedFileOffset:{}",
+                            mappedFile.getFileName(), offset, size, msgBaseOffset, batchSize, mappedFileOffset);
+
+                        if (mappedFileOffset != mappedFileSize) {
+                            mappedFile.setWrotePosition(mappedFileOffset);
+                            mappedFile.setFlushedPosition(mappedFileOffset);
+                            mappedFile.setCommittedPosition(mappedFileOffset);
+                        }
+
+                        break;
+                    }
+                }
+
+                index++;
+                if (index >= mappedFiles.size()) {
+                    log.info("Recover last batch consume queue file over, last mapped file:{} ", mappedFile.getFileName());
+                    break;
+                } else {
+                    mappedFile = mappedFiles.get(index);
+                    byteBuffer = mappedFile.sliceByteBuffer();
+                    processOffset = mappedFile.getFileFromOffset();
+                    mappedFileOffset = 0;
+                    log.info("Recover next batch consume queue file: " + mappedFile.getFileName());
+                }
+            }
+
+            processOffset += mappedFileOffset;
+            mappedFileQueue.setFlushedWhere(processOffset);
+            mappedFileQueue.setCommittedWhere(processOffset);
+            mappedFileQueue.truncateDirtyFiles(processOffset);
+            reviseMaxAndMinOffsetInQueue();
+        }
     }
 
     public ReferredIterator<CqUnit> iterateFromOrNext(long startOffset) {
@@ -274,6 +304,39 @@ public class SparseConsumeQueue extends BatchConsumeQueue {
             .anyMatch(mf -> Objects.equals(mf.getFile().getName(), fileName));
     }
 
+    public long getMaxPhyOffsetInLog() {
+        MappedFile lastMappedFile = mappedFileQueue.getLastMappedFile();
+        Long maxOffsetInLog = getMax(lastMappedFile, b -> b.getLong(0) + b.getInt(8));
+        if (maxOffsetInLog != null) {
+            return maxOffsetInLog;
+        } else {
+            return -1;
+        }
+    }
+
+    private <T> T getMax(MappedFile mappedFile, Function<ByteBuffer, T> function) {
+        if (mappedFile == null || mappedFile.getReadPosition() < CQ_STORE_UNIT_SIZE) {
+            return null;
+        }
+
+        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
+        for (int i = mappedFile.getReadPosition() - CQ_STORE_UNIT_SIZE; i >= 0; i -= CQ_STORE_UNIT_SIZE) {
+            byteBuffer.position(i);
+            long offset = byteBuffer.getLong();
+            int size = byteBuffer.getInt();
+            long tagsCode = byteBuffer.getLong();   //tagscode
+            long timestamp = byteBuffer.getLong();  //timestamp
+            long msgBaseOffset = byteBuffer.getLong();
+            short batchSize = byteBuffer.getShort();
+            if (offset >= 0 && size > 0 && msgBaseOffset >= 0 && batchSize > 0) {
+                byteBuffer.position(i);     //reset position
+                return function.apply(byteBuffer);
+            }
+        }
+
+        return null;
+    }
+
     @Override
     protected BatchOffsetIndex getMaxMsgOffset(MappedFile mappedFile, boolean getBatchSize, boolean getStoreTime) {
         if (mappedFile == null || mappedFile.getReadPosition() < CQ_STORE_UNIT_SIZE) {
diff --git a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
index 45f05c33b..9da8a03e3 100644
--- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.store.MappedFileQueue;
 import org.apache.rocketmq.store.MessageExtEncoder;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageSpinLock;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -60,8 +61,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.rocketmq.store.kv.CompactionLog.COMPACTING_SUB_FOLDER;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -112,6 +116,7 @@ public class CompactionLogTest {
         defaultMessageStore = mock(DefaultMessageStore.class);
         doReturn(storeConfig).when(defaultMessageStore).getMessageStoreConfig();
         positionMgr = mock(CompactionPositionMgr.class);
+        doReturn(-1L).when(positionMgr).getOffset(topic, queueId);
     }
 
     static int queueOffset = 0;
@@ -122,7 +127,7 @@ public class CompactionLogTest {
         msg.setTags(System.currentTimeMillis() + "TAG");
         msg.setKeys(String.valueOf(queueOffset % keyCount));
         msg.setBody(RandomStringUtils.randomAlphabetic(100).getBytes(StandardCharsets.UTF_8));
-        msg.setQueueId(queueOffset % 8);
+        msg.setQueueId(0);
         msg.setSysFlag(0);
         msg.setBornTimestamp(System.currentTimeMillis());
         msg.setStoreHost(StoreHost);
@@ -144,15 +149,15 @@ public class CompactionLogTest {
         MappedFileQueue smfq = mock(MappedFileQueue.class);
         SparseConsumeQueue scq = mock(SparseConsumeQueue.class);
         doReturn(smfq).when(scq).getMappedFileQueue();
-        CompactionLog clog = mock(CompactionLog.class);
-        FieldUtils.writeField(clog, "currentMappedFileQueue", mfq, true);
-        FieldUtils.writeField(clog, "currentBcq", scq, true);
+        CompactionLog.TopicPartitionLog tpLog = mock(CompactionLog.TopicPartitionLog.class);
+        FieldUtils.writeField(tpLog, "mappedFileQueue", mfq, true);
+        FieldUtils.writeField(tpLog, "consumeQueue", scq, true);
 
         doReturn(Lists.newArrayList()).when(mfq).getMappedFiles();
         doReturn(Lists.newArrayList()).when(smfq).getMappedFiles();
 
-        doCallRealMethod().when(clog).sanityCheck();
-        clog.sanityCheck();
+        doCallRealMethod().when(tpLog).sanityCheck();
+        tpLog.sanityCheck();
     }
 
     @Test(expected = RuntimeException.class)
@@ -161,9 +166,9 @@ public class CompactionLogTest {
         MappedFileQueue smfq = mock(MappedFileQueue.class);
         SparseConsumeQueue scq = mock(SparseConsumeQueue.class);
         doReturn(smfq).when(scq).getMappedFileQueue();
-        CompactionLog clog = mock(CompactionLog.class);
-        FieldUtils.writeField(clog, "currentMappedFileQueue", mfq, true);
-        FieldUtils.writeField(clog, "currentBcq", scq, true);
+        CompactionLog.TopicPartitionLog tpLog = mock(CompactionLog.TopicPartitionLog.class);
+        FieldUtils.writeField(tpLog, "mappedFileQueue", mfq, true);
+        FieldUtils.writeField(tpLog, "consumeQueue", scq, true);
 
         Files.createDirectories(Paths.get(logPath, topic, String.valueOf(queueId)));
         Files.write(Paths.get(logPath, topic, String.valueOf(queueId), "102400"),
@@ -175,8 +180,8 @@ public class CompactionLogTest {
         doReturn(Lists.newArrayList(mappedFile)).when(mfq).getMappedFiles();
         doReturn(Lists.newArrayList()).when(smfq).getMappedFiles();
 
-        doCallRealMethod().when(clog).sanityCheck();
-        clog.sanityCheck();
+        doCallRealMethod().when(tpLog).sanityCheck();
+        tpLog.sanityCheck();
     }
 
     @Test
@@ -184,8 +189,8 @@ public class CompactionLogTest {
         Iterator<SelectMappedBufferResult> iterator = mock(Iterator.class);
         SelectMappedBufferResult smb = mock(SelectMappedBufferResult.class);
         when(iterator.hasNext()).thenAnswer((Answer<Boolean>)invocationOnMock -> queueOffset < 1024);
-        when(iterator.next()).thenReturn(smb);
-        when(smb.getByteBuffer()).thenAnswer((Answer<ByteBuffer>)invocation -> buildMessage());
+        when(iterator.next()).thenAnswer((Answer<SelectMappedBufferResult>)invocation ->
+            new SelectMappedBufferResult(0, buildMessage(), 0, null));
 
         MappedFile mf = mock(MappedFile.class);
         List<MappedFile> mappedFileList = Lists.newArrayList(mf);
@@ -198,6 +203,7 @@ public class CompactionLogTest {
         CompactionLog clog = mock(CompactionLog.class);
         FieldUtils.writeField(clog, "defaultMessageStore", messageStore, true);
         doCallRealMethod().when(clog).getOffsetMap(any());
+        FieldUtils.writeField(clog, "positionMgr", positionMgr, true);
 
         queueOffset = 0;
         CompactionLog.OffsetMap offsetMap = clog.getOffsetMap(mappedFileList);
@@ -205,12 +211,12 @@ public class CompactionLogTest {
 
         doCallRealMethod().when(clog).compaction(any(List.class), any(CompactionLog.OffsetMap.class));
         doNothing().when(clog).putEndMessage(any(MappedFileQueue.class));
-        doCallRealMethod().when(clog).checkAndPutMessage(any(SelectMappedBufferResult.class), any(MessageExt.class), any(
-            CompactionLog.OffsetMap.class), any(MappedFileQueue.class), any(SparseConsumeQueue.class));
+        doCallRealMethod().when(clog).checkAndPutMessage(any(SelectMappedBufferResult.class),
+            any(MessageExt.class), any(CompactionLog.OffsetMap.class), any(CompactionLog.TopicPartitionLog.class));
         doCallRealMethod().when(clog).shouldRetainMsg(any(MessageExt.class), any(CompactionLog.OffsetMap.class));
         List<MessageExt> compactResult = Lists.newArrayList();
-        when(clog.asyncPutMessage(any(SelectMappedBufferResult.class), any(MessageExt.class),
-            any(MappedFileQueue.class), any(SparseConsumeQueue.class)))
+        when(clog.asyncPutMessage(any(ByteBuffer.class), any(MessageExt.class),
+            any(CompactionLog.TopicPartitionLog.class)))
             .thenAnswer((Answer<CompletableFuture< PutMessageResult >>)invocation -> {
                 compactResult.add(invocation.getArgument(1));
                 return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.PUT_OK,
@@ -223,4 +229,36 @@ public class CompactionLogTest {
         assertEquals(1023, compactResult.stream().mapToLong(MessageExt::getQueueOffset).max().orElse(0));
     }
 
    @Test
    public void testReplaceFiles() throws IOException, IllegalAccessException {
        // Verifies that replaceFiles() moves the compacted files out of the
        // COMPACTING_SUB_FOLDER and into the destination log's file list.
        CompactionLog clog = mock(CompactionLog.class);
        doCallRealMethod().when(clog).replaceFiles(anyList(), any(CompactionLog.TopicPartitionLog.class),
            any(CompactionLog.TopicPartitionLog.class));
        doCallRealMethod().when(clog).replaceCqFiles(any(SparseConsumeQueue.class),
            any(SparseConsumeQueue.class), anyList());

        // Destination partition log starts out with an empty mapped-file list.
        CompactionLog.TopicPartitionLog dest = mock(CompactionLog.TopicPartitionLog.class);
        MappedFileQueue destMFQ = mock(MappedFileQueue.class);
        when(dest.getLog()).thenReturn(destMFQ);
        List<MappedFile> destFiles = Lists.newArrayList();
        when(destMFQ.getMappedFiles()).thenReturn(destFiles);

        // Source holds one real mapped file living under the compacting sub-folder.
        List<MappedFile> srcFiles = Lists.newArrayList();
        String fileName = logPath + File.separator + COMPACTING_SUB_FOLDER + File.separator + String.format("%010d", 0);
        MappedFile mf = new DefaultMappedFile(fileName, 1024);
        srcFiles.add(mf);
        MappedFileQueue srcMFQ = mock(MappedFileQueue.class);
        when(srcMFQ.getMappedFiles()).thenReturn(srcFiles);
        CompactionLog.TopicPartitionLog src = mock(CompactionLog.TopicPartitionLog.class);
        when(src.getLog()).thenReturn(srcMFQ);

        // replaceFiles() takes the read lock internally; the mock needs a real one.
        FieldUtils.writeField(clog, "readMessageLock", new PutMessageSpinLock(), true);

        clog.replaceFiles(Lists.newArrayList(), dest, src);
        // The single source file must have been handed over to the destination...
        assertEquals(destFiles.size(), 1);
        // ...and renamed out of the compacting sub-folder.
        destFiles.forEach(f -> {
            assertFalse(f.getFileName().contains(COMPACTING_SUB_FOLDER));
        });
    }
+
 }
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 9c1af641b..a4544544b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -64,6 +64,7 @@ import org.apache.rocketmq.tools.command.export.ExportMetricsCommand;
 import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand;
 import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
 import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
+import org.apache.rocketmq.tools.command.message.DumpCompactionLogCommand;
 import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
 import org.apache.rocketmq.tools.command.message.PrintMessageSubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand;
@@ -247,6 +248,7 @@ public class MQAdminStartup {
         initCommand(new ExportMetricsCommand());
 
         initCommand(new HAStatusSubCommand());
+        initCommand(new DumpCompactionLogCommand());
     }
 
     private static void initLogback() throws JoranException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
new file mode 100644
index 000000000..b1c8c33cb
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DumpCompactionLogCommand.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class DumpCompactionLogCommand implements SubCommand {
+    @Override
+    public String commandDesc() {
+        return "parse compaction log to message";
+    }
+
+    @Override
+    public String commandName() {
+        return "dumpCompactionLog";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("f", "file", true, "to dump file name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+            throws SubCommandException {
+        if (commandLine.hasOption("f")) {
+            String fileName = commandLine.getOptionValue("f");
+            Path filePath = Paths.get(fileName);
+            if (!Files.exists(filePath)) {
+                throw new SubCommandException("file " + fileName + " not exist.");
+            }
+
+            if (Files.isDirectory(filePath)) {
+                throw new SubCommandException("file " + fileName + " is a directory.");
+            }
+
+            try {
+                long fileSize = Files.size(filePath);
+                FileChannel fileChannel = new RandomAccessFile(fileName, "rw").getChannel();
+                ByteBuffer buf = fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
+
+                int current = 0;
+                while (current < fileSize) {
+                    buf.position(current);
+                    ByteBuffer bb = buf.slice();
+                    int size = bb.getInt();
+                    if (size > buf.capacity() || size < 0) {
+                        break;
+                    } else {
+                        bb.limit(size);
+                        bb.rewind();
+                    }
+
+                    MessageExt messageExt = MessageDecoder.decode(bb, false, false);
+                    if (messageExt == null) {
+                        break;
+                    } else {
+                        current += size;
+                        System.out.printf(messageExt + "\n");
+                    }
+                }
+
+                UtilAll.cleanBuffer(buf);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+
+        } else {
+            System.out.print("miss dump log file name\n");
+        }
+
+
+    }
+}