You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2019/02/21 09:44:57 UTC

[rocketmq] branch rip7_multi_directories updated: [RIP-7] Multiple Directories Storage Support (#751)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch rip7_multi_directories
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/rip7_multi_directories by this push:
     new 0a31de1  [RIP-7] Multiple Directories Storage Support (#751)
0a31de1 is described below

commit 0a31de1cfd71f77cf3f167fdd9fff2b63fdd6123
Author: Jason918 <ja...@qq.com>
AuthorDate: Thu Feb 21 17:44:49 2019 +0800

    [RIP-7] Multiple Directories Storage Support (#751)
    
    * Update issue_template.md
    
    * Update README.md
    
    * RIP-7 Multiple Directories Storage Support
    
    * add readonly commit log paths
    
    * update
    
    * finish test
    
    * add test for testUpdatePathsOnline
    
    * bug fix
    
    * code format
---
 .github/ISSUE_TEMPLATE/issue_template.md           |   2 +-
 README.md                                          |   3 +-
 .../broker/processor/SendMessageProcessor.java     |  11 +-
 .../java/org/apache/rocketmq/store/CommitLog.java  |  12 ++-
 .../apache/rocketmq/store/DefaultMessageStore.java |  56 +++++++----
 .../org/apache/rocketmq/store/MappedFileQueue.java | 111 ++++++++++++---------
 .../rocketmq/store/MultiPathMappedFileQueue.java   |  98 ++++++++++++++++++
 .../rocketmq/store/config/MessageStoreConfig.java  |  47 +++++++++
 .../store/MultiPathMappedFileQueueTest.java        | 103 +++++++++++++++++++
 9 files changed, 368 insertions(+), 75 deletions(-)

diff --git a/.github/ISSUE_TEMPLATE/issue_template.md b/.github/ISSUE_TEMPLATE/issue_template.md
index a77fb61..1c8fa94 100644
--- a/.github/ISSUE_TEMPLATE/issue_template.md
+++ b/.github/ISSUE_TEMPLATE/issue_template.md
@@ -4,7 +4,7 @@ about: Describe this issue template's purpose here.
 
 ---
 
-The issue tracker is **ONLY** used for bug report and feature request. Keep in mind, please check whether there is an existing same report before your raise a new one.
+The issue tracker is **ONLY** used for bug report(feature request need to follow [RIP process](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal)). Keep in mind, please check whether there is an existing same report before your raise a new one.
 
 Alternately (especially if your communication is not a bug report), you can send mail to our [mailing lists](http://rocketmq.apache.org/about/contact/). We welcome any friendly suggestions, bug fixes, collaboration and other improvements.
 
diff --git a/README.md b/README.md
index 478f32f..3581742 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,7 @@ It offers a variety of features:
 * Home: <https://rocketmq.apache.org>
 * Docs: <https://rocketmq.apache.org/docs/quick-start/>
 * Issues: <https://github.com/apache/rocketmq/issues>
+* Rips: <https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal>
 * Ask: <https://stackoverflow.com/questions/tagged/rocketmq>
 * Slack: <https://rocketmq-invite-automation.herokuapp.com/>
  
@@ -43,7 +44,7 @@ It offers a variety of features:
 ----------
 
 ## Contributing
-We always welcome new contributions, whether for trivial cleanups, big new features or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).
+We always welcome new contributions, whether for trivial cleanups, [big new features](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal) or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).
  
 ----------
 ## License
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index b7e7a61..6bae91f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -560,8 +560,15 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
     }
 
     private String diskUtil() {
-        String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
-        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
+        double physicRatio = -1;
+        if (this.brokerController.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
+            for (String storePathPhysic : this.brokerController.getMessageStoreConfig().getCommitLogStorePaths()) {
+                physicRatio = Math.max(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic));
+            }
+        } else {
+            String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
+            physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
+        }
 
         String storePathLogis =
             StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 03b1151..d920c41 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -64,8 +64,16 @@ public class CommitLog {
     private final PutMessageLock putMessageLock;
 
     public CommitLog(final DefaultMessageStore defaultMessageStore) {
-        this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
-            defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
+        if (defaultMessageStore.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
+            this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
+                defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
+                defaultMessageStore.getAllocateMappedFileService());
+        } else {
+            this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
+                    defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
+                    defaultMessageStore.getAllocateMappedFileService());
+        }
+
         this.defaultMessageStore = defaultMessageStore;
 
         if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index e0aef4f..86dd689 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -693,11 +694,18 @@ public class DefaultMessageStore implements MessageStore {
     public HashMap<String, String> getRuntimeInfo() {
         HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
 
-        {
+        if (DefaultMessageStore.this.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
+            double maxValue = Double.MIN_VALUE;
+            for (String clPath : DefaultMessageStore.this.getMessageStoreConfig().getCommitLogStorePaths()) {
+                double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(clPath);
+                result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio));
+                maxValue = Math.max(maxValue, physicRatio);
+            }
+            result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(maxValue));
+        } else {
             String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
             double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
             result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));
-
         }
 
         {
@@ -1537,27 +1545,35 @@ public class DefaultMessageStore implements MessageStore {
             cleanImmediately = false;
 
             {
-                String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
-                double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
-                if (physicRatio > diskSpaceWarningLevelRatio) {
-                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
-                    if (diskok) {
-                        DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
-                    }
-
-                    cleanImmediately = true;
-                } else if (physicRatio > diskSpaceCleanForciblyRatio) {
-                    cleanImmediately = true;
+                List<String> storePaths;
+                if (DefaultMessageStore.this.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
+                    storePaths = DefaultMessageStore.this.getMessageStoreConfig().getCommitLogStorePaths();
                 } else {
-                    boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
-                    if (!diskok) {
-                        DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
-                    }
+                    storePaths = Collections.singletonList(DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog());
                 }
 
-                if (physicRatio < 0 || physicRatio > ratio) {
-                    DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
-                    return true;
+                for (String storePathPhysic : storePaths) {
+                    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
+                    if (physicRatio > diskSpaceWarningLevelRatio) {
+                        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
+                        if (diskok) {
+                            DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full, storePathPhysic=" + storePathPhysic);
+                        }
+
+                        cleanImmediately = true;
+                    } else if (physicRatio > diskSpaceCleanForciblyRatio) {
+                        cleanImmediately = true;
+                    } else {
+                        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
+                        if (!diskok) {
+                            DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok, storePathPhysic=" + storePathPhysic);
+                        }
+                    }
+
+                    if (physicRatio < 0 || physicRatio > ratio) {
+                        DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio + ", storePathPhysic=" + storePathPhysic);
+                        return true;
+                    }
                 }
             }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 86de3d2..be29e6c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
@@ -37,13 +38,13 @@ public class MappedFileQueue {
 
     private final String storePath;
 
-    private final int mappedFileSize;
+    protected final int mappedFileSize;
 
-    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
+    protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
 
     private final AllocateMappedFileService allocateMappedFileService;
 
-    private long flushedWhere = 0;
+    protected long flushedWhere = 0;
     private long committedWhere = 0;
 
     private volatile long storeTimestamp = 0;
@@ -144,35 +145,39 @@ public class MappedFileQueue {
         }
     }
 
+
     public boolean load() {
         File dir = new File(this.storePath);
-        File[] files = dir.listFiles();
-        if (files != null) {
-            // ascending order
-            Arrays.sort(files);
-            for (File file : files) {
-
-                if (file.length() != this.mappedFileSize) {
-                    log.warn(file + "\t" + file.length()
+        File[] ls = dir.listFiles();
+        if (ls != null) {
+            return doLoad(Arrays.asList(ls));
+        }
+        return true;
+    }
+
+    public boolean doLoad(List<File> files) {
+        // ascending order
+        Collections.sort(files);
+        for (File file : files) {
+            if (file.length() != this.mappedFileSize) {
+                log.warn(file + "\t" + file.length()
                         + " length not matched message store config value, ignore it");
-                    return true;
-                }
+                return true;
+            }
 
-                try {
-                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
-
-                    mappedFile.setWrotePosition(this.mappedFileSize);
-                    mappedFile.setFlushedPosition(this.mappedFileSize);
-                    mappedFile.setCommittedPosition(this.mappedFileSize);
-                    this.mappedFiles.add(mappedFile);
-                    log.info("load " + file.getPath() + " OK");
-                } catch (IOException e) {
-                    log.error("load file " + file + " error", e);
-                    return false;
-                }
+            try {
+                MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
+
+                mappedFile.setWrotePosition(this.mappedFileSize);
+                mappedFile.setFlushedPosition(this.mappedFileSize);
+                mappedFile.setCommittedPosition(this.mappedFileSize);
+                this.mappedFiles.add(mappedFile);
+                log.info("load " + file.getPath() + " OK");
+            } catch (IOException e) {
+                log.error("load file " + file + " error", e);
+                return false;
             }
         }
-
         return true;
     }
 
@@ -204,33 +209,41 @@ public class MappedFileQueue {
         }
 
         if (createOffset != -1 && needCreate) {
-            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
-            String nextNextFilePath = this.storePath + File.separator
-                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
-            MappedFile mappedFile = null;
+            return tryCreateMappedFile(createOffset);
+        }
+
+        return mappedFileLast;
+    }
+
+    protected MappedFile tryCreateMappedFile(long createOffset) {
+        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
+        String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+                + this.mappedFileSize);
+        return doCreateMappedFile(nextFilePath, nextNextFilePath);
+    }
 
-            if (this.allocateMappedFileService != null) {
-                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
+    protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
+        MappedFile mappedFile = null;
+
+        if (this.allocateMappedFileService != null) {
+            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                     nextNextFilePath, this.mappedFileSize);
-            } else {
-                try {
-                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
-                } catch (IOException e) {
-                    log.error("create mappedFile exception", e);
-                }
+        } else {
+            try {
+                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
+            } catch (IOException e) {
+                log.error("create mappedFile exception", e);
             }
+        }
 
-            if (mappedFile != null) {
-                if (this.mappedFiles.isEmpty()) {
-                    mappedFile.setFirstCreateInQueue(true);
-                }
-                this.mappedFiles.add(mappedFile);
+        if (mappedFile != null) {
+            if (this.mappedFiles.isEmpty()) {
+                mappedFile.setFirstCreateInQueue(true);
             }
-
-            return mappedFile;
+            this.mappedFiles.add(mappedFile);
         }
 
-        return mappedFileLast;
+        return mappedFile;
     }
 
     public MappedFile getLastMappedFile(final long startOffset) {
@@ -398,7 +411,7 @@ public class MappedFileQueue {
                     destroy = maxOffsetInLogicQueue < offset;
                     if (destroy) {
                         log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
-                            + maxOffsetInLogicQueue + ", delete it");
+                                + maxOffsetInLogicQueue + ", delete it");
                     }
                 } else if (!mappedFile.isAvailable()) { // Handle hanged file.
                     log.warn("Found a hanged consume queue file, attempting to delete it.");
@@ -466,7 +479,7 @@ public class MappedFileQueue {
             if (firstMappedFile != null && lastMappedFile != null) {
                 if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                     LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
-                        offset,
+                            offset,
                         firstMappedFile.getFileFromOffset(),
                         lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                         this.mappedFileSize,
@@ -480,7 +493,7 @@ public class MappedFileQueue {
                     }
 
                     if (targetFile != null && offset >= targetFile.getFileFromOffset()
-                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
+                            && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                         return targetFile;
                     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
new file mode 100644
index 0000000..78ef4e0
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class MultiPathMappedFileQueue extends MappedFileQueue {
+
+    private final MessageStoreConfig config;
+
+    public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
+        AllocateMappedFileService allocateMappedFileService) {
+        super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService);
+        this.config = messageStoreConfig;
+    }
+
+
+    @Override
+    public boolean load() {
+        List<File> files = new ArrayList<>();
+        for (String path : config.getCommitLogStorePaths()) {
+            File dir = new File(path);
+            File[] ls = dir.listFiles();
+            if (ls != null) {
+                Collections.addAll(files, ls);
+            }
+        }
+        if (config.getReadOnlyCommitLogStorePaths() != null) {
+            for (String path : config.getReadOnlyCommitLogStorePaths()) {
+                File dir = new File(path);
+                File[] ls = dir.listFiles();
+                if (ls != null) {
+                    Collections.addAll(files, ls);
+                }
+            }
+        }
+
+        return doLoad(files);
+    }
+
+    @Override
+    protected MappedFile tryCreateMappedFile(long createOffset) {
+        long fileIdx = createOffset / this.mappedFileSize;
+        List<String> pathList = config.getCommitLogStorePaths();
+        String nextFilePath = pathList.get((int) (fileIdx % pathList.size())) + File.separator
+                + UtilAll.offset2FileName(createOffset);
+        String nextNextFilePath = pathList.get((int) ((fileIdx + 1) % pathList.size())) + File.separator
+                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
+        return doCreateMappedFile(nextFilePath, nextNextFilePath);
+    }
+
+    @Override
+    public void destroy() {
+        for (MappedFile mf : this.mappedFiles) {
+            mf.destroy(1000 * 3);
+        }
+        this.mappedFiles.clear();
+        this.flushedWhere = 0;
+
+        if (config.getCommitLogStorePaths() != null) {
+            for (String path : config.getCommitLogStorePaths()) {
+                File file = new File(path);
+                if (file.isDirectory()) {
+                    file.delete();
+                }
+            }
+        }
+        if (config.getReadOnlyCommitLogStorePaths() != null) {
+            for (String path : config.getReadOnlyCommitLogStorePaths()) {
+                File file = new File(path);
+                if (file.isDirectory()) {
+                    file.delete();
+                }
+            }
+        }
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 8d60321..7218c35 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -17,6 +17,10 @@
 package org.apache.rocketmq.store.config;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.store.ConsumeQueue;
 
@@ -30,6 +34,12 @@ public class MessageStoreConfig {
     private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
         + File.separator + "commitlog";
 
+    private boolean multiCommitLogPathEnable = false;
+
+    private List<String> commitLogStorePaths = null;
+
+    private List<String> readOnlyCommitLogStorePaths = null;
+
     // CommitLog file size,default is 1G
     private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
     // ConsumeQueue file size,default is 30W
@@ -666,4 +676,41 @@ public class MessageStoreConfig {
         this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
     }
 
+    public boolean isMultiCommitLogPathEnable() {
+        return multiCommitLogPathEnable;
+    }
+
+    public void setMultiCommitLogPathEnable(boolean multiCommitLogPathEnable) {
+        this.multiCommitLogPathEnable = multiCommitLogPathEnable;
+    }
+
+    public List<String> getCommitLogStorePaths() {
+        return commitLogStorePaths;
+    }
+
+    public void setCommitLogStorePaths(String commitLogStorePaths) {
+        String[] tokens = commitLogStorePaths.trim().split(":");
+        List<String> pathList = Arrays.asList(tokens);
+        Collections.sort(pathList);
+        this.commitLogStorePaths = pathList;
+    }
+
+    public void setCommitLogStorePaths(List<String> commitLogStorePaths) {
+        this.commitLogStorePaths = commitLogStorePaths;
+    }
+
+    public List<String> getReadOnlyCommitLogStorePaths() {
+        return readOnlyCommitLogStorePaths;
+    }
+
+    public void setReadOnlyCommitLogStorePaths(List<String> readOnlyCommitLogStorePaths) {
+        this.readOnlyCommitLogStorePaths = readOnlyCommitLogStorePaths;
+    }
+
+    public void setReadOnlyCommitLogStorePaths(String readOnlyCommitLogStorePaths) {
+        String[] tokens = readOnlyCommitLogStorePaths.trim().split(":");
+        List<String> pathList = Arrays.asList(tokens);
+        Collections.sort(pathList);
+        this.readOnlyCommitLogStorePaths = pathList;
+    }
 }
diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java
new file mode 100644
index 0000000..1d0a6e5
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class MultiPathMappedFileQueueTest {
+
+    @Test
+    public void testGetLastMappedFile() {
+        final byte[] fixedMsg = new byte[1024];
+
+        MessageStoreConfig config = new MessageStoreConfig();
+        config.setMultiCommitLogPathEnable(true);
+        config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
+        MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
+        for (int i = 0; i < 1024; i++) {
+            MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
+            assertThat(mappedFile).isNotNull();
+            assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
+            int idx = i % config.getCommitLogStorePaths().size();
+            assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue();
+        }
+        mappedFileQueue.shutdown(1000);
+        mappedFileQueue.destroy();
+    }
+
+    @Test
+    public void testLoadReadOnlyMappedFiles() {
+        {
+            //create old mapped files
+            final byte[] fixedMsg = new byte[1024];
+            MessageStoreConfig config = new MessageStoreConfig();
+            config.setMultiCommitLogPathEnable(true);
+            config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
+            MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
+            for (int i = 0; i < 1024; i++) {
+                MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
+                assertThat(mappedFile).isNotNull();
+                assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
+                int idx = i % config.getCommitLogStorePaths().size();
+                assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue();
+            }
+            mappedFileQueue.shutdown(1000);
+        }
+
+        // test load and readonly
+        MessageStoreConfig config = new MessageStoreConfig();
+        config.setMultiCommitLogPathEnable(true);
+        config.setCommitLogStorePaths("target/unit_test_store/b/");
+        config.setReadOnlyCommitLogStorePaths("target/unit_test_store/a:target/unit_test_store/c");
+        MultiPathMappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
+
+        mappedFileQueue.load();
+
+        assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024);
+        mappedFileQueue.destroy();
+
+    }
+
+    @Test
+    public void testUpdatePathsOnline() {
+        final byte[] fixedMsg = new byte[1024];
+
+        MessageStoreConfig config = new MessageStoreConfig();
+        config.setMultiCommitLogPathEnable(true);
+        config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
+        MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
+        for (int i = 0; i < 1024; i++) {
+            MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
+            assertThat(mappedFile).isNotNull();
+            assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
+            int idx = i % config.getCommitLogStorePaths().size();
+            assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue();
+
+            if (i == 500) {
+                config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/");
+                assertThat(config.getCommitLogStorePaths().size()).isEqualTo(2);
+            }
+        }
+        mappedFileQueue.shutdown(1000);
+        mappedFileQueue.destroy();
+    }
+}
\ No newline at end of file