You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2019/02/21 09:44:57 UTC
[rocketmq] branch rip7_multi_directories updated: [RIP-7] Multiple Directories Storage Support (#751)
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch rip7_multi_directories
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/rip7_multi_directories by this push:
new 0a31de1 [RIP-7] Multiple Directories Storage Support (#751)
0a31de1 is described below
commit 0a31de1cfd71f77cf3f167fdd9fff2b63fdd6123
Author: Jason918 <ja...@qq.com>
AuthorDate: Thu Feb 21 17:44:49 2019 +0800
[RIP-7] Multiple Directories Storage Support (#751)
* Update issue_template.md
* Update README.md
* RIP-7 Multiple Directories Storage Support
* add readonly commit log paths
* update
* finish test
* add test for testUpdatePathsOnline
* bug fix
* code format
---
.github/ISSUE_TEMPLATE/issue_template.md | 2 +-
README.md | 3 +-
.../broker/processor/SendMessageProcessor.java | 11 +-
.../java/org/apache/rocketmq/store/CommitLog.java | 12 ++-
.../apache/rocketmq/store/DefaultMessageStore.java | 56 +++++++----
.../org/apache/rocketmq/store/MappedFileQueue.java | 111 ++++++++++++---------
.../rocketmq/store/MultiPathMappedFileQueue.java | 98 ++++++++++++++++++
.../rocketmq/store/config/MessageStoreConfig.java | 47 +++++++++
.../store/MultiPathMappedFileQueueTest.java | 103 +++++++++++++++++++
9 files changed, 368 insertions(+), 75 deletions(-)
diff --git a/.github/ISSUE_TEMPLATE/issue_template.md b/.github/ISSUE_TEMPLATE/issue_template.md
index a77fb61..1c8fa94 100644
--- a/.github/ISSUE_TEMPLATE/issue_template.md
+++ b/.github/ISSUE_TEMPLATE/issue_template.md
@@ -4,7 +4,7 @@ about: Describe this issue template's purpose here.
---
-The issue tracker is **ONLY** used for bug report and feature request. Keep in mind, please check whether there is an existing same report before your raise a new one.
+The issue tracker is **ONLY** used for bug report(feature request need to follow [RIP process](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal)). Keep in mind, please check whether there is an existing same report before your raise a new one.
Alternately (especially if your communication is not a bug report), you can send mail to our [mailing lists](http://rocketmq.apache.org/about/contact/). We welcome any friendly suggestions, bug fixes, collaboration and other improvements.
diff --git a/README.md b/README.md
index 478f32f..3581742 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,7 @@ It offers a variety of features:
* Home: <https://rocketmq.apache.org>
* Docs: <https://rocketmq.apache.org/docs/quick-start/>
* Issues: <https://github.com/apache/rocketmq/issues>
+* Rips: <https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal>
* Ask: <https://stackoverflow.com/questions/tagged/rocketmq>
* Slack: <https://rocketmq-invite-automation.herokuapp.com/>
@@ -43,7 +44,7 @@ It offers a variety of features:
----------
## Contributing
-We always welcome new contributions, whether for trivial cleanups, big new features or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).
+We always welcome new contributions, whether for trivial cleanups, [big new features](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal) or other material rewards, more details see [here](http://rocketmq.apache.org/docs/how-to-contribute/).
----------
## License
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index b7e7a61..6bae91f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -560,8 +560,15 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private String diskUtil() {
- String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
- double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
+ double physicRatio = -1;
+ if (this.brokerController.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
+ for (String storePathPhysic : this.brokerController.getMessageStoreConfig().getCommitLogStorePaths()) {
+ physicRatio = Math.max(physicRatio, UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic));
+ }
+ } else {
+ String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
+ physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
+ }
String storePathLogis =
StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 03b1151..d920c41 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -64,8 +64,16 @@ public class CommitLog {
private final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
- this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
- defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
+ if (defaultMessageStore.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
+ this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
+ defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
+ defaultMessageStore.getAllocateMappedFileService());
+ } else {
+ this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
+ defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
+ defaultMessageStore.getAllocateMappedFileService());
+ }
+
this.defaultMessageStore = defaultMessageStore;
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index e0aef4f..86dd689 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -693,11 +694,18 @@ public class DefaultMessageStore implements MessageStore {
public HashMap<String, String> getRuntimeInfo() {
HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
- {
+ if (DefaultMessageStore.this.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
+ double maxValue = Double.MIN_VALUE;
+ for (String clPath : DefaultMessageStore.this.getMessageStoreConfig().getCommitLogStorePaths()) {
+ double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(clPath);
+ result.put(RunningStats.commitLogDiskRatio.name() + "_" + clPath, String.valueOf(physicRatio));
+ maxValue = Math.max(maxValue, physicRatio);
+ }
+ result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(maxValue));
+ } else {
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));
-
}
{
@@ -1537,27 +1545,35 @@ public class DefaultMessageStore implements MessageStore {
cleanImmediately = false;
{
- String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
- double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
- if (physicRatio > diskSpaceWarningLevelRatio) {
- boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
- if (diskok) {
- DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
- }
-
- cleanImmediately = true;
- } else if (physicRatio > diskSpaceCleanForciblyRatio) {
- cleanImmediately = true;
+ List<String> storePaths;
+ if (DefaultMessageStore.this.getMessageStoreConfig().isMultiCommitLogPathEnable()) {
+ storePaths = DefaultMessageStore.this.getMessageStoreConfig().getCommitLogStorePaths();
} else {
- boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
- if (!diskok) {
- DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
- }
+ storePaths = Collections.singletonList(DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog());
}
- if (physicRatio < 0 || physicRatio > ratio) {
- DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
- return true;
+ for (String storePathPhysic : storePaths) {
+ double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
+ if (physicRatio > diskSpaceWarningLevelRatio) {
+ boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
+ if (diskok) {
+ DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full, storePathPhysic=" + storePathPhysic);
+ }
+
+ cleanImmediately = true;
+ } else if (physicRatio > diskSpaceCleanForciblyRatio) {
+ cleanImmediately = true;
+ } else {
+ boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
+ if (!diskok) {
+ DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok, storePathPhysic=" + storePathPhysic);
+ }
+ }
+
+ if (physicRatio < 0 || physicRatio > ratio) {
+ DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio + ", storePathPhysic=" + storePathPhysic);
+ return true;
+ }
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 86de3d2..be29e6c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
@@ -37,13 +38,13 @@ public class MappedFileQueue {
private final String storePath;
- private final int mappedFileSize;
+ protected final int mappedFileSize;
- private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
+ protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
private final AllocateMappedFileService allocateMappedFileService;
- private long flushedWhere = 0;
+ protected long flushedWhere = 0;
private long committedWhere = 0;
private volatile long storeTimestamp = 0;
@@ -144,35 +145,39 @@ public class MappedFileQueue {
}
}
+
public boolean load() {
File dir = new File(this.storePath);
- File[] files = dir.listFiles();
- if (files != null) {
- // ascending order
- Arrays.sort(files);
- for (File file : files) {
-
- if (file.length() != this.mappedFileSize) {
- log.warn(file + "\t" + file.length()
+ File[] ls = dir.listFiles();
+ if (ls != null) {
+ return doLoad(Arrays.asList(ls));
+ }
+ return true;
+ }
+
+ public boolean doLoad(List<File> files) {
+ // ascending order
+ Collections.sort(files);
+ for (File file : files) {
+ if (file.length() != this.mappedFileSize) {
+ log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
- return true;
- }
+ return true;
+ }
- try {
- MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
-
- mappedFile.setWrotePosition(this.mappedFileSize);
- mappedFile.setFlushedPosition(this.mappedFileSize);
- mappedFile.setCommittedPosition(this.mappedFileSize);
- this.mappedFiles.add(mappedFile);
- log.info("load " + file.getPath() + " OK");
- } catch (IOException e) {
- log.error("load file " + file + " error", e);
- return false;
- }
+ try {
+ MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
+
+ mappedFile.setWrotePosition(this.mappedFileSize);
+ mappedFile.setFlushedPosition(this.mappedFileSize);
+ mappedFile.setCommittedPosition(this.mappedFileSize);
+ this.mappedFiles.add(mappedFile);
+ log.info("load " + file.getPath() + " OK");
+ } catch (IOException e) {
+ log.error("load file " + file + " error", e);
+ return false;
}
}
-
return true;
}
@@ -204,33 +209,41 @@ public class MappedFileQueue {
}
if (createOffset != -1 && needCreate) {
- String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
- String nextNextFilePath = this.storePath + File.separator
- + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
- MappedFile mappedFile = null;
+ return tryCreateMappedFile(createOffset);
+ }
+
+ return mappedFileLast;
+ }
+
+ protected MappedFile tryCreateMappedFile(long createOffset) {
+ String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
+ String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset
+ + this.mappedFileSize);
+ return doCreateMappedFile(nextFilePath, nextNextFilePath);
+ }
- if (this.allocateMappedFileService != null) {
- mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
+ protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {
+ MappedFile mappedFile = null;
+
+ if (this.allocateMappedFileService != null) {
+ mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
- } else {
- try {
- mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
- } catch (IOException e) {
- log.error("create mappedFile exception", e);
- }
+ } else {
+ try {
+ mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
+ } catch (IOException e) {
+ log.error("create mappedFile exception", e);
}
+ }
- if (mappedFile != null) {
- if (this.mappedFiles.isEmpty()) {
- mappedFile.setFirstCreateInQueue(true);
- }
- this.mappedFiles.add(mappedFile);
+ if (mappedFile != null) {
+ if (this.mappedFiles.isEmpty()) {
+ mappedFile.setFirstCreateInQueue(true);
}
-
- return mappedFile;
+ this.mappedFiles.add(mappedFile);
}
- return mappedFileLast;
+ return mappedFile;
}
public MappedFile getLastMappedFile(final long startOffset) {
@@ -398,7 +411,7 @@ public class MappedFileQueue {
destroy = maxOffsetInLogicQueue < offset;
if (destroy) {
log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
- + maxOffsetInLogicQueue + ", delete it");
+ + maxOffsetInLogicQueue + ", delete it");
}
} else if (!mappedFile.isAvailable()) { // Handle hanged file.
log.warn("Found a hanged consume queue file, attempting to delete it.");
@@ -466,7 +479,7 @@ public class MappedFileQueue {
if (firstMappedFile != null && lastMappedFile != null) {
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
- offset,
+ offset,
firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize,
@@ -480,7 +493,7 @@ public class MappedFileQueue {
}
if (targetFile != null && offset >= targetFile.getFileFromOffset()
- && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
+ && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
return targetFile;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
new file mode 100644
index 0000000..78ef4e0
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class MultiPathMappedFileQueue extends MappedFileQueue {
+
+ private final MessageStoreConfig config;
+
+ public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int mappedFileSize,
+ AllocateMappedFileService allocateMappedFileService) {
+ super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, allocateMappedFileService);
+ this.config = messageStoreConfig;
+ }
+
+
+ @Override
+ public boolean load() {
+ List<File> files = new ArrayList<>();
+ for (String path : config.getCommitLogStorePaths()) {
+ File dir = new File(path);
+ File[] ls = dir.listFiles();
+ if (ls != null) {
+ Collections.addAll(files, ls);
+ }
+ }
+ if (config.getReadOnlyCommitLogStorePaths() != null) {
+ for (String path : config.getReadOnlyCommitLogStorePaths()) {
+ File dir = new File(path);
+ File[] ls = dir.listFiles();
+ if (ls != null) {
+ Collections.addAll(files, ls);
+ }
+ }
+ }
+
+ return doLoad(files);
+ }
+
+ @Override
+ protected MappedFile tryCreateMappedFile(long createOffset) {
+ long fileIdx = createOffset / this.mappedFileSize;
+ List<String> pathList = config.getCommitLogStorePaths();
+ String nextFilePath = pathList.get((int) (fileIdx % pathList.size())) + File.separator
+ + UtilAll.offset2FileName(createOffset);
+ String nextNextFilePath = pathList.get((int) ((fileIdx + 1) % pathList.size())) + File.separator
+ + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
+ return doCreateMappedFile(nextFilePath, nextNextFilePath);
+ }
+
+ @Override
+ public void destroy() {
+ for (MappedFile mf : this.mappedFiles) {
+ mf.destroy(1000 * 3);
+ }
+ this.mappedFiles.clear();
+ this.flushedWhere = 0;
+
+ if (config.getCommitLogStorePaths() != null) {
+ for (String path : config.getCommitLogStorePaths()) {
+ File file = new File(path);
+ if (file.isDirectory()) {
+ file.delete();
+ }
+ }
+ }
+ if (config.getReadOnlyCommitLogStorePaths() != null) {
+ for (String path : config.getReadOnlyCommitLogStorePaths()) {
+ File file = new File(path);
+ if (file.isDirectory()) {
+ file.delete();
+ }
+ }
+ }
+ }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 8d60321..7218c35 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -17,6 +17,10 @@
package org.apache.rocketmq.store.config;
import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.store.ConsumeQueue;
@@ -30,6 +34,12 @@ public class MessageStoreConfig {
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
+ private boolean multiCommitLogPathEnable = false;
+
+ private List<String> commitLogStorePaths = null;
+
+ private List<String> readOnlyCommitLogStorePaths = null;
+
// CommitLog file size,default is 1G
private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
// ConsumeQueue file size,default is 30W
@@ -666,4 +676,41 @@ public class MessageStoreConfig {
this.commitCommitLogThoroughInterval = commitCommitLogThoroughInterval;
}
+ public boolean isMultiCommitLogPathEnable() {
+ return multiCommitLogPathEnable;
+ }
+
+ public void setMultiCommitLogPathEnable(boolean multiCommitLogPathEnable) {
+ this.multiCommitLogPathEnable = multiCommitLogPathEnable;
+ }
+
+ public List<String> getCommitLogStorePaths() {
+ return commitLogStorePaths;
+ }
+
+ public void setCommitLogStorePaths(String commitLogStorePaths) {
+ String[] tokens = commitLogStorePaths.trim().split(":");
+ List<String> pathList = Arrays.asList(tokens);
+ Collections.sort(pathList);
+ this.commitLogStorePaths = pathList;
+ }
+
+ public void setCommitLogStorePaths(List<String> commitLogStorePaths) {
+ this.commitLogStorePaths = commitLogStorePaths;
+ }
+
+ public List<String> getReadOnlyCommitLogStorePaths() {
+ return readOnlyCommitLogStorePaths;
+ }
+
+ public void setReadOnlyCommitLogStorePaths(List<String> readOnlyCommitLogStorePaths) {
+ this.readOnlyCommitLogStorePaths = readOnlyCommitLogStorePaths;
+ }
+
+ public void setReadOnlyCommitLogStorePaths(String readOnlyCommitLogStorePaths) {
+ String[] tokens = readOnlyCommitLogStorePaths.trim().split(":");
+ List<String> pathList = Arrays.asList(tokens);
+ Collections.sort(pathList);
+ this.readOnlyCommitLogStorePaths = pathList;
+ }
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java
new file mode 100644
index 0000000..1d0a6e5
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/MultiPathMappedFileQueueTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+public class MultiPathMappedFileQueueTest {
+
+ @Test
+ public void testGetLastMappedFile() {
+ final byte[] fixedMsg = new byte[1024];
+
+ MessageStoreConfig config = new MessageStoreConfig();
+ config.setMultiCommitLogPathEnable(true);
+ config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
+ MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
+ for (int i = 0; i < 1024; i++) {
+ MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
+ assertThat(mappedFile).isNotNull();
+ assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
+ int idx = i % config.getCommitLogStorePaths().size();
+ assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue();
+ }
+ mappedFileQueue.shutdown(1000);
+ mappedFileQueue.destroy();
+ }
+
+ @Test
+ public void testLoadReadOnlyMappedFiles() {
+ {
+ //create old mapped files
+ final byte[] fixedMsg = new byte[1024];
+ MessageStoreConfig config = new MessageStoreConfig();
+ config.setMultiCommitLogPathEnable(true);
+ config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
+ MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
+ for (int i = 0; i < 1024; i++) {
+ MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
+ assertThat(mappedFile).isNotNull();
+ assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
+ int idx = i % config.getCommitLogStorePaths().size();
+ assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue();
+ }
+ mappedFileQueue.shutdown(1000);
+ }
+
+ // test load and readonly
+ MessageStoreConfig config = new MessageStoreConfig();
+ config.setMultiCommitLogPathEnable(true);
+ config.setCommitLogStorePaths("target/unit_test_store/b/");
+ config.setReadOnlyCommitLogStorePaths("target/unit_test_store/a:target/unit_test_store/c");
+ MultiPathMappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
+
+ mappedFileQueue.load();
+
+ assertThat(mappedFileQueue.mappedFiles.size()).isEqualTo(1024);
+ mappedFileQueue.destroy();
+
+ }
+
+ @Test
+ public void testUpdatePathsOnline() {
+ final byte[] fixedMsg = new byte[1024];
+
+ MessageStoreConfig config = new MessageStoreConfig();
+ config.setMultiCommitLogPathEnable(true);
+ config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/:target/unit_test_store/c/");
+ MappedFileQueue mappedFileQueue = new MultiPathMappedFileQueue(config, 1024, null);
+ for (int i = 0; i < 1024; i++) {
+ MappedFile mappedFile = mappedFileQueue.getLastMappedFile(fixedMsg.length * i);
+ assertThat(mappedFile).isNotNull();
+ assertThat(mappedFile.appendMessage(fixedMsg)).isTrue();
+ int idx = i % config.getCommitLogStorePaths().size();
+ assertThat(mappedFile.getFileName().startsWith(config.getCommitLogStorePaths().get(idx))).isTrue();
+
+ if (i == 500) {
+ config.setCommitLogStorePaths("target/unit_test_store/a/:target/unit_test_store/b/");
+ assertThat(config.getCommitLogStorePaths().size()).isEqualTo(2);
+ }
+ }
+ mappedFileQueue.shutdown(1000);
+ mappedFileQueue.destroy();
+ }
+}
\ No newline at end of file