You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/25 01:15:33 UTC
[iotdb] branch master updated: [IOTDB-3232] Implement read interface for WAL (#5993)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5f8c634848 [IOTDB-3232] Implement read interface for WAL (#5993)
5f8c634848 is described below
commit 5f8c63484881fd22f1d854d9f3f48da994700d51
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Wed May 25 09:15:25 2022 +0800
[IOTDB-3232] Implement read interface for WAL (#5993)
---
.../iotdb/consensus/wal/ConsensusReqReader.java | 24 +-
.../apache/iotdb/commons/conf/IoTDBConstant.java | 3 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +
.../iotdb/db/engine/storagegroup/DataRegion.java | 15 +-
.../db/engine/storagegroup/TsFileProcessor.java | 4 +-
.../plan/node/write/InsertMultiTabletsNode.java | 15 +-
.../plan/planner/plan/node/write/InsertNode.java | 6 +-
.../planner/plan/node/write/InsertRowsNode.java | 9 +
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 9 +
.../db/service/metrics/predefined/FileMetrics.java | 3 +-
.../java/org/apache/iotdb/db/tools/WalChecker.java | 13 +-
.../java/org/apache/iotdb/db/wal/WALManager.java | 111 +----
.../allocation/AbstractNodeAllocationStrategy.java | 79 ++++
.../db/wal/allocation/FirstCreateStrategy.java | 105 +++++
.../NodeAllocationStrategy.java} | 26 +-
.../db/wal/allocation/RoundRobinStrategy.java | 106 +++++
.../iotdb/db/wal/buffer/AbstractWALBuffer.java | 11 +-
.../org/apache/iotdb/db/wal/buffer/IWALBuffer.java | 13 +-
.../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 59 ++-
.../iotdb/db/wal/checkpoint/CheckpointManager.java | 13 +-
.../apache/iotdb/db/wal/io/CheckpointWriter.java | 30 --
.../java/org/apache/iotdb/db/wal/io/WALWriter.java | 29 --
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 434 +++++++++++++++--
.../db/wal/recover/CheckpointRecoverUtils.java | 12 +-
.../iotdb/db/wal/recover/WALNodeRecoverTask.java | 35 +-
.../iotdb/db/wal/recover/WALRecoverManager.java | 3 +-
.../CheckpointFileUtils.java} | 50 +-
.../apache/iotdb/db/wal/utils/WALFileUtils.java | 134 ++++++
.../org/apache/iotdb/db/tools/WalCheckerTest.java | 9 +-
.../org/apache/iotdb/db/wal/DisableWALTest.java | 2 +-
.../org/apache/iotdb/db/wal/WALManagerTest.java | 32 +-
.../FirstCreateStrategyTest.java} | 79 +++-
.../RoundRobinStrategyTest.java} | 47 +-
.../iotdb/db/wal/buffer/WALBufferCommonTest.java | 4 +-
.../db/wal/checkpoint/CheckpointManagerTest.java | 17 +-
.../iotdb/db/wal/node/ConsensusReqReaderTest.java | 516 +++++++++++++++++++++
.../org/apache/iotdb/db/wal/node/WALNodeTest.java | 18 +-
.../iotdb/db/wal/utils/WALFileUtilsTest.java | 215 +++++++++
38 files changed, 1954 insertions(+), 347 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/wal/ConsensusReqReader.java b/consensus/src/main/java/org/apache/iotdb/consensus/wal/ConsensusReqReader.java
index 1b1929c8c8..d27e9b93e5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/wal/ConsensusReqReader.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/wal/ConsensusReqReader.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** This interface provides search interface for consensus requests via index. */
@@ -30,7 +31,7 @@ public interface ConsensusReqReader {
* Gets the consensus request at the specified position.
*
* @param index index of the consensus request to return
- * @return the consensus request at the specified position
+ * @return the consensus request at the specified position, null if the request doesn't exist
*/
IConsensusRequest getReq(long index);
@@ -57,23 +58,24 @@ public interface ConsensusReqReader {
/** Like {@link Iterator#hasNext()} */
boolean hasNext();
- /** Like {@link Iterator#next()} */
+ /**
+ * Like {@link Iterator#next()}
+ *
+ * @throws java.util.NoSuchElementException if the iteration has no more elements, wait a moment
+ * or call {@link #waitForNextReady()} for more elements
+ */
IConsensusRequest next();
/**
- * Returns the next element in the iteration, blocked until next element is available.
- *
- * @return the next element in the iteration
+ * Wait until the next element in the iteration is ready, blocking until it is available.
*/
- IConsensusRequest waitForNext() throws InterruptedException;
+ void waitForNextReady() throws InterruptedException;
/**
- * Returns the next element in the iteration, blocked until next element is available or a
- * specified amount of time has elapsed.
- *
- * @return the next element in the iteration
+ * Wait until the next element in the iteration is ready, blocking until it is available or a
+ * specified amount of time has elapsed.
*/
- IConsensusRequest waitForNext(long timeout) throws InterruptedException, TimeoutException;
+ void waitForNextReady(long time, TimeUnit unit) throws InterruptedException, TimeoutException;
/**
* Skips to target position of next element in the iteration <br>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 50f6e90b40..0a877d95ac 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -211,8 +211,11 @@ public class IoTDBConstant {
public static final String COMPACTION_MODIFICATION_FILE_NAME_FROM_OLD = "merge.mods";
// write ahead log
+ public static final String WAL_FILE_PREFIX = "_";
public static final String WAL_FILE_SUFFIX = ".wal";
public static final String WAL_CHECKPOINT_FILE_SUFFIX = ".checkpoint";
+ public static final String WAL_VERSION_ID = "versionId";
+ public static final String WAL_START_SEARCH_INDEX = "startSearchIndex";
// client version number
public enum ClientVersion {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e345290846..cf3e35f044 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -181,6 +181,9 @@ public class IoTDBConfig {
/** Size threshold of each wal file. Unit: byte */
private volatile long walFileSizeThresholdInByte = 10 * 1024 * 1024;
+ /** Size threshold of each checkpoint file. Unit: byte */
+ private volatile long checkpointFileSizeThresholdInByte = 3 * 1024 * 1024;
+
/** Minimum ratio of effective information in wal files */
private volatile double walMinEffectiveInfoRatio = 0.1;
@@ -1463,6 +1466,14 @@ public class IoTDBConfig {
this.walFileSizeThresholdInByte = walFileSizeThresholdInByte;
}
+ public long getCheckpointFileSizeThresholdInByte() {
+ return checkpointFileSizeThresholdInByte;
+ }
+
+ public void setCheckpointFileSizeThresholdInByte(long checkpointFileSizeThresholdInByte) {
+ this.checkpointFileSizeThresholdInByte = checkpointFileSizeThresholdInByte;
+ }
+
public double getWalMinEffectiveInfoRatio() {
return walMinEffectiveInfoRatio;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index e897ae4856..7bb7abe855 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -82,6 +82,8 @@ import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.db.wal.WALManager;
+import org.apache.iotdb.db.wal.node.IWALNode;
import org.apache.iotdb.db.wal.recover.WALRecoverManager;
import org.apache.iotdb.db.wal.recover.file.SealedTsFileRecoverPerformer;
import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
@@ -1437,7 +1439,7 @@ public class DataRegion {
if (sequence) {
tsFileProcessor =
new TsFileProcessor(
- logicalStorageGroupName + File.separator + dataRegionId,
+ logicalStorageGroupName + FILE_NAME_SEPARATOR + dataRegionId,
fsFactory.getFileWithParent(filePath),
storageGroupInfo,
this::closeUnsealedTsFileProcessorCallBack,
@@ -1446,7 +1448,7 @@ public class DataRegion {
} else {
tsFileProcessor =
new TsFileProcessor(
- logicalStorageGroupName + File.separator + dataRegionId,
+ logicalStorageGroupName + FILE_NAME_SEPARATOR + dataRegionId,
fsFactory.getFileWithParent(filePath),
storageGroupInfo,
this::closeUnsealedTsFileProcessorCallBack,
@@ -3502,6 +3504,15 @@ public class DataRegion {
return idTable;
}
+ public IWALNode getWALNode() {
+ if (!config.isClusterMode()) {
+ throw new UnsupportedOperationException();
+ }
+ // identifier should be the same as the one used in the getTsFileProcessor method
+ return WALManager.getInstance()
+ .applyForWALNode(logicalStorageGroupName + FILE_NAME_SEPARATOR + dataRegionId);
+ }
+
@TestOnly
public ILastFlushTimeManager getLastFlushTimeManager() {
return lastFlushTimeManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 8cac805ba2..37d67e1f3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -181,7 +181,7 @@ public class TsFileProcessor {
this.writer = new RestorableTsFileIOWriter(tsfile);
this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
this.sequence = sequence;
- this.walNode = WALManager.getInstance().applyForWALNode();
+ this.walNode = WALManager.getInstance().applyForWALNode(storageGroupName);
flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE);
flushListeners.add(this.walNode);
closeFileListeners.add(closeTsFileCallback);
@@ -203,7 +203,7 @@ public class TsFileProcessor {
this.writer = writer;
this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
this.sequence = sequence;
- this.walNode = WALManager.getInstance().applyForWALNode();
+ this.walNode = WALManager.getInstance().applyForWALNode(storageGroupName);
flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE);
flushListeners.add(this.walNode);
closeFileListeners.add(closeUnsealedTsFileProcessor);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 41c37e7963..aff9170d6e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -96,11 +96,20 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
insertTabletNodeList = new ArrayList<>();
}
+ public InsertMultiTabletsNode(
+ PlanNodeId id,
+ List<Integer> parentInsertTabletNodeIndexList,
+ List<InsertTabletNode> insertTabletNodeList) {
+ super(id);
+ this.parentInsertTabletNodeIndexList = parentInsertTabletNodeIndexList;
+ this.insertTabletNodeList = insertTabletNodeList;
+ }
+
public List<Integer> getParentInsertTabletNodeIndexList() {
return parentInsertTabletNodeIndexList;
}
- public void setParentInsertTabletNodeIndexList(List<Integer> parentInsertTabletNodeIndexList) {
+ private void setParentInsertTabletNodeIndexList(List<Integer> parentInsertTabletNodeIndexList) {
this.parentInsertTabletNodeIndexList = parentInsertTabletNodeIndexList;
}
@@ -108,7 +117,7 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
return insertTabletNodeList;
}
- public void setInsertTabletNodeList(List<InsertTabletNode> insertTabletNodeList) {
+ private void setInsertTabletNodeList(List<InsertTabletNode> insertTabletNodeList) {
this.insertTabletNodeList = insertTabletNodeList;
}
@@ -119,11 +128,13 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod
@Override
public void setSearchIndex(long index) {
+ searchIndex = index;
insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
}
@Override
public void setSafelyDeletedSearchIndex(long index) {
+ safelyDeletedSearchIndex = index;
insertTabletNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 1fa2195dde..f5b15139e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -71,7 +71,10 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
*/
protected IDeviceID deviceID;
- /** this index is used by wal search, its order should be protected by the upper layer */
+ /**
+ * this index is used by wal search, its order should be protected by the upper layer, and the
+ * value should start from 1
+ */
protected long searchIndex = NO_CONSENSUS_INDEX;
/**
* this index pass info to wal, indicating that insert nodes whose search index are before this
@@ -155,6 +158,7 @@ public abstract class InsertNode extends WritePlanNode implements IConsensusRequ
return searchIndex;
}
+ /** Search index should start from 1 */
public void setSearchIndex(long searchIndex) {
this.searchIndex = searchIndex;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index f417ebc578..033b343541 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -63,6 +63,13 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
insertRowNodeIndexList = new ArrayList<>();
}
+ public InsertRowsNode(
+ PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
+ super(id);
+ this.insertRowNodeIndexList = insertRowNodeIndexList;
+ this.insertRowNodeList = insertRowNodeList;
+ }
+
/** record the result of insert rows */
private Map<Integer, TSStatus> results = new HashMap<>();
@@ -89,11 +96,13 @@ public class InsertRowsNode extends InsertNode implements BatchInsertNode {
@Override
public void setSearchIndex(long index) {
+ searchIndex = index;
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
}
@Override
public void setSafelyDeletedSearchIndex(long index) {
+ safelyDeletedSearchIndex = index;
insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 49a6093158..bc31abf10b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -72,17 +72,26 @@ public class InsertRowsOfOneDeviceNode extends InsertNode implements BatchInsert
insertRowNodeList = new ArrayList<>();
}
+ public InsertRowsOfOneDeviceNode(
+ PlanNodeId id, List<Integer> insertRowNodeIndexList, List<InsertRowNode> insertRowNodeList) {
+ super(id);
+ this.insertRowNodeIndexList = insertRowNodeIndexList;
+ this.insertRowNodeList = insertRowNodeList;
+ }
+
public Map<Integer, TSStatus> getResults() {
return results;
}
@Override
public void setSearchIndex(long index) {
+ searchIndex = index;
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
}
@Override
public void setSafelyDeletedSearchIndex(long index) {
+ safelyDeletedSearchIndex = index;
insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java
index 0cf0c19879..3e846c132e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/predefined/FileMetrics.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
-import org.apache.iotdb.db.wal.node.WALNode;
import org.apache.iotdb.metrics.MetricManager;
import org.apache.iotdb.metrics.predefined.IMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -83,7 +82,7 @@ public class FileMetrics implements IMetricSet {
.mapToLong(
dir -> {
File walFolder = new File(dir);
- File[] walNodeFolders = walFolder.listFiles(WALNode::walNodeFolderNameFilter);
+ File[] walNodeFolders = walFolder.listFiles(File::isDirectory);
for (File walNodeFolder : walNodeFolders) {
if (walNodeFolder.exists() && walNodeFolder.isDirectory()) {
return org.apache.commons.io.FileUtils.listFiles(
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
index ef50112c43..f982aae06b 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/WalChecker.java
@@ -22,8 +22,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.exception.SystemCheckException;
import org.apache.iotdb.db.wal.buffer.WALEntry;
-import org.apache.iotdb.db.wal.io.WALWriter;
-import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,13 +63,7 @@ public class WalChecker {
throw new SystemCheckException(walFolder);
}
- File[] walNodeFolders =
- walFolderFile.listFiles(
- (dir, name) -> {
- File walNodeFolder = SystemFileFactory.INSTANCE.getFile(dir, name);
- return walNodeFolder.isDirectory()
- && WALNode.WAL_NODE_FOLDER_PATTERN.matcher(name).find();
- });
+ File[] walNodeFolders = walFolderFile.listFiles(File::isDirectory);
if (walNodeFolders == null || walNodeFolders.length == 0) {
logger.info("No sub-directories under the given directory, check ends");
return Collections.emptyList();
@@ -80,7 +73,7 @@ public class WalChecker {
for (int dirIndex = 0; dirIndex < walNodeFolders.length; dirIndex++) {
File walNodeFolder = walNodeFolders[dirIndex];
logger.info("Checking the No.{} directory {}", dirIndex, walNodeFolder.getName());
- File[] walFiles = walNodeFolder.listFiles(WALWriter::walFilenameFilter);
+ File[] walFiles = WALFileUtils.listAllWALFiles(walNodeFolder);
if (walFiles == null) {
continue;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
index 8468fc6cc1..f1c6cda092 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/WALManager.java
@@ -26,9 +26,9 @@ import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.FolderManager;
-import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
-import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.allocation.FirstCreateStrategy;
+import org.apache.iotdb.db.wal.allocation.NodeAllocationStrategy;
+import org.apache.iotdb.db.wal.allocation.RoundRobinStrategy;
import org.apache.iotdb.db.wal.node.IWALNode;
import org.apache.iotdb.db.wal.node.WALFakeNode;
import org.apache.iotdb.db.wal.node.WALNode;
@@ -37,82 +37,47 @@ import org.apache.iotdb.db.wal.utils.WALMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-/** This class is used to manage all wal nodes */
+/** This class is used to manage and allocate wal nodes */
public class WALManager implements IService {
private static final Logger logger = LoggerFactory.getLogger(WALManager.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final int MAX_WAL_NODE_NUM =
config.getMaxWalNodesNum() > 0 ? config.getMaxWalNodesNum() : config.getWalDirs().length * 2;
- /** manage wal folders */
- private FolderManager folderManager;
- /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
- private final Lock nodesLock = new ReentrantLock();
- // region these variables should be protected by nodesLock
- /** wal nodes, the max number of wal nodes is MAX_WAL_NUM */
- private final List<WALNode> walNodes = new ArrayList<>(MAX_WAL_NODE_NUM);
- /** help allocate node for users */
- private int nodeCursor = -1;
- /** each wal node has a unique long value identifier */
- private long nodeIdCounter = -1;
- // endregion
+ /** manage all wal nodes and decide how to allocate them */
+ private final NodeAllocationStrategy walNodesManager;
/** single thread to delete old .wal files */
private ScheduledExecutorService walDeleteThread;
- private WALManager() {}
+ private WALManager() {
+ if (config.isClusterMode()) {
+ walNodesManager = new FirstCreateStrategy();
+ } else {
+ walNodesManager = new RoundRobinStrategy(MAX_WAL_NODE_NUM);
+ }
+ }
/** Apply for a wal node */
- public IWALNode applyForWALNode() {
+ public IWALNode applyForWALNode(String applicantUniqueId) {
if (config.getWalMode() == WALMode.DISABLE) {
return WALFakeNode.getSuccessInstance();
}
- WALNode selectedNode;
- nodesLock.lock();
- try {
- if (walNodes.size() < MAX_WAL_NODE_NUM) {
- nodeIdCounter++;
- String identifier = String.valueOf(nodeIdCounter);
- String folder;
- // get wal folder
- try {
- folder = folderManager.getNextFolder();
- } catch (DiskSpaceInsufficientException e) {
- logger.error("All disks of wal folders are full, change system mode to read-only.", e);
- config.setReadOnly(true);
- return WALFakeNode.getFailureInstance(e);
- }
- folder = folder + File.separator + identifier;
- // create new wal node
- try {
- selectedNode = new WALNode(identifier, folder);
- } catch (FileNotFoundException e) {
- logger.error("Fail to create wal node", e);
- return WALFakeNode.getFailureInstance(e);
- }
- walNodes.add(selectedNode);
- } else {
- // select next wal node by sequence order
- nodeCursor = (nodeCursor + 1) % MAX_WAL_NODE_NUM;
- selectedNode = walNodes.get(nodeCursor);
- }
- } finally {
- nodesLock.unlock();
+ return walNodesManager.applyForWALNode(applicantUniqueId);
+ }
+
+ public void registerWALNode(String applicantUniqueId, String logDirectory) {
+ if (config.getWalMode() == WALMode.DISABLE || !config.isClusterMode()) {
+ return;
}
- return selectedNode;
+
+ ((FirstCreateStrategy) walNodesManager).registerWALNode(applicantUniqueId, logDirectory);
}
@Override
@@ -122,9 +87,6 @@ public class WALManager implements IService {
}
try {
- folderManager =
- new FolderManager(
- Arrays.asList(config.getWalDirs()), DirectoryStrategyType.SEQUENCE_STRATEGY);
walDeleteThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.WAL_DELETE.getName());
walDeleteThread.scheduleWithFixedDelay(
@@ -179,26 +141,11 @@ public class WALManager implements IService {
}
private void deleteOutdatedFiles() {
- for (WALNode walNode : getNodesSnapshot()) {
+ for (WALNode walNode : walNodesManager.getNodesSnapshot()) {
walNode.deleteOutdatedFiles();
}
}
- private List<WALNode> getNodesSnapshot() {
- List<WALNode> snapshot;
- if (walNodes.size() < MAX_WAL_NODE_NUM) {
- nodesLock.lock();
- try {
- snapshot = new ArrayList<>(walNodes);
- } finally {
- nodesLock.unlock();
- }
- } else {
- snapshot = walNodes;
- }
- return snapshot;
- }
-
@Override
public void stop() {
if (config.getWalMode() == WALMode.DISABLE) {
@@ -226,17 +173,7 @@ public class WALManager implements IService {
@TestOnly
public void clear() {
- nodesLock.lock();
- try {
- nodeCursor = -1;
- nodeIdCounter = -1;
- for (WALNode walNode : walNodes) {
- walNode.close();
- }
- walNodes.clear();
- } finally {
- nodesLock.unlock();
- }
+ walNodesManager.clear();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
new file mode 100644
index 0000000000..876d71eeb9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/AbstractNodeAllocationStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.allocation;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.FolderManager;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALFakeNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+
+public abstract class AbstractNodeAllocationStrategy implements NodeAllocationStrategy {
+ private static final Logger logger =
+ LoggerFactory.getLogger(AbstractNodeAllocationStrategy.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ /** manage wal folders */
+ protected FolderManager folderManager;
+
+ public AbstractNodeAllocationStrategy() {
+ try {
+ folderManager =
+ new FolderManager(
+ Arrays.asList(config.getWalDirs()), DirectoryStrategyType.SEQUENCE_STRATEGY);
+ } catch (DiskSpaceInsufficientException e) {
+ logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+ config.setReadOnly(true);
+ }
+ }
+
+ protected IWALNode createWALNode(String identifier) {
+ String folder;
+ // get wal folder
+ try {
+ folder = folderManager.getNextFolder();
+ } catch (DiskSpaceInsufficientException e) {
+ logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+ config.setReadOnly(true);
+ return WALFakeNode.getFailureInstance(e);
+ }
+ folder = folder + File.separator + identifier;
+ // create new wal node
+ return createWALNode(identifier, folder);
+ }
+
+ protected IWALNode createWALNode(String identifier, String folder) {
+ try {
+ return new WALNode(identifier, folder);
+ } catch (FileNotFoundException e) {
+ logger.error("Fail to create wal node", e);
+ return WALFakeNode.getFailureInstance(e);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
new file mode 100644
index 0000000000..043a5797ca
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.allocation;
+
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This strategy creates one wal node for each unique identifier. In other words, each identifier
+ * (like data region) has its own wal node.
+ */
+public class FirstCreateStrategy extends AbstractNodeAllocationStrategy {
+ /** protect concurrent safety of wal nodes, i.e. identifier2Nodes */
+ private final Lock nodesLock = new ReentrantLock();
+ // region these variables should be protected by nodesLock
+ /** wal nodes */
+ private final Map<String, WALNode> identifier2Nodes = new HashMap<>();
+ // endregion
+
+ @Override
+ public IWALNode applyForWALNode(String applicantUniqueId) {
+ nodesLock.lock();
+ try {
+ if (identifier2Nodes.containsKey(applicantUniqueId)) {
+ return identifier2Nodes.get(applicantUniqueId);
+ }
+
+ IWALNode walNode = createWALNode(applicantUniqueId);
+ if (walNode instanceof WALNode) {
+ // avoid deletion
+ ((WALNode) walNode).setSafelyDeletedSearchIndex(Long.MIN_VALUE);
+ identifier2Nodes.put(applicantUniqueId, (WALNode) walNode);
+ }
+ return walNode;
+ } finally {
+ nodesLock.unlock();
+ }
+ }
+
+ public void registerWALNode(String applicantUniqueId, String logDirectory) {
+ nodesLock.lock();
+ try {
+ if (identifier2Nodes.containsKey(applicantUniqueId)) {
+ return;
+ }
+
+ IWALNode walNode = createWALNode(applicantUniqueId, logDirectory);
+ if (walNode instanceof WALNode) {
+ // avoid deletion
+ ((WALNode) walNode).setSafelyDeletedSearchIndex(Long.MIN_VALUE);
+ identifier2Nodes.put(applicantUniqueId, (WALNode) walNode);
+ }
+ } finally {
+ nodesLock.unlock();
+ }
+ }
+
+ @Override
+ public List<WALNode> getNodesSnapshot() {
+ List<WALNode> snapshot;
+ nodesLock.lock();
+ try {
+ snapshot = new ArrayList<>(identifier2Nodes.values());
+ } finally {
+ nodesLock.unlock();
+ }
+ return snapshot;
+ }
+
+ @Override
+ public void clear() {
+ nodesLock.lock();
+ try {
+ for (WALNode walNode : identifier2Nodes.values()) {
+ walNode.close();
+ }
+ identifier2Nodes.clear();
+ } finally {
+ nodesLock.unlock();
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/NodeAllocationStrategy.java
similarity index 64%
copy from server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
copy to server/src/main/java/org/apache/iotdb/db/wal/allocation/NodeAllocationStrategy.java
index 9feef0963e..5ce3ca3ead 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/NodeAllocationStrategy.java
@@ -16,25 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.wal.buffer;
+package org.apache.iotdb.db.wal.allocation;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALNode;
-/** Currently, there are 2 buffer types, including wal rolling buffer and wal segmented buffer. */
-public interface IWALBuffer extends AutoCloseable {
- /**
- * Write WALEntry into wal buffer.
- *
- * @param walEntry info will be written into wal buffer
- */
- void write(WALEntry walEntry);
+import java.util.List;
- /** Get current log version id */
- int getCurrentWALFileVersion();
-
- @Override
- void close();
+/** This interface defines how wal nodes are allocated to applicants. */
+public interface NodeAllocationStrategy {
+ /** Allocate one wal node for the applicant identified by applicantUniqueId */
+ IWALNode applyForWALNode(String applicantUniqueId);
+ /** Get a snapshot of all wal nodes */
+ List<WALNode> getNodesSnapshot();
+ /** Close all wal nodes and forget them, only used in tests */
@TestOnly
- boolean isAllWALEntriesConsumed();
+ void clear();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategy.java b/server/src/main/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategy.java
new file mode 100644
index 0000000000..8bd57491cc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategy.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.allocation;
+
+import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This strategy creates at most n wal nodes and allocates them by round-robin strategy. In other
+ * words, several identifiers (like data regions) can share one wal node.
+ */
+public class RoundRobinStrategy extends AbstractNodeAllocationStrategy {
+  /** max wal nodes number */
+  private final int maxWalNodeNum;
+  /** protect concurrent safety of wal nodes, including walNodes, nodeCursor and nodeIdCounter */
+  private final Lock nodesLock = new ReentrantLock();
+  // region these variables should be protected by nodesLock
+  /** wal nodes, the max number of wal nodes is maxWalNodeNum */
+  private final List<WALNode> walNodes;
+  /** help allocate node for users */
+  private int nodeCursor = -1;
+  /** each wal node has a unique long value identifier */
+  private long nodeIdCounter = -1;
+  // endregion
+
+  /**
+   * @param maxWalNodeNum upper bound of wal nodes created by this strategy, also used as the
+   *     initial capacity of the node list
+   */
+  public RoundRobinStrategy(int maxWalNodeNum) {
+    this.maxWalNodeNum = maxWalNodeNum;
+    this.walNodes = new ArrayList<>(maxWalNodeNum);
+  }
+
+  /**
+   * Create a new wal node while fewer than maxWalNodeNum nodes exist, afterwards hand out the
+   * existing nodes in turn. The applicant id is not used to pick the node.
+   *
+   * @return the selected wal node, or whatever createWALNode returned if it is not a WALNode
+   */
+  @Override
+  public IWALNode applyForWALNode(String applicantUniqueId) {
+    WALNode selectedNode;
+    nodesLock.lock();
+    try {
+      if (walNodes.size() < maxWalNodeNum) {
+        nodeIdCounter++;
+        IWALNode node = createWALNode(String.valueOf(nodeIdCounter));
+        if (!(node instanceof WALNode)) {
+          // not a WALNode, return it directly without tracking it
+          return node;
+        }
+        selectedNode = (WALNode) node;
+        walNodes.add(selectedNode);
+      } else {
+        // select next wal node by sequence order
+        nodeCursor = (nodeCursor + 1) % maxWalNodeNum;
+        selectedNode = walNodes.get(nodeCursor);
+      }
+    } finally {
+      nodesLock.unlock();
+    }
+    return selectedNode;
+  }
+
+  /**
+   * Get a snapshot of all wal nodes. The list is always copied under nodesLock, so the result is
+   * never the internal list and cannot be affected by (or affect) later allocations or clear().
+   */
+  @Override
+  public List<WALNode> getNodesSnapshot() {
+    nodesLock.lock();
+    try {
+      return new ArrayList<>(walNodes);
+    } finally {
+      nodesLock.unlock();
+    }
+  }
+
+  /** Reset the cursor and the id counter, close all wal nodes and forget them. */
+  @Override
+  public void clear() {
+    nodesLock.lock();
+    try {
+      nodeCursor = -1;
+      nodeIdCounter = -1;
+      for (WALNode walNode : walNodes) {
+        walNode.close();
+      }
+      walNodes.clear();
+    } finally {
+      nodesLock.unlock();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
index 4cf77c9656..127451b7d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/AbstractWALBuffer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.wal.buffer;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.wal.io.ILogWriter;
import org.apache.iotdb.db.wal.io.WALWriter;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,8 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
protected final String logDirectory;
/** current wal file version id */
protected final AtomicInteger currentWALFileVersion = new AtomicInteger();
+ /** current search index */
+ protected volatile long currentSearchIndex = 0;
/** current wal file log writer */
protected volatile ILogWriter currentWALFileWriter;
@@ -52,7 +55,8 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
currentWALFileWriter =
new WALWriter(
SystemFileFactory.INSTANCE.getFile(
- logDirectory, WALWriter.getLogFileName(currentWALFileVersion.get())));
+ logDirectory,
+ WALFileUtils.getLogFileName(currentWALFileVersion.get(), currentSearchIndex)));
}
@Override
@@ -61,11 +65,12 @@ public abstract class AbstractWALBuffer implements IWALBuffer {
}
/** Notice: only called by syncBufferThread and old log writer will be closed by this function. */
- protected void rollLogWriter() throws IOException {
+ protected void rollLogWriter(long searchIndex) throws IOException {
+ // the next wal file name is built from the incremented version id together with searchIndex
currentWALFileWriter.close();
File nextLogFile =
SystemFileFactory.INSTANCE.getFile(
- logDirectory, WALWriter.getLogFileName(currentWALFileVersion.incrementAndGet()));
+ logDirectory,
+ WALFileUtils.getLogFileName(currentWALFileVersion.incrementAndGet(), searchIndex));
currentWALFileWriter = new WALWriter(nextLogFile);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
index 9feef0963e..ee57e51b2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/IWALBuffer.java
@@ -20,7 +20,12 @@ package org.apache.iotdb.db.wal.buffer;
import org.apache.iotdb.commons.utils.TestOnly;
-/** Currently, there are 2 buffer types, including wal rolling buffer and wal segmented buffer. */
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class serializes and flushes {@link WALEntry}. If search is enabled, the order of search
+ * index should be protected by the upper layer, and the value should start from 1.
+ */
public interface IWALBuffer extends AutoCloseable {
/**
* Write WALEntry into wal buffer.
@@ -35,6 +40,12 @@ public interface IWALBuffer extends AutoCloseable {
@Override
void close();
+ /** Wait for next flush operation done */
+ void waitForFlush() throws InterruptedException;
+
+ /** Wait for next flush operation done, return false if the waiting time elapsed first */
+ boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException;
+
@TestOnly
boolean isAllWALEntriesConsumed();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
index 27c6ca15fa..d9e9d5007a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/buffer/WALBuffer.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.utils.MmapUtil;
import org.apache.iotdb.db.wal.exception.WALNodeClosedException;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
@@ -172,7 +173,7 @@ public class WALBuffer extends AbstractWALBuffer {
// call fsync at last and set fsyncListeners
if (batchSize > 0) {
- fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+ fsyncWorkingBuffer(currentSearchIndex, fsyncListeners, rollWALFileWriterListener);
}
}
@@ -203,6 +204,14 @@ public class WALBuffer extends AbstractWALBuffer {
walEntry.getWalFlushListener().fail(e);
return false;
}
+ // update search index
+ if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+ || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+ InsertNode insertNode = (InsertNode) walEntry.getValue();
+ if (insertNode.getSearchIndex() != InsertNode.NO_CONSENSUS_INDEX) {
+ currentSearchIndex = insertNode.getSearchIndex();
+ }
+ }
return true;
}
@@ -214,12 +223,12 @@ public class WALBuffer extends AbstractWALBuffer {
switch (signalWALEntry.getSignalType()) {
case ROLL_WAL_LOG_WRITER_SIGNAL:
rollWALFileWriterListener = signalWALEntry.getWalFlushListener();
- fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+ fsyncWorkingBuffer(currentSearchIndex, fsyncListeners, rollWALFileWriterListener);
return true;
case CLOSE_SIGNAL:
boolean dataExists = batchSize > 0;
if (dataExists) {
- fsyncWorkingBuffer(fsyncListeners, rollWALFileWriterListener);
+ fsyncWorkingBuffer(currentSearchIndex, fsyncListeners, rollWALFileWriterListener);
}
return dataExists;
default:
@@ -240,7 +249,7 @@ public class WALBuffer extends AbstractWALBuffer {
}
private void rollBuffer() {
- syncWorkingBuffer();
+ // hand the current search index over to the sync task
+ syncWorkingBuffer(currentSearchIndex);
}
@Override
@@ -304,16 +313,19 @@ public class WALBuffer extends AbstractWALBuffer {
}
/** Notice: this method only called when buffer is exhausted by SerializeTask. */
- private void syncWorkingBuffer() {
+ private void syncWorkingBuffer(long searchIndex) {
+ // the task is created with forceFlag = false and without any listener
switchWorkingBufferToFlushing();
- syncBufferThread.submit(new SyncBufferTask(false));
+ syncBufferThread.submit(new SyncBufferTask(searchIndex, false));
}
/** Notice: this method only called at the last of SerializeTask. */
private void fsyncWorkingBuffer(
- List<WALFlushListener> fsyncListeners, WALFlushListener rollWALFileWriterListener) {
+ long searchIndex,
+ List<WALFlushListener> fsyncListeners,
+ WALFlushListener rollWALFileWriterListener) {
+ // searchIndex is passed to rollLogWriter if the submitted task rolls the wal file
switchWorkingBufferToFlushing();
- syncBufferThread.submit(new SyncBufferTask(true, fsyncListeners, rollWALFileWriterListener));
+ syncBufferThread.submit(
+ new SyncBufferTask(searchIndex, true, fsyncListeners, rollWALFileWriterListener));
}
// only called by serializeThread
@@ -341,18 +353,21 @@ public class WALBuffer extends AbstractWALBuffer {
* This task syncs syncingBuffer to disk. The precondition is that syncingBuffer cannot be null.
*/
private class SyncBufferTask implements Runnable {
+ private final long searchIndex;
private final boolean forceFlag;
private final List<WALFlushListener> fsyncListeners;
private final WALFlushListener rollWALFileWriterListener;
- public SyncBufferTask(boolean forceFlag) {
- this(forceFlag, null, null);
+ public SyncBufferTask(long searchIndex, boolean forceFlag) {
+ this(searchIndex, forceFlag, null, null);
}
public SyncBufferTask(
+ long searchIndex,
boolean forceFlag,
List<WALFlushListener> fsyncListeners,
WALFlushListener rollWALFileWriterListener) {
+ this.searchIndex = searchIndex;
this.forceFlag = forceFlag;
this.fsyncListeners = fsyncListeners == null ? Collections.emptyList() : fsyncListeners;
this.rollWALFileWriterListener = rollWALFileWriterListener;
@@ -396,7 +411,7 @@ public class WALBuffer extends AbstractWALBuffer {
if (rollWALFileWriterListener != null
|| (forceFlag
&& currentWALFileWriter.size() >= config.getWalFileSizeThresholdInByte())) {
- rollLogWriter();
+ rollLogWriter(searchIndex);
if (rollWALFileWriterListener != null) {
rollWALFileWriterListener.succeed();
}
@@ -422,7 +437,27 @@ public class WALBuffer extends AbstractWALBuffer {
// and there is only one buffer can be null between syncingBuffer and idleBuffer
idleBuffer = syncingBuffer;
syncingBuffer = null;
- idleBufferReadyCondition.signal();
+ idleBufferReadyCondition.signalAll();
+ } finally {
+ buffersLock.unlock();
+ }
+ }
+
+ @Override
+ public void waitForFlush() throws InterruptedException {
+ // blocks on idleBufferReadyCondition, which is signalled (signalAll) once the syncing buffer
+ // has been turned back into the idle buffer
+ // NOTE(review): await() is not guarded by a predicate loop, so a spurious wakeup returns early
+ buffersLock.lock();
+ try {
+ idleBufferReadyCondition.await();
+ } finally {
+ buffersLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException {
+ buffersLock.lock();
+ try {
+ return idleBufferReadyCondition.await(time, unit);
} finally {
buffersLock.unlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
index 374d426a9f..ee981f70ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.wal.io.CheckpointWriter;
import org.apache.iotdb.db.wal.io.ILogWriter;
+import org.apache.iotdb.db.wal.utils.CheckpointFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,9 +42,6 @@ import java.util.concurrent.locks.ReentrantLock;
/** This class is used to manage checkpoints of one wal node */
public class CheckpointManager implements AutoCloseable {
- /** use size limit to control WALEntry number in each file */
- public static final long LOG_SIZE_LIMIT = 3 * 1024 * 1024;
-
private static final Logger logger = LoggerFactory.getLogger(CheckpointManager.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -77,7 +75,7 @@ public class CheckpointManager implements AutoCloseable {
currentLogWriter =
new CheckpointWriter(
SystemFileFactory.INSTANCE.getFile(
- logDirectory, CheckpointWriter.getLogFileName(currentCheckPointFileVersion)));
+ logDirectory, CheckpointFileUtils.getLogFileName(currentCheckPointFileVersion)));
makeGlobalInfoCP();
}
@@ -169,7 +167,8 @@ public class CheckpointManager implements AutoCloseable {
currentLogWriter.force();
File oldFile =
SystemFileFactory.INSTANCE.getFile(
- logDirectory, CheckpointWriter.getLogFileName(currentCheckPointFileVersion - 1));
+ logDirectory,
+ CheckpointFileUtils.getLogFileName(currentCheckPointFileVersion - 1));
oldFile.delete();
}
} catch (IOException e) {
@@ -185,14 +184,14 @@ public class CheckpointManager implements AutoCloseable {
}
private boolean tryRollingLogWriter() throws IOException {
- if (currentLogWriter.size() < LOG_SIZE_LIMIT) {
+ // roll only after the current checkpoint file reaches the configured size threshold
+ if (currentLogWriter.size() < config.getCheckpointFileSizeThresholdInByte()) {
return false;
}
currentLogWriter.close();
currentCheckPointFileVersion++;
File nextLogFile =
SystemFileFactory.INSTANCE.getFile(
- logDirectory, CheckpointWriter.getLogFileName(currentCheckPointFileVersion));
+ logDirectory, CheckpointFileUtils.getLogFileName(currentCheckPointFileVersion));
currentLogWriter = new CheckpointWriter(nextLogFile);
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
index 8fa34c0e6a..e07a107b3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
@@ -18,43 +18,13 @@
*/
package org.apache.iotdb.db.wal.io;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.wal.checkpoint.Checkpoint;
import java.io.File;
import java.io.FileNotFoundException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/** CheckpointWriter writes the binary {@link Checkpoint} into .checkpoint file. */
public class CheckpointWriter extends LogWriter {
- public static final String FILE_SUFFIX = IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX;
- public static final Pattern CHECKPOINT_FILE_NAME_PATTERN =
- Pattern.compile("_(?<versionId>\\d+)\\.checkpoint");
-
- /** Return true when this file is .checkpoint file */
- public static boolean checkpointFilenameFilter(File dir, String name) {
- return CHECKPOINT_FILE_NAME_PATTERN.matcher(name).find();
- }
-
- /**
- * Parse version id from filename
- *
- * @return Return {@link Integer#MIN_VALUE} when this file is not .checkpoint file
- */
- public static int parseVersionId(String filename) {
- Matcher matcher = CHECKPOINT_FILE_NAME_PATTERN.matcher(filename);
- if (matcher.find()) {
- return Integer.parseInt(matcher.group("versionId"));
- }
- return Integer.MIN_VALUE;
- }
-
- /** Get .checkpoint filename */
- public static String getLogFileName(long version) {
- return FILE_PREFIX + version + FILE_SUFFIX;
- }
-
public CheckpointWriter(File logFile) throws FileNotFoundException {
super(logFile);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
index 34f620297f..9c9e6174b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/io/WALWriter.java
@@ -18,42 +18,13 @@
*/
package org.apache.iotdb.db.wal.io;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.wal.buffer.WALEntry;
import java.io.File;
import java.io.FileNotFoundException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/** WALWriter writes the binary {@link WALEntry} into .wal file. */
public class WALWriter extends LogWriter {
- public static final String FILE_SUFFIX = IoTDBConstant.WAL_FILE_SUFFIX;
- public static final Pattern WAL_FILE_NAME_PATTERN = Pattern.compile("_(?<versionId>\\d+)\\.wal");
-
- /** Return true when this file is .wal file */
- public static boolean walFilenameFilter(File dir, String name) {
- return WAL_FILE_NAME_PATTERN.matcher(name).find();
- }
-
- /**
- * Parse version id from filename
- *
- * @return Return {@link Integer#MIN_VALUE} when this file is not .wal file
- */
- public static int parseVersionId(String filename) {
- Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(filename);
- if (matcher.find()) {
- return Integer.parseInt(matcher.group("versionId"));
- }
- return Integer.MIN_VALUE;
- }
-
- /** Get .wal filename */
- public static String getLogFileName(long version) {
- return FILE_PREFIX + version + FILE_SUFFIX;
- }
-
public WALWriter(File logFile) throws FileNotFoundException {
super(logFile);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 6412658895..7b4d9dba8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.wal.node;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.PartialPath;
@@ -30,7 +31,12 @@ import org.apache.iotdb.db.engine.flush.FlushStatus;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -39,9 +45,11 @@ import org.apache.iotdb.db.wal.buffer.IWALBuffer;
import org.apache.iotdb.db.wal.buffer.SignalWALEntry;
import org.apache.iotdb.db.wal.buffer.WALBuffer;
import org.apache.iotdb.db.wal.buffer.WALEntry;
+import org.apache.iotdb.db.wal.buffer.WALEntryType;
import org.apache.iotdb.db.wal.checkpoint.CheckpointManager;
import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
-import org.apache.iotdb.db.wal.io.WALWriter;
+import org.apache.iotdb.db.wal.io.WALReader;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.utils.TsFileUtils;
@@ -51,17 +59,28 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-/** This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. */
+/**
+ * This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. If search is enabled,
+ * the order of search index should be protected by the upper layer, and the value should start from
+ * 1.
+ */
public class WALNode implements IWALNode {
- public static final Pattern WAL_NODE_FOLDER_PATTERN = Pattern.compile("(?<nodeIdentifier>\\d+)");
+ public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX =
+ InsertNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
private static final Logger logger = LoggerFactory.getLogger(WALNode.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -69,7 +88,7 @@ public class WALNode implements IWALNode {
/** unique identifier of this WALNode */
private final String identifier;
/** directory to store this node's files */
- private final String logDirectory;
+ private final File logDirectory;
/** wal buffer */
private final IWALBuffer buffer;
/** manage checkpoints */
@@ -86,23 +105,19 @@ public class WALNode implements IWALNode {
private final AtomicLong totalCostOfFlushedMemTables = new AtomicLong();
/** version id -> cost sum of memTables flushed at this file version */
private final Map<Integer, Long> walFileVersionId2MemTablesTotalCost = new ConcurrentHashMap<>();
+ /** insert nodes whose search index are before this value can be deleted safely */
+ private volatile long safelyDeletedSearchIndex = DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
public WALNode(String identifier, String logDirectory) throws FileNotFoundException {
this.identifier = identifier;
- this.logDirectory = logDirectory;
- File logDirFile = SystemFileFactory.INSTANCE.getFile(logDirectory);
- if (!logDirFile.exists() && logDirFile.mkdirs()) {
+ // keep the directory as a File, it is reused when listing wal files of this node
+ // NOTE(review): a failed mkdirs() is not reported here, only a successful creation is logged
+ this.logDirectory = SystemFileFactory.INSTANCE.getFile(logDirectory);
+ if (!this.logDirectory.exists() && this.logDirectory.mkdirs()) {
logger.info("create folder {} for wal node-{}.", logDirectory, identifier);
}
this.buffer = new WALBuffer(identifier, logDirectory);
this.checkpointManager = new CheckpointManager(identifier, logDirectory);
}
- /** Return true when this folder wal node folder */
- public static boolean walNodeFolderNameFilter(File dir, String name) {
- return WAL_NODE_FOLDER_PATTERN.matcher(name).find();
- }
-
@Override
public WALFlushListener log(int memTableId, InsertRowPlan insertRowPlan) {
WALEntry walEntry = new WALEntry(memTableId, insertRowPlan);
@@ -111,6 +126,9 @@ public class WALNode implements IWALNode {
@Override
public WALFlushListener log(int memTableId, InsertRowNode insertRowNode) {
+ // adopt the safely deleted search index carried by the insert node unless it is the default
+ if (insertRowNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+ safelyDeletedSearchIndex = insertRowNode.getSafelyDeletedSearchIndex();
+ }
WALEntry walEntry = new WALEntry(memTableId, insertRowNode);
return log(walEntry);
}
@@ -125,6 +143,9 @@ public class WALNode implements IWALNode {
@Override
public WALFlushListener log(
int memTableId, InsertTabletNode insertTabletNode, int start, int end) {
+ // adopt the safely deleted search index carried by the insert node unless it is the default
+ if (insertTabletNode.getSafelyDeletedSearchIndex() != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+ safelyDeletedSearchIndex = insertTabletNode.getSafelyDeletedSearchIndex();
+ }
WALEntry walEntry = new WALEntry(memTableId, insertTabletNode, start, end);
return log(walEntry);
}
@@ -192,15 +213,7 @@ public class WALNode implements IWALNode {
firstValidVersionId = checkpointManager.getFirstValidWALVersionId();
if (firstValidVersionId == Integer.MIN_VALUE) {
// roll wal log writer to delete current wal file
- WALEntry rollWALFileSignal =
- new SignalWALEntry(SignalWALEntry.SignalType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
- WALFlushListener walFlushListener = log(rollWALFileSignal);
- if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
- logger.error(
- "Fail to trigger rolling wal node-{}'s wal file log writer.",
- identifier,
- walFlushListener.getCause());
- }
+ rollWALFile();
// update firstValidVersionId
firstValidVersionId = checkpointManager.getFirstValidWALVersionId();
if (firstValidVersionId == Integer.MIN_VALUE) {
@@ -211,6 +224,11 @@ public class WALNode implements IWALNode {
// delete outdated files
deleteOutdatedFiles();
+ // wal is used to search, cannot optimize files deletion
+ if (safelyDeletedSearchIndex != DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
+ return;
+ }
+
// calculate effective information ratio
long costOfActiveMemTables = checkpointManager.getTotalCostOfActiveMemTables();
long costOfFlushedMemTables = totalCostOfFlushedMemTables.get();
@@ -236,15 +254,14 @@ public class WALNode implements IWALNode {
}
private void deleteOutdatedFiles() {
- File directory = SystemFileFactory.INSTANCE.getFile(logDirectory);
- File[] filesToDelete = directory.listFiles(this::filterFilesToDelete);
+ File[] filesToDelete = logDirectory.listFiles(this::filterFilesToDelete);
if (filesToDelete != null) {
for (File file : filesToDelete) {
if (!file.delete()) {
logger.info("Fail to delete outdated wal file {} of wal node-{}.", file, identifier);
}
// update totalRamCostOfFlushedMemTables
- int versionId = WALWriter.parseVersionId(file.getName());
+ int versionId = WALFileUtils.parseVersionId(file.getName());
Long memTableRamCostSum = walFileVersionId2MemTablesTotalCost.remove(versionId);
if (memTableRamCostSum != null) {
totalCostOfFlushedMemTables.addAndGet(-memTableRamCostSum);
@@ -254,12 +271,13 @@ public class WALNode implements IWALNode {
}
private boolean filterFilesToDelete(File dir, String name) {
- Pattern pattern = WALWriter.WAL_FILE_NAME_PATTERN;
+ Pattern pattern = WALFileUtils.WAL_FILE_NAME_PATTERN;
Matcher matcher = pattern.matcher(name);
boolean toDelete = false;
if (matcher.find()) {
- int versionId = Integer.parseInt(matcher.group("versionId"));
- toDelete = versionId < firstValidVersionId;
+ int versionId = Integer.parseInt(matcher.group(IoTDBConstant.WAL_VERSION_ID));
+ long startSearchIndex = Long.parseLong(matcher.group(IoTDBConstant.WAL_START_SEARCH_INDEX));
+ // a wal file is outdated only if its version is before the first valid version and its
+ // start search index is before safelyDeletedSearchIndex; files not matching the name
+ // pattern are never selected
+ toDelete = versionId < firstValidVersionId && startSearchIndex < safelyDeletedSearchIndex;
}
return toDelete;
}
@@ -377,45 +395,378 @@ public class WALNode implements IWALNode {
// endregion
// region Search interfaces for consensus group
+ /** Set the search index before which insert nodes can be deleted safely */
+ public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {
+ this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
+ }
+
+ /**
+  * Combine the slices of one insert request, i.e. insert nodes sharing the same search index,
+  * into a single node (e.g. tablet-100, tablet-100, tablet-100 become one multi-tablet). <br>
+  * Notice: the upper layer is responsible for keeping such slices contiguous.
+  *
+  * @return null for an empty list, the only element for a singleton list, otherwise a merged node
+  *     carrying the search index of the first element
+  */
+ private static InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
+   if (insertNodes.isEmpty()) {
+     return null;
+   }
+   InsertNode firstNode = insertNodes.get(0);
+   int size = insertNodes.size();
+   if (size == 1) {
+     return firstNode;
+   }
+
+   // the i-th slice keeps position i in the merged node
+   List<Integer> index = new ArrayList<>(size);
+   InsertNode result;
+   if (firstNode instanceof InsertTabletNode) { // merge to InsertMultiTabletsNode
+     List<InsertTabletNode> insertTabletNodes = new ArrayList<>(size);
+     for (InsertNode insertNode : insertNodes) {
+       insertTabletNodes.add((InsertTabletNode) insertNode);
+       index.add(index.size());
+     }
+     result = new InsertMultiTabletsNode(new PlanNodeId(""), index, insertTabletNodes);
+   } else { // merge to InsertRowsNode or InsertRowsOfOneDeviceNode
+     PartialPath device = firstNode.getDevicePath();
+     boolean sameDevice = true;
+     List<InsertRowNode> insertRowNodes = new ArrayList<>(size);
+     for (InsertNode insertNode : insertNodes) {
+       // stays false once a slice of another device has been seen
+       sameDevice = sameDevice && insertNode.getDevicePath().equals(device);
+       insertRowNodes.add((InsertRowNode) insertNode);
+       index.add(index.size());
+     }
+     if (sameDevice) {
+       result = new InsertRowsOfOneDeviceNode(new PlanNodeId(""), index, insertRowNodes);
+     } else {
+       result = new InsertRowsNode(new PlanNodeId(""), index, insertRowNodes);
+     }
+   }
+   result.setSearchIndex(firstNode.getSearchIndex());
+   return result;
+ }
+
+ @Override
public IConsensusRequest getReq(long index) {
+ // Looks up the insert node(s) whose search index equals index and returns them merged into
+ // one request. Returns null when no such node is found, or when the matching slices are not
+ // yet followed by another entry (so they cannot be known to be complete).
+ // find file
+ File[] currentFiles = WALFileUtils.listAllWALFiles(logDirectory);
+ WALFileUtils.ascSortByVersionId(currentFiles);
+ int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(currentFiles, index);
+ if (fileIndex < 0) {
+ return null;
+ }
+ // find log
+ List<InsertNode> tmpNodes = new ArrayList<>();
+ for (int i = fileIndex; i < currentFiles.length; i++) {
+ // cannot find anymore
+ if (index < WALFileUtils.parseStartSearchIndex(currentFiles[i].getName())) {
+ if (!tmpNodes.isEmpty()) {
+ return mergeInsertNodes(tmpNodes);
+ } else {
+ break;
+ }
+ }
+
+ try (WALReader walReader = new WALReader(currentFiles[i])) {
+ while (walReader.hasNext()) {
+ WALEntry walEntry = walReader.next();
+ if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+ || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+ InsertNode insertNode = (InsertNode) walEntry.getValue();
+ if (insertNode.getSearchIndex() == index) {
+ tmpNodes.add(insertNode);
+ } else if (!tmpNodes.isEmpty()) { // found all slices of insert plan
+ return mergeInsertNodes(tmpNodes);
+ }
+ } else if (!tmpNodes.isEmpty()) { // found all slices of insert plan
+ return mergeInsertNodes(tmpNodes);
+ }
+ }
+ } catch (Exception e) {
+ // a failure to read one file is only logged, the scan goes on with the next file
+ logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
+ }
+ }
+ // not found or not complete
return null;
}
@Override
public List<IConsensusRequest> getReqs(long startIndex, int num) {
- return null;
+ // Collects at most num requests with consecutive search indexes starting from startIndex.
+ // Slices sharing one search index are merged into one request. Slices at the very end of the
+ // wal, which are not yet followed by another entry, are not returned because they cannot be
+ // known to be complete. The result is never null, but may hold fewer than num requests.
+ List<IConsensusRequest> result = new ArrayList<>(num);
+ // find file
+ File[] currentFiles = WALFileUtils.listAllWALFiles(logDirectory);
+ WALFileUtils.ascSortByVersionId(currentFiles);
+ int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(currentFiles, startIndex);
+ if (fileIndex < 0) {
+ return result;
+ }
+ // find logs
+ long endIndex = startIndex + num - 1;
+ long targetIndex = startIndex;
+ List<InsertNode> tmpNodes = new ArrayList<>();
+ for (int i = fileIndex; i < currentFiles.length; i++) {
+ // cannot find anymore
+ if (endIndex < WALFileUtils.parseStartSearchIndex(currentFiles[i].getName())) {
+ // this file starts after the wanted range, so the pending slices are complete; stop
+ // scanning here, otherwise the same slices would be merged and added a second time
+ if (!tmpNodes.isEmpty()) {
+ result.add(mergeInsertNodes(tmpNodes));
+ }
+ break;
+ }
+
+ try (WALReader walReader = new WALReader(currentFiles[i])) {
+ while (walReader.hasNext()) {
+ WALEntry walEntry = walReader.next();
+ if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+ || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+ InsertNode insertNode = (InsertNode) walEntry.getValue();
+ if (insertNode.getSearchIndex() == targetIndex) {
+ tmpNodes.add(insertNode);
+ } else if (!tmpNodes.isEmpty()) { // found all slices of insert plan
+ result.add(mergeInsertNodes(tmpNodes));
+ if (result.size() == num) {
+ return result;
+ }
+ targetIndex++;
+ tmpNodes = new ArrayList<>();
+ // remember to add current insert node
+ if (insertNode.getSearchIndex() == targetIndex) {
+ tmpNodes.add(insertNode);
+ }
+ }
+ } else if (!tmpNodes.isEmpty()) { // found all slices of insert plan
+ result.add(mergeInsertNodes(tmpNodes));
+ if (result.size() == num) {
+ return result;
+ }
+ targetIndex++;
+ tmpNodes = new ArrayList<>();
+ }
+ }
+ } catch (Exception e) {
+ // a failure to read one file is only logged, the scan goes on with the next file
+ logger.error("Fail to read wal from wal file {}", currentFiles[i], e);
+ }
+ }
+
+ return result;
}
+ /** Creates an iterator over requests beginning at {@code startIndex}; the returned iterator is not thread-safe. */
@Override
public ReqIterator getReqIterator(long startIndex) {
- return new PlanNodeIterator();
+ return new PlanNodeIterator(startIndex);
}
private class PlanNodeIterator implements ReqIterator {
+ /** search index of next element */
+ private long nextSearchIndex;
+ /** files to search */
+ private File[] filesToSearch = null;
+ /** index of current searching file in the filesToSearch */
+ private int currentFileIndex = -1;
+ /** true means filesToSearch and currentFileIndex are outdated, call updateFilesToSearch */
+ private boolean needUpdatingFilesToSearch = true;
+ /**
+ * files whose version id before this value have already been searched, avoid storing too many
+ * files in filesToSearch
+ */
+ private int searchedFilesVersionId = 0;
+ /** batch store insert nodes */
+ private final List<InsertNode> insertNodes = new LinkedList<>();
+ /** iterator of insertNodes */
+ private Iterator<InsertNode> itr = null;
+
+ public PlanNodeIterator(long startIndex) {
+ this.nextSearchIndex = startIndex;
+ }
+
@Override
public boolean hasNext() {
+ if (itr != null && itr.hasNext()) {
+ return true;
+ }
+
+ insertNodes.clear();
+ itr = null;
+
+ if (needUpdatingFilesToSearch || filesToSearch == null) {
+ updateFilesToSearch();
+ if (needUpdatingFilesToSearch) {
+ return false;
+ }
+ }
+
+ // find all insert plan of current wal file
+ List<InsertNode> tmpNodes = new ArrayList<>();
+ long targetIndex = nextSearchIndex;
+ try (WALReader walReader = new WALReader(filesToSearch[currentFileIndex])) {
+ while (walReader.hasNext()) {
+ WALEntry walEntry = walReader.next();
+ if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+ || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+ InsertNode insertNode = (InsertNode) walEntry.getValue();
+ if (insertNode.getSearchIndex() == targetIndex) {
+ tmpNodes.add(insertNode);
+ } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+ insertNodes.add(mergeInsertNodes(tmpNodes));
+ targetIndex++;
+ tmpNodes = new ArrayList<>();
+ // remember to add current insert node
+ if (insertNode.getSearchIndex() == targetIndex) {
+ tmpNodes.add(insertNode);
+ }
+ }
+ } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+ insertNodes.add(mergeInsertNodes(tmpNodes));
+ targetIndex++;
+ tmpNodes = new ArrayList<>();
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
+ }
+
+ // find remaining slices of last insert plan of targetIndex
+ if (tmpNodes.isEmpty()) { // all insert plans scanned
+ currentFileIndex++;
+ } else {
+ int fileIndex = currentFileIndex + 1;
+ while (!tmpNodes.isEmpty() && fileIndex < filesToSearch.length) {
+ try (WALReader walReader = new WALReader(filesToSearch[fileIndex])) {
+ while (walReader.hasNext()) {
+ WALEntry walEntry = walReader.next();
+ if (walEntry.getType() == WALEntryType.INSERT_TABLET_NODE
+ || walEntry.getType() == WALEntryType.INSERT_ROW_NODE) {
+ InsertNode insertNode = (InsertNode) walEntry.getValue();
+ if (insertNode.getSearchIndex() == targetIndex) {
+ tmpNodes.add(insertNode);
+ } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+ insertNodes.add(mergeInsertNodes(tmpNodes));
+ tmpNodes = Collections.emptyList();
+ break;
+ }
+ } else if (!tmpNodes.isEmpty()) { // find all slices of insert plan
+ insertNodes.add(mergeInsertNodes(tmpNodes));
+ tmpNodes = Collections.emptyList();
+ break;
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Fail to read wal from wal file {}", filesToSearch[currentFileIndex], e);
+ }
+ if (!tmpNodes.isEmpty()) {
+ fileIndex++;
+ }
+ }
+
+ if (tmpNodes.isEmpty()) { // all insert plans scanned
+ currentFileIndex = fileIndex;
+ } else {
+ needUpdatingFilesToSearch = true;
+ }
+ }
+
+ // update file index and version id
+ if (currentFileIndex >= filesToSearch.length) {
+ needUpdatingFilesToSearch = true;
+ } else {
+ searchedFilesVersionId =
+ WALFileUtils.parseVersionId(filesToSearch[currentFileIndex].getName());
+ }
+
+ // update iterator
+ if (insertNodes.size() != 0) {
+ itr = insertNodes.iterator();
+ return true;
+ }
return false;
}
@Override
public IConsensusRequest next() {
- return null;
+ if (itr == null && !hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ InsertNode insertNode = itr.next();
+ if (insertNode.getSearchIndex() != nextSearchIndex) {
+ logger.warn(
+ "Search index of wal node-{} are not continuously, skip from {} to {}.",
+ identifier,
+ nextSearchIndex,
+ insertNode.getSearchIndex());
+ }
+ nextSearchIndex = insertNode.getSearchIndex() + 1;
+
+ return insertNode;
}
@Override
- public IConsensusRequest waitForNext() throws InterruptedException {
- return null;
+ public void waitForNextReady() throws InterruptedException {
+ while (!hasNext()) {
+ buffer.waitForFlush();
+ }
}
@Override
- public IConsensusRequest waitForNext(long timeout)
+ public void waitForNextReady(long time, TimeUnit unit)
throws InterruptedException, TimeoutException {
- return null;
+ if (!hasNext()) {
+ boolean timeout = !buffer.waitForFlush(time, unit);
+ if (timeout || !hasNext()) {
+ throw new TimeoutException();
+ }
+ }
}
@Override
- public void skipTo(long targetIndex) {}
+ public void skipTo(long targetIndex) {
+ if (targetIndex < nextSearchIndex) {
+ logger.warn(
+ "Skip from {} to {}, it's a dangerous operation because insert plan {} may have been lost.",
+ nextSearchIndex,
+ targetIndex,
+ targetIndex);
+ searchedFilesVersionId = -1;
+ insertNodes.clear();
+ itr = null;
+ }
+ nextSearchIndex = targetIndex;
+ this.filesToSearch = null;
+ this.currentFileIndex = -1;
+ needUpdatingFilesToSearch = true;
+ }
+
+ private void updateFilesToSearch() {
+ File[] filesToSearch = logDirectory.listFiles(this::filterFilesToSearch);
+ WALFileUtils.ascSortByVersionId(filesToSearch);
+ int fileIndex = WALFileUtils.binarySearchFileBySearchIndex(filesToSearch, nextSearchIndex);
+ if (filesToSearch != null && fileIndex >= 0) { // possible to find next
+ this.filesToSearch = filesToSearch;
+ this.currentFileIndex = fileIndex;
+ this.searchedFilesVersionId =
+ WALFileUtils.parseVersionId(this.filesToSearch[currentFileIndex].getName());
+ this.needUpdatingFilesToSearch = false;
+ } else { // impossible to find next
+ this.filesToSearch = null;
+ this.currentFileIndex = -1;
+ this.needUpdatingFilesToSearch = true;
+ }
+ }
+
+ private boolean filterFilesToSearch(File dir, String name) {
+ Pattern pattern = WALFileUtils.WAL_FILE_NAME_PATTERN;
+ Matcher matcher = pattern.matcher(name);
+ boolean toSearch = false;
+ if (matcher.find()) {
+ int versionId = Integer.parseInt(matcher.group(IoTDBConstant.WAL_VERSION_ID));
+ toSearch = versionId >= searchedFilesVersionId;
+ }
+ return toSearch;
+ }
}
// endregion
@@ -434,4 +785,17 @@ public class WALNode implements IWALNode {
int getCurrentLogVersion() {
return buffer.getCurrentWALFileVersion();
}
+
+ @TestOnly
+ public void rollWALFile() {
+ WALEntry rollWALFileSignal =
+ new SignalWALEntry(SignalWALEntry.SignalType.ROLL_WAL_LOG_WRITER_SIGNAL, true);
+ WALFlushListener walFlushListener = log(rollWALFileSignal);
+ if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
+ logger.error(
+ "Fail to trigger rolling wal node-{}'s wal file log writer.",
+ identifier,
+ walFlushListener.getCause());
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java
index 20d63c1e79..dbfe1934e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/CheckpointRecoverUtils.java
@@ -21,12 +21,10 @@ package org.apache.iotdb.db.wal.recover;
import org.apache.iotdb.db.wal.checkpoint.Checkpoint;
import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
import org.apache.iotdb.db.wal.io.CheckpointReader;
-import org.apache.iotdb.db.wal.io.CheckpointWriter;
+import org.apache.iotdb.db.wal.utils.CheckpointFileUtils;
import java.io.File;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -37,14 +35,12 @@ public class CheckpointRecoverUtils {
/** Recover memTable information from checkpoint folder */
public static Map<Integer, MemTableInfo> recoverMemTableInfo(File logDirectory) {
// find all .checkpoint file
- File[] checkpointFiles = logDirectory.listFiles(CheckpointWriter::checkpointFilenameFilter);
+ File[] checkpointFiles = CheckpointFileUtils.listAllCheckpointFiles(logDirectory);
if (checkpointFiles == null) {
return Collections.emptyMap();
}
- Arrays.sort(
- checkpointFiles,
- Comparator.comparingInt(file -> CheckpointWriter.parseVersionId(((File) file).getName()))
- .reversed());
+ // desc sort by version id
+ CheckpointFileUtils.descSortByVersionId(checkpointFiles);
// find last valid .checkpoint file and load checkpoints from it
List<Checkpoint> checkpoints = null;
for (File checkpointFile : checkpointFiles) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
index 477e93a408..c824f00d25 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALNodeRecoverTask.java
@@ -19,18 +19,19 @@
package org.apache.iotdb.db.wal.recover;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.wal.WALManager;
import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
import org.apache.iotdb.db.wal.io.WALReader;
-import org.apache.iotdb.db.wal.io.WALWriter;
import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.util.Arrays;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -38,6 +39,7 @@ import java.util.concurrent.CountDownLatch;
/** This task is responsible for the recovery of one wal node. */
public class WALNodeRecoverTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(WALNodeRecoverTask.class);
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final WALRecoverManager walRecoverManger = WALRecoverManager.getInstance();
/** this directory store one wal node's .wal and .checkpoint files */
@@ -77,11 +79,20 @@ public class WALNodeRecoverTask implements Runnable {
}
}
}
- // delete this wal node folder
- FileUtils.deleteDirectory(logDirectory);
- logger.info(
- "Successfully recover WAL node in the directory {}, so delete these wal files.",
- logDirectory);
+
+ if (!config.isClusterMode()) {
+ // delete this wal node folder
+ FileUtils.deleteDirectory(logDirectory);
+ logger.info(
+ "Successfully recover WAL node in the directory {}, so delete these wal files.",
+ logDirectory);
+ } else {
+ WALManager.getInstance()
+ .registerWALNode(logDirectory.getName(), logDirectory.getAbsolutePath());
+ logger.info(
+ "Successfully recover WAL node in the directory {}, add this node to WALManger.",
+ logDirectory);
+ }
}
private void recoverInfoFromCheckpoints() {
@@ -116,12 +127,14 @@ public class WALNodeRecoverTask implements Runnable {
// find all valid .wal files
File[] walFiles =
logDirectory.listFiles(
- (dir, name) -> WALWriter.parseVersionId(name) >= firstValidVersionId);
+ (dir, name) ->
+ WALFileUtils.walFilenameFilter(dir, name)
+ && WALFileUtils.parseVersionId(name) >= firstValidVersionId);
if (walFiles == null) {
return;
}
- Arrays.sort(
- walFiles, Comparator.comparingInt(file -> WALWriter.parseVersionId(file.getName())));
+ // asc sort by version id
+ WALFileUtils.ascSortByVersionId(walFiles);
// read .wal files and redo logs
for (File walFile : walFiles) {
try (WALReader walReader = new WALReader(walFile)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
index 3fd638225a..fff680ee69 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/WALRecoverManager.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.wal.exception.WALRecoverException;
-import org.apache.iotdb.db.wal.node.WALNode;
import org.apache.iotdb.db.wal.recover.file.UnsealedTsFileRecoverPerformer;
import org.apache.iotdb.db.wal.utils.listener.WALRecoverListener;
@@ -64,7 +63,7 @@ public class WALRecoverManager {
List<File> walNodeDirs = new ArrayList<>();
for (String walDir : config.getWalDirs()) {
File walDirFile = SystemFileFactory.INSTANCE.getFile(walDir);
- File[] nodeDirs = walDirFile.listFiles(WALNode::walNodeFolderNameFilter);
+ File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
if (nodeDirs == null) {
continue;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/CheckpointFileUtils.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
copy to server/src/main/java/org/apache/iotdb/db/wal/utils/CheckpointFileUtils.java
index 8fa34c0e6a..b8123354de 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/io/CheckpointWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/CheckpointFileUtils.java
@@ -16,46 +16,54 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.wal.io;
-
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.wal.checkpoint.Checkpoint;
+package org.apache.iotdb.db.wal.utils;
import java.io.File;
-import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-/** CheckpointWriter writes the binary {@link Checkpoint} into .checkpoint file. */
-public class CheckpointWriter extends LogWriter {
- public static final String FILE_SUFFIX = IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_CHECKPOINT_FILE_SUFFIX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_FILE_PREFIX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_VERSION_ID;
+
+public class CheckpointFileUtils {
+ /**
+ * versionId is a self-incremented id number, helping to maintain the order of checkpoint files
+ */
public static final Pattern CHECKPOINT_FILE_NAME_PATTERN =
- Pattern.compile("_(?<versionId>\\d+)\\.checkpoint");
+ Pattern.compile(
+ WAL_FILE_PREFIX + "(?<" + WAL_VERSION_ID + ">\\d+)\\" + WAL_CHECKPOINT_FILE_SUFFIX);
/** Return true when this file is .checkpoint file */
public static boolean checkpointFilenameFilter(File dir, String name) {
return CHECKPOINT_FILE_NAME_PATTERN.matcher(name).find();
}
- /**
- * Parse version id from filename
- *
- * @return Return {@link Integer#MIN_VALUE} when this file is not .checkpoint file
- */
+ /** List all .checkpoint files in the directory */
+ public static File[] listAllCheckpointFiles(File dir) {
+ return dir.listFiles(CheckpointFileUtils::checkpointFilenameFilter);
+ }
+
+ /** Parse version id from filename */
public static int parseVersionId(String filename) {
Matcher matcher = CHECKPOINT_FILE_NAME_PATTERN.matcher(filename);
if (matcher.find()) {
- return Integer.parseInt(matcher.group("versionId"));
+ return Integer.parseInt(matcher.group(WAL_VERSION_ID));
}
- return Integer.MIN_VALUE;
+ throw new RuntimeException("Invalid checkpoint file name: " + filename);
}
- /** Get .checkpoint filename */
- public static String getLogFileName(long version) {
- return FILE_PREFIX + version + FILE_SUFFIX;
+ /** Sort checkpoint files by version id in descending order */
+ public static void descSortByVersionId(File[] checkpointFiles) {
+ Arrays.sort(
+ checkpointFiles,
+ Comparator.comparingInt(file -> parseVersionId(((File) file).getName())).reversed());
}
- public CheckpointWriter(File logFile) throws FileNotFoundException {
- super(logFile);
+ /** Get .checkpoint filename */
+ public static String getLogFileName(long version) {
+ return WAL_FILE_PREFIX + version + WAL_CHECKPOINT_FILE_SUFFIX;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
new file mode 100644
index 0000000000..a67d323223
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALFileUtils.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_FILE_PREFIX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_FILE_SUFFIX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_START_SEARCH_INDEX;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.WAL_VERSION_ID;
+
+/** Static helpers for naming, listing, parsing, sorting and searching .wal files. */
+public class WALFileUtils {
+ /**
+ * versionId is a self-incremented id number, helping to maintain the order of wal files.
+ * startSearchIndex is the valid search index of last flushed wal entry. For example: <br>
+ * _0-0.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
+ * _1-5.wal: -1, -1, -1, -1 <br>
+ * _2-5.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
+ * _3-12.wal: 12, 12, 12, 12, 12 <br>
+ * _4-12.wal: 12, 13, 14, 15, 16, -1 <br>
+ */
+ public static final Pattern WAL_FILE_NAME_PATTERN =
+ Pattern.compile(
+ WAL_FILE_PREFIX
+ + "(?<"
+ + WAL_VERSION_ID
+ + ">\\d+)-(?<"
+ + WAL_START_SEARCH_INDEX
+ + ">\\d+)\\"
+ + WAL_FILE_SUFFIX);
+
+ /** Return true when this file is .wal file */
+ public static boolean walFilenameFilter(File dir, String name) {
+ return WAL_FILE_NAME_PATTERN.matcher(name).find();
+ }
+
+ /**
+ * List all .wal files in the directory
+ *
+ * @return the matching files, or null when {@link File#listFiles} returns null for this directory
+ */
+ public static File[] listAllWALFiles(File dir) {
+ return dir.listFiles(WALFileUtils::walFilenameFilter);
+ }
+
+ /**
+ * Parse version id from filename
+ *
+ * @throws RuntimeException when the filename does not match {@link #WAL_FILE_NAME_PATTERN}
+ */
+ public static int parseVersionId(String filename) {
+ Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(filename);
+ if (matcher.find()) {
+ return Integer.parseInt(matcher.group(WAL_VERSION_ID));
+ }
+ throw new RuntimeException("Invalid wal file name: " + filename);
+ }
+
+ /**
+ * Parse start search index from filename
+ *
+ * @throws RuntimeException when the filename does not match {@link #WAL_FILE_NAME_PATTERN}
+ */
+ public static long parseStartSearchIndex(String filename) {
+ Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(filename);
+ if (matcher.find()) {
+ return Long.parseLong(matcher.group(WAL_START_SEARCH_INDEX));
+ }
+ throw new RuntimeException("Invalid wal file name: " + filename);
+ }
+
+ /**
+ * Sort wal files by version id with ascending order. A null array is left untouched, so the
+ * result of {@link #listAllWALFiles} can be passed in directly and then handed to {@link
+ * #binarySearchFileBySearchIndex}, which treats null as "nothing to search".
+ */
+ public static void ascSortByVersionId(File[] walFiles) {
+ if (walFiles == null) {
+ return;
+ }
+ Arrays.sort(walFiles, Comparator.comparingInt(file -> parseVersionId(file.getName())));
+ }
+
+ /**
+ * Find index of the file which probably contains target insert plan. <br>
+ * Given wal files [ _0-0.wal, _1-5.wal, _2-5.wal, _3-12.wal, _4-12.wal ], details as below: <br>
+ * _0-0.wal: 1, 2, 3, -1, -1, 4, 5, -1 <br>
+ * _1-5.wal: -1, -1, -1, -1 <br>
+ * _2-5.wal: 6, 7, 8, 9, -1, -1, -1, 10, 11, -1, 12, 12 <br>
+ * _3-12.wal: 12, 12, 12, 12, 12 <br>
+ * _4-12.wal: 12, 13, 14, 15, 16, -1 <br>
+ * searching [1, 5] will return 0, searching [6, 12] will return 2, search [13, infinity) will
+ * return 4, others will return -1
+ *
+ * @param files files to be searched, expected to be sorted by version id in ascending order
+ * @param targetSearchIndex search index of target insert plan
+ * @return index of the last file whose start search index is smaller than the target, -1 if the
+ * target insert plan definitely doesn't exist
+ */
+ public static int binarySearchFileBySearchIndex(File[] files, long targetSearchIndex) {
+ if (files == null
+ || files.length == 0
+ || targetSearchIndex <= parseStartSearchIndex(files[0].getName())) {
+ return -1;
+ }
+
+ if (targetSearchIndex > parseStartSearchIndex(files[files.length - 1].getName())) {
+ return files.length - 1;
+ }
+
+ int low = 0;
+ int high = files.length - 1;
+ // find the last file whose start search index is smaller than targetSearchIndex
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+ long midVal = parseStartSearchIndex(files[mid].getName());
+
+ if (midVal < targetSearchIndex) {
+ low = mid + 1;
+ } else {
+ high = mid - 1;
+ }
+ }
+
+ return low - 1;
+ }
+
+ /** Get .wal filename */
+ public static String getLogFileName(int versionId, long startSearchIndex) {
+ return WAL_FILE_PREFIX + versionId + FILE_NAME_SEPARATOR + startSearchIndex + WAL_FILE_SUFFIX;
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
index 3c5d5a36be..eb9b2d0058 100644
--- a/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/tools/WalCheckerTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.tools;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.exception.SystemCheckException;
@@ -28,6 +27,7 @@ import org.apache.iotdb.db.wal.io.ILogWriter;
import org.apache.iotdb.db.wal.io.WALFileTest;
import org.apache.iotdb.db.wal.io.WALWriter;
import org.apache.iotdb.db.wal.utils.WALByteBufferForTest;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.apache.commons.io.FileUtils;
import org.junit.Test;
@@ -80,8 +80,7 @@ public class WalCheckerTest {
File walNodeDir = new File(tempRoot, String.valueOf(i));
walNodeDir.mkdir();
- File walFile =
- new File(walNodeDir, WALWriter.FILE_PREFIX + i + IoTDBConstant.WAL_FILE_SUFFIX);
+ File walFile = new File(walNodeDir, WALFileUtils.getLogFileName(i, 0));
int fakeMemTableId = 1;
List<WALEntry> walEntries = new ArrayList<>();
walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
@@ -117,7 +116,7 @@ public class WalCheckerTest {
File walNodeDir = new File(tempRoot, String.valueOf(i));
walNodeDir.mkdir();
- File walFile = new File(walNodeDir, "_" + i + IoTDBConstant.WAL_FILE_SUFFIX);
+ File walFile = new File(walNodeDir, WALFileUtils.getLogFileName(i, 0));
int fakeMemTableId = 1;
List<WALEntry> walEntries = new ArrayList<>();
walEntries.add(new WALEntry(fakeMemTableId, WALFileTest.getInsertRowPlan(DEVICE_ID)));
@@ -158,7 +157,7 @@ public class WalCheckerTest {
File walNodeDir = new File(tempRoot, String.valueOf(i));
walNodeDir.mkdir();
- File walFile = new File(walNodeDir, "_" + i + IoTDBConstant.WAL_FILE_SUFFIX);
+ File walFile = new File(walNodeDir, WALFileUtils.getLogFileName(i, 0));
FileOutputStream fileOutputStream = new FileOutputStream(walFile);
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/DisableWALTest.java b/server/src/test/java/org/apache/iotdb/db/wal/DisableWALTest.java
index 8b9475c77e..d16365cea0 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/DisableWALTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/DisableWALTest.java
@@ -48,7 +48,7 @@ public class DisableWALTest {
@Test
public void testDisableWAL() {
WALManager walManager = WALManager.getInstance();
- IWALNode walNode = walManager.applyForWALNode();
+ IWALNode walNode = walManager.applyForWALNode("");
assertEquals(WALFakeNode.getSuccessInstance(), walNode);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
index 20ae8c8db0..83343f7859 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.wal.io.WALWriter;
-import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.node.WALNode;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
@@ -66,22 +66,42 @@ public class WALManagerTest {
}
@Test
- public void testAllocateWALNode() throws IllegalPathException {
+ public void testDeleteOutdatedWALFiles() throws IllegalPathException {
WALManager walManager = WALManager.getInstance();
- IWALNode[] walNodes = new IWALNode[6];
+ WALNode[] walNodes = new WALNode[6];
for (int i = 0; i < 12; i++) {
- IWALNode walNode = walManager.applyForWALNode();
+ WALNode walNode = (WALNode) walManager.applyForWALNode(String.valueOf(i));
if (i < 6) {
walNodes[i] = walNode;
} else {
assertEquals(walNodes[i % 6], walNode);
}
walNode.log(i, getInsertRowPlan());
+ walNode.rollWALFile();
}
+
for (String walDir : walDirs) {
File walDirFile = new File(walDir);
assertTrue(walDirFile.exists());
- assertNotNull(walDirFile.list(WALWriter::walFilenameFilter));
+ File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
+ assertNotNull(nodeDirs);
+ for (File nodeDir : nodeDirs) {
+ assertTrue(nodeDir.exists());
+ assertEquals(3, WALFileUtils.listAllWALFiles(nodeDir).length);
+ }
+ }
+
+ walManager.deleteOutdatedWALFiles();
+
+ for (String walDir : walDirs) {
+ File walDirFile = new File(walDir);
+ assertTrue(walDirFile.exists());
+ File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
+ assertNotNull(nodeDirs);
+ for (File nodeDir : nodeDirs) {
+ assertTrue(nodeDir.exists());
+ assertEquals(1, WALFileUtils.listAllWALFiles(nodeDir).length);
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
similarity index 57%
copy from server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
copy to server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
index 20ae8c8db0..8b36ee3f27 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategyTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.wal;
+package org.apache.iotdb.db.wal.allocation;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.wal.io.WALWriter;
import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
@@ -36,10 +36,11 @@ import org.junit.Test;
import java.io.File;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-public class WALManagerTest {
+public class FirstCreateStrategyTest {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final String[] walDirs =
new String[] {
@@ -67,21 +68,71 @@ public class WALManagerTest {
+ /**
+ * Applies for a WAL node under six identifiers ("0".."5"), twice each, and checks that the second
+ * request for an identifier returns the node handed out the first time. Afterwards every
+ * configured WAL directory must contain exactly two node directories, each with at least one
+ * WAL file.
+ */
@Test
public void testAllocateWALNode() throws IllegalPathException {
- WALManager walManager = WALManager.getInstance();
+ FirstCreateStrategy firstCreateStrategy = new FirstCreateStrategy();
IWALNode[] walNodes = new IWALNode[6];
- for (int i = 0; i < 12; i++) {
- IWALNode walNode = walManager.applyForWALNode();
- if (i < 6) {
- walNodes[i] = walNode;
- } else {
- assertEquals(walNodes[i % 6], walNode);
+ try {
+ for (int i = 0; i < 12; i++) {
+ // identifiers repeat after six iterations, so i >= 6 asks again for an existing identifier
+ String identifier = String.valueOf(i % 6);
+ IWALNode walNode = firstCreateStrategy.applyForWALNode(identifier);
+ if (i < 6) {
+ walNodes[i] = walNode;
+ } else {
+ assertEquals(walNodes[i % 6], walNode);
+ }
+ walNode.log(i, getInsertRowPlan());
+ }
+ for (String walDir : walDirs) {
+ File walDirFile = new File(walDir);
+ assertTrue(walDirFile.exists());
+ File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
+ assertNotNull(nodeDirs);
+ assertEquals(2, nodeDirs.length);
+ for (File nodeDir : nodeDirs) {
+ assertTrue(nodeDir.exists());
+ assertNotEquals(0, WALFileUtils.listAllWALFiles(nodeDir).length);
+ }
+ }
+ } finally {
+ // close whatever was allocated, even when an assertion above failed
+ for (IWALNode walNode : walNodes) {
+ if (walNode != null) {
+ walNode.close();
+ }
}
- walNode.log(i, getInsertRowPlan());
}
- for (String walDir : walDirs) {
- File walDirFile = new File(walDir);
+ }
+
+ /**
+ * Registers a WAL node directory under {@code walDirs[0]} for each of six identifiers before
+ * applying for the node, twice per identifier, and checks that the second round returns the
+ * node from the first round. Afterwards {@code walDirs[0]} must contain exactly six node
+ * directories, each with at least one WAL file.
+ */
+ @Test
+ public void testRegisterWALNode() throws IllegalPathException {
+ FirstCreateStrategy firstCreateStrategy = new FirstCreateStrategy();
+ IWALNode[] walNodes = new IWALNode[6];
+ try {
+ for (int i = 0; i < 12; i++) {
+ String identifier = String.valueOf(i % 6);
+ // registration is repeated for i >= 6 with an identifier that is already registered
+ firstCreateStrategy.registerWALNode(identifier, walDirs[0] + File.separator + identifier);
+ IWALNode walNode = firstCreateStrategy.applyForWALNode(identifier);
+ if (i < 6) {
+ walNodes[i] = walNode;
+ } else {
+ assertEquals(walNodes[i % 6], walNode);
+ }
+ walNode.log(i, getInsertRowPlan());
+ }
+
+ File walDirFile = new File(walDirs[0]);
assertTrue(walDirFile.exists());
- assertNotNull(walDirFile.list(WALWriter::walFilenameFilter));
+ File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
+ assertNotNull(nodeDirs);
+ assertEquals(6, nodeDirs.length);
+ for (File nodeDir : nodeDirs) {
+ assertTrue(nodeDir.exists());
+ assertNotEquals(0, WALFileUtils.listAllWALFiles(nodeDir).length);
+ }
+ } finally {
+ // close whatever was allocated, even when an assertion above failed
+ for (IWALNode walNode : walNodes) {
+ if (walNode != null) {
+ walNode.close();
+ }
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategyTest.java
similarity index 73%
copy from server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
copy to server/src/test/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategyTest.java
index 20ae8c8db0..cdf0c2b4ac 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/WALManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/allocation/RoundRobinStrategyTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.wal;
+package org.apache.iotdb.db.wal.allocation;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -25,8 +25,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.wal.io.WALWriter;
import org.apache.iotdb.db.wal.node.IWALNode;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
@@ -36,10 +36,11 @@ import org.junit.Test;
import java.io.File;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-public class WALManagerTest {
+public class RoundRobinStrategyTest {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final String[] walDirs =
new String[] {
@@ -67,21 +68,35 @@ public class WALManagerTest {
+ /**
+ * Applies for a WAL node under twelve distinct identifiers ("0".."11") from a
+ * {@code RoundRobinStrategy(6)} and checks that request {@code i >= 6} returns the node that
+ * request {@code i - 6} received. Afterwards every configured WAL directory must contain
+ * exactly two node directories, each with at least one WAL file.
+ */
@Test
public void testAllocateWALNode() throws IllegalPathException {
- WALManager walManager = WALManager.getInstance();
+ // NOTE(review): the constructor argument 6 is presumably the maximum number of WAL nodes
+ // — confirm against RoundRobinStrategy
+ RoundRobinStrategy roundRobinStrategy = new RoundRobinStrategy(6);
IWALNode[] walNodes = new IWALNode[6];
- for (int i = 0; i < 12; i++) {
- IWALNode walNode = walManager.applyForWALNode();
- if (i < 6) {
- walNodes[i] = walNode;
- } else {
- assertEquals(walNodes[i % 6], walNode);
+ try {
+ for (int i = 0; i < 12; i++) {
+ IWALNode walNode = roundRobinStrategy.applyForWALNode(String.valueOf(i));
+ if (i < 6) {
+ walNodes[i] = walNode;
+ } else {
+ // the identifier is new, yet the node is expected to be one of the first six
+ assertEquals(walNodes[i % 6], walNode);
+ }
+ walNode.log(i, getInsertRowPlan());
+ }
+ for (String walDir : walDirs) {
+ File walDirFile = new File(walDir);
+ assertTrue(walDirFile.exists());
+ File[] nodeDirs = walDirFile.listFiles(File::isDirectory);
+ assertNotNull(nodeDirs);
+ assertEquals(2, nodeDirs.length);
+ for (File nodeDir : nodeDirs) {
+ assertTrue(nodeDir.exists());
+ assertNotEquals(0, WALFileUtils.listAllWALFiles(nodeDir).length);
+ }
+ }
+ } finally {
+ // close whatever was allocated, even when an assertion above failed
+ for (IWALNode walNode : walNodes) {
+ if (walNode != null) {
+ walNode.close();
+ }
}
- walNode.log(i, getInsertRowPlan());
- }
- for (String walDir : walDirs) {
- File walDirFile = new File(walDir);
- assertTrue(walDirFile.exists());
- assertNotNull(walDirFile.list(WALWriter::walFilenameFilter));
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java b/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
index aa30956d4e..a803a410b6 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/buffer/WALBufferCommonTest.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.wal.io.WALReader;
-import org.apache.iotdb.db.wal.io.WALWriter;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.After;
@@ -94,7 +94,7 @@ public abstract class WALBufferCommonTest {
}
Thread.sleep(1_000);
// check .wal files
- File[] walFiles = new File(logDirectory).listFiles(WALWriter::walFilenameFilter);
+ File[] walFiles = WALFileUtils.listAllWALFiles(new File(logDirectory));
Set<InsertRowPlan> actualInsertRowPlans = new HashSet<>();
if (walFiles != null) {
for (File walFile : walFiles) {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManagerTest.java b/server/src/test/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManagerTest.java
index 91e947c022..5e77070e68 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/checkpoint/CheckpointManagerTest.java
@@ -18,12 +18,14 @@
*/
package org.apache.iotdb.db.wal.checkpoint;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.wal.io.CheckpointReader;
-import org.apache.iotdb.db.wal.io.CheckpointWriter;
import org.apache.iotdb.db.wal.recover.CheckpointRecoverUtils;
+import org.apache.iotdb.db.wal.utils.CheckpointFileUtils;
import org.junit.After;
import org.junit.Before;
@@ -46,19 +48,24 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class CheckpointManagerTest {
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final String identifier = String.valueOf(Integer.MAX_VALUE);
private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
private CheckpointManager checkpointManager;
+ private long prevFileSize;
@Before
public void setUp() throws Exception {
EnvironmentUtils.cleanDir(logDirectory);
+ // save the configured checkpoint file size threshold, then lower it to 10 * 1024 bytes;
+ // tearDown() writes the saved value back
+ prevFileSize = config.getCheckpointFileSizeThresholdInByte();
+ config.setCheckpointFileSizeThresholdInByte(10 * 1024);
checkpointManager = new CheckpointManager(identifier, logDirectory);
}
@After
public void tearDown() throws Exception {
checkpointManager.close();
+ // put back the threshold that setUp() stored in prevFileSize
+ config.setCheckpointFileSizeThresholdInByte(prevFileSize);
EnvironmentUtils.cleanDir(logDirectory);
}
@@ -69,7 +76,7 @@ public class CheckpointManagerTest {
List<Checkpoint> expectedCheckpoints = Collections.singletonList(initCheckpoint);
CheckpointReader checkpointReader =
new CheckpointReader(
- new File(logDirectory + File.separator + CheckpointWriter.getLogFileName(0)));
+ new File(logDirectory + File.separator + CheckpointFileUtils.getLogFileName(0)));
List<Checkpoint> actualCheckpoints = checkpointReader.readAll();
assertEquals(expectedCheckpoints, actualCheckpoints);
}
@@ -122,7 +129,7 @@ public class CheckpointManagerTest {
int versionId = 0;
Map<Integer, MemTableInfo> expectedMemTableId2Info = new HashMap<>();
Map<Integer, Integer> versionId2memTableId = new HashMap<>();
- while (size < CheckpointManager.LOG_SIZE_LIMIT) {
+ while (size < config.getCheckpointFileSizeThresholdInByte()) {
++versionId;
String tsFilePath = logDirectory + File.separator + versionId + ".tsfile";
MemTableInfo memTableInfo = new MemTableInfo(new PrimitiveMemTable(), tsFilePath, versionId);
@@ -146,9 +153,9 @@ public class CheckpointManagerTest {
assertEquals(5, checkpointManager.getFirstValidWALVersionId());
// check checkpoint files
assertFalse(
- new File(logDirectory + File.separator + CheckpointWriter.getLogFileName(0)).exists());
+ new File(logDirectory + File.separator + CheckpointFileUtils.getLogFileName(0)).exists());
assertTrue(
- new File(logDirectory + File.separator + CheckpointWriter.getLogFileName(1)).exists());
+ new File(logDirectory + File.separator + CheckpointFileUtils.getLogFileName(1)).exists());
// recover info from checkpoint file
Map<Integer, MemTableInfo> actualMemTableId2Info =
CheckpointRecoverUtils.recoverMemTableInfo(new File(logDirectory));
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
new file mode 100644
index 0000000000..7eb5f06b90
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/ConsensusReqReaderTest.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.node;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.wal.ConsensusReqReader;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.wal.utils.WALMode;
+import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class ConsensusReqReaderTest {
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final String identifier = String.valueOf(Integer.MAX_VALUE);
+ private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
+ private static final String devicePath = "root.test_sg.test_d";
+ private WALMode prevMode;
+ private WALNode walNode;
+
+ /**
+ * Cleans the WAL test directory, saves the configured WAL mode, switches the config to
+ * {@link WALMode#SYNC} and creates the {@link WALNode} under test in the test directory.
+ */
+ @Before
+ public void setUp() throws Exception {
+ EnvironmentUtils.cleanDir(logDirectory);
+ // remember the configured mode; tearDown() writes it back
+ prevMode = config.getWalMode();
+ config.setWalMode(WALMode.SYNC);
+ walNode = new WALNode(identifier, logDirectory);
+ }
+
+ /**
+ * Closes the WAL node, restores the WAL mode saved by {@link #setUp()} and removes the test
+ * directory.
+ *
+ * <p>JUnit runs {@code @After} methods even when {@code @Before} or the test threw, so both
+ * fields may still be unset here. The restore and the cleanup sit in a {@code finally} block so
+ * that a failing {@code close()} cannot leave the config (obtained from the IoTDBDescriptor
+ * singleton) in SYNC mode or leave WAL files behind.
+ */
+ @After
+ public void tearDown() throws Exception {
+   try {
+     if (walNode != null) {
+       walNode.close();
+     }
+   } finally {
+     // prevMode stays null when setUp() failed before saving it; do not write null back
+     if (prevMode != null) {
+       config.setWalMode(prevMode);
+     }
+     EnvironmentUtils.cleanDir(logDirectory);
+   }
+ }
+
+ /**
+ * Writes scenario 01 through {@link #walNode}, rolling the WAL file between groups, and waits on
+ * the flush listener of the last entry. The numbers after each file name are the search indexes
+ * of the entries logged into that file, in order (-1 is how the inline comments label an entry
+ * whose search index was never set): <br>
+ * _0-0.wal: 1,-1 <br>
+ * _1-1.wal: 2,2,2 <br>
+ * _2-2.wal: 3,3 <br>
+ * _3-3.wal: 3,4 <br>
+ * _4-4.wal: 4 <br>
+ * _5-4.wal: 4,4,5 <br>
+ * _6-5.wal: 6 <br>
+ * Request type the tests in this class expect the reader to return per search index: 1 -
+ * InsertRowNode, 2 - InsertRowsOfOneDeviceNode, 3 - InsertRowsNode, 4 - InsertMultiTabletsNode,
+ * 5 - InsertTabletNode. Index 6 is logged as an InsertRowNode; the tests expect it to become
+ * readable only after a further entry has been logged behind it.
+ *
+ * <p>NOTE(review): the second number in each file name looks like the search index reached
+ * before that file was opened — confirm against WALFileUtils.
+ */
+ private void simulateFileScenario01() throws IllegalPathException {
+ InsertTabletNode insertTabletNode;
+ InsertRowNode insertRowNode;
+ // _0-0.wal
+ insertRowNode = getInsertRowNode(devicePath);
+ insertRowNode.setSearchIndex(1);
+ walNode.log(0, insertRowNode); // 1
+ insertTabletNode = getInsertTabletNode(devicePath, new long[] {2});
+ walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // -1
+ walNode.rollWALFile();
+ // _1-1.wal
+ insertRowNode = getInsertRowNode(devicePath);
+ insertRowNode.setSearchIndex(2);
+ // the same node object is logged three times under search index 2
+ walNode.log(0, insertRowNode); // 2
+ walNode.log(0, insertRowNode); // 2
+ walNode.log(0, insertRowNode); // 2
+ walNode.rollWALFile();
+ // _2-2.wal
+ insertRowNode = getInsertRowNode(devicePath);
+ insertRowNode.setSearchIndex(3);
+ walNode.log(0, insertRowNode); // 3
+ walNode.log(0, insertRowNode); // 3
+ walNode.rollWALFile();
+ // _3-3.wal
+ // reuse the index-3 node under another device path: index 3 now covers two devices and
+ // continues in a second file
+ insertRowNode.setDevicePath(new PartialPath(devicePath + "test"));
+ walNode.log(0, insertRowNode); // 3
+ insertTabletNode = getInsertTabletNode(devicePath, new long[] {4});
+ insertTabletNode.setSearchIndex(4);
+ walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
+ walNode.rollWALFile();
+ // _4-4.wal
+ // index 4 is logged four times in total, spread over three files
+ walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
+ walNode.rollWALFile();
+ // _5-4.wal
+ walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
+ walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
+ insertTabletNode = getInsertTabletNode(devicePath, new long[] {5});
+ insertTabletNode.setSearchIndex(5);
+ walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 5
+ walNode.rollWALFile();
+ // _6-5.wal
+ insertRowNode = getInsertRowNode(devicePath);
+ insertRowNode.setSearchIndex(6);
+ WALFlushListener walFlushListener = walNode.log(0, insertRowNode); // 6
+ // wait on the listener returned for the last entry before the tests start reading
+ walFlushListener.waitForResult();
+ }
+
+ /** Fetches every search index of scenario 01 one at a time and checks what comes back. */
+ @Test
+ public void scenario01TestGetReq01() throws Exception {
+   simulateFileScenario01();
+
+   IConsensusRequest req1 = walNode.getReq(1);
+   Assert.assertTrue(req1 instanceof InsertRowNode);
+   Assert.assertEquals(1, ((InsertRowNode) req1).getSearchIndex());
+
+   IConsensusRequest req2 = walNode.getReq(2);
+   Assert.assertTrue(req2 instanceof InsertRowsOfOneDeviceNode);
+   InsertRowsOfOneDeviceNode rowsOfOneDevice = (InsertRowsOfOneDeviceNode) req2;
+   Assert.assertEquals(2, rowsOfOneDevice.getSearchIndex());
+   Assert.assertEquals(3, rowsOfOneDevice.getInsertRowNodeIndexList().size());
+
+   IConsensusRequest req3 = walNode.getReq(3);
+   Assert.assertTrue(req3 instanceof InsertRowsNode);
+   InsertRowsNode rows = (InsertRowsNode) req3;
+   Assert.assertEquals(3, rows.getSearchIndex());
+   Assert.assertEquals(3, rows.getInsertRowNodeIndexList().size());
+
+   IConsensusRequest req4 = walNode.getReq(4);
+   Assert.assertTrue(req4 instanceof InsertMultiTabletsNode);
+   InsertMultiTabletsNode multiTablets = (InsertMultiTabletsNode) req4;
+   Assert.assertEquals(4, multiTablets.getSearchIndex());
+   Assert.assertEquals(4, multiTablets.getInsertTabletNodeList().size());
+
+   IConsensusRequest req5 = walNode.getReq(5);
+   Assert.assertTrue(req5 instanceof InsertTabletNode);
+   Assert.assertEquals(5, ((InsertTabletNode) req5).getSearchIndex());
+
+   // index 6 has been logged, yet no request is expected for it at this point
+   Assert.assertNull(walNode.getReq(6));
+ }
+
+ /** Calls getReqs(1, 6) on scenario 01 and expects the five requests for indexes 1 to 5. */
+ @Test
+ public void scenario01TestGetReqs01() throws Exception {
+   simulateFileScenario01();
+
+   List<IConsensusRequest> batch = walNode.getReqs(1, 6);
+   Assert.assertEquals(5, batch.size());
+
+   IConsensusRequest req1 = batch.get(0);
+   Assert.assertTrue(req1 instanceof InsertRowNode);
+   Assert.assertEquals(1, ((InsertRowNode) req1).getSearchIndex());
+
+   IConsensusRequest req2 = batch.get(1);
+   Assert.assertTrue(req2 instanceof InsertRowsOfOneDeviceNode);
+   InsertRowsOfOneDeviceNode rowsOfOneDevice = (InsertRowsOfOneDeviceNode) req2;
+   Assert.assertEquals(2, rowsOfOneDevice.getSearchIndex());
+   Assert.assertEquals(3, rowsOfOneDevice.getInsertRowNodeIndexList().size());
+
+   IConsensusRequest req3 = batch.get(2);
+   Assert.assertTrue(req3 instanceof InsertRowsNode);
+   InsertRowsNode rows = (InsertRowsNode) req3;
+   Assert.assertEquals(3, rows.getSearchIndex());
+   Assert.assertEquals(3, rows.getInsertRowNodeIndexList().size());
+
+   IConsensusRequest req4 = batch.get(3);
+   Assert.assertTrue(req4 instanceof InsertMultiTabletsNode);
+   InsertMultiTabletsNode multiTablets = (InsertMultiTabletsNode) req4;
+   Assert.assertEquals(4, multiTablets.getSearchIndex());
+   Assert.assertEquals(4, multiTablets.getInsertTabletNodeList().size());
+
+   IConsensusRequest req5 = batch.get(4);
+   Assert.assertTrue(req5 instanceof InsertTabletNode);
+   Assert.assertEquals(5, ((InsertTabletNode) req5).getSearchIndex());
+ }
+
+ /** Calls getReqs(3, 1) on scenario 01 and expects only the request for index 3. */
+ @Test
+ public void scenario01TestGetReqs02() throws Exception {
+   simulateFileScenario01();
+
+   List<IConsensusRequest> batch = walNode.getReqs(3, 1);
+   Assert.assertEquals(1, batch.size());
+
+   IConsensusRequest only = batch.get(0);
+   Assert.assertTrue(only instanceof InsertRowsNode);
+   InsertRowsNode rows = (InsertRowsNode) only;
+   Assert.assertEquals(3, rows.getSearchIndex());
+   Assert.assertEquals(3, rows.getInsertRowNodeIndexList().size());
+ }
+
+ /** Calls getReqs(4, 2) on scenario 01 and expects the requests for indexes 4 and 5. */
+ @Test
+ public void scenario01TestGetReqs03() throws Exception {
+   simulateFileScenario01();
+
+   List<IConsensusRequest> batch = walNode.getReqs(4, 2);
+   Assert.assertEquals(2, batch.size());
+
+   IConsensusRequest req4 = batch.get(0);
+   Assert.assertTrue(req4 instanceof InsertMultiTabletsNode);
+   InsertMultiTabletsNode multiTablets = (InsertMultiTabletsNode) req4;
+   Assert.assertEquals(4, multiTablets.getSearchIndex());
+   Assert.assertEquals(4, multiTablets.getInsertTabletNodeList().size());
+
+   IConsensusRequest req5 = batch.get(1);
+   Assert.assertTrue(req5 instanceof InsertTabletNode);
+   Assert.assertEquals(5, ((InsertTabletNode) req5).getSearchIndex());
+ }
+
+ /** Calls getReqs(5, 100) on scenario 01 and expects just the request for index 5. */
+ @Test
+ public void scenario01TestGetReqs04() throws Exception {
+   simulateFileScenario01();
+
+   List<IConsensusRequest> batch = walNode.getReqs(5, 100);
+   Assert.assertEquals(1, batch.size());
+
+   IConsensusRequest only = batch.get(0);
+   Assert.assertTrue(only instanceof InsertTabletNode);
+   Assert.assertEquals(5, ((InsertTabletNode) only).getSearchIndex());
+ }
+
+ /** Calls getReqs(6, 100) on scenario 01 and expects an empty result. */
+ @Test
+ public void scenario01TestGetReqs05() throws Exception {
+   simulateFileScenario01();
+
+   List<IConsensusRequest> batch = walNode.getReqs(6, 100);
+   Assert.assertEquals(0, batch.size());
+ }
+
+ /** Iterates scenario 01 from index 1 and expects the requests for 1 to 5, then no more. */
+ @Test
+ public void scenario01TestGetReqIterator01() throws Exception {
+   simulateFileScenario01();
+   ConsensusReqReader.ReqIterator reqIterator = walNode.getReqIterator(1);
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req1 = reqIterator.next();
+   Assert.assertTrue(req1 instanceof InsertRowNode);
+   Assert.assertEquals(1, ((InsertRowNode) req1).getSearchIndex());
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req2 = reqIterator.next();
+   Assert.assertTrue(req2 instanceof InsertRowsOfOneDeviceNode);
+   InsertRowsOfOneDeviceNode rowsOfOneDevice = (InsertRowsOfOneDeviceNode) req2;
+   Assert.assertEquals(2, rowsOfOneDevice.getSearchIndex());
+   Assert.assertEquals(3, rowsOfOneDevice.getInsertRowNodeIndexList().size());
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req3 = reqIterator.next();
+   Assert.assertTrue(req3 instanceof InsertRowsNode);
+   InsertRowsNode rows = (InsertRowsNode) req3;
+   Assert.assertEquals(3, rows.getSearchIndex());
+   Assert.assertEquals(3, rows.getInsertRowNodeIndexList().size());
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req4 = reqIterator.next();
+   Assert.assertTrue(req4 instanceof InsertMultiTabletsNode);
+   InsertMultiTabletsNode multiTablets = (InsertMultiTabletsNode) req4;
+   Assert.assertEquals(4, multiTablets.getSearchIndex());
+   Assert.assertEquals(4, multiTablets.getInsertTabletNodeList().size());
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req5 = reqIterator.next();
+   Assert.assertTrue(req5 instanceof InsertTabletNode);
+   Assert.assertEquals(5, ((InsertTabletNode) req5).getSearchIndex());
+
+   Assert.assertFalse(reqIterator.hasNext());
+ }
+
+ /**
+ * Iterates scenario 01 from index 4, expects the requests for 4 and 5, then checks that a thread
+ * blocked in {@code waitForNextReady()} receives the request for index 6 once one more entry is
+ * logged into the current WAL file.
+ *
+ * <p>The executor is shut down in a {@code finally} block so its worker thread does not outlive
+ * the test, whether the test passes or fails.
+ */
+ @Test
+ public void scenario01TestGetReqIterator02() throws Exception {
+   simulateFileScenario01();
+   IConsensusRequest request;
+   ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(4);
+
+   Assert.assertTrue(iterator.hasNext());
+   request = iterator.next();
+   Assert.assertTrue(request instanceof InsertMultiTabletsNode);
+   Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getSearchIndex());
+   Assert.assertEquals(4, ((InsertMultiTabletsNode) request).getInsertTabletNodeList().size());
+   Assert.assertTrue(iterator.hasNext());
+   request = iterator.next();
+   Assert.assertTrue(request instanceof InsertTabletNode);
+   Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+
+   Assert.assertFalse(iterator.hasNext());
+   // wait for next
+   ExecutorService checkThread = Executors.newSingleThreadExecutor();
+   try {
+     Future<Boolean> future =
+         checkThread.submit(
+             () -> {
+               iterator.waitForNextReady();
+               Assert.assertTrue(iterator.hasNext());
+               IConsensusRequest req = iterator.next();
+               Assert.assertTrue(req instanceof InsertRowNode);
+               Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
+               return true;
+             });
+
+     // presumably gives the worker time to block in waitForNextReady() before the write
+     Thread.sleep(500);
+     InsertRowNode insertRowNode = getInsertRowNode(devicePath);
+     walNode.log(0, insertRowNode); // put -1 after 6
+     // an assertion failure inside the worker surfaces here as an ExecutionException
+     Assert.assertTrue(future.get());
+   } finally {
+     checkThread.shutdownNow();
+   }
+ }
+
+ /**
+ * Iterates scenario 01 from index 5, expects the request for 5, then checks that a thread
+ * blocked in {@code waitForNextReady()} receives the request for index 6 after the WAL file is
+ * rolled and one more entry is logged into the new file.
+ *
+ * <p>The executor is shut down in a {@code finally} block so its worker thread does not outlive
+ * the test, whether the test passes or fails.
+ */
+ @Test
+ public void scenario01TestGetReqIterator03() throws Exception {
+   simulateFileScenario01();
+   IConsensusRequest request;
+   ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
+
+   Assert.assertTrue(iterator.hasNext());
+   request = iterator.next();
+   Assert.assertTrue(request instanceof InsertTabletNode);
+   Assert.assertEquals(5, ((InsertTabletNode) request).getSearchIndex());
+
+   Assert.assertFalse(iterator.hasNext());
+   // wait for next
+   ExecutorService checkThread = Executors.newSingleThreadExecutor();
+   try {
+     Future<Boolean> future =
+         checkThread.submit(
+             () -> {
+               iterator.waitForNextReady();
+               Assert.assertTrue(iterator.hasNext());
+               IConsensusRequest req = iterator.next();
+               Assert.assertTrue(req instanceof InsertRowNode);
+               Assert.assertEquals(6, ((InsertRowNode) req).getSearchIndex());
+               return true;
+             });
+
+     // presumably gives the worker time to block in waitForNextReady() before the write
+     Thread.sleep(500);
+     walNode.rollWALFile();
+     InsertRowNode insertRowNode = getInsertRowNode(devicePath);
+     walNode.log(0, insertRowNode); // put -1 after 6
+     // an assertion failure inside the worker surfaces here as an ExecutionException
+     Assert.assertTrue(future.get());
+   } finally {
+     checkThread.shutdownNow();
+   }
+ }
+
+ /** Reads indexes 1 and 2, skips forward to 4, and expects 4 and 5 to follow, then no more. */
+ @Test
+ public void scenario01TestGetReqIterator04() throws Exception {
+   simulateFileScenario01();
+   ConsensusReqReader.ReqIterator reqIterator = walNode.getReqIterator(1);
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req1 = reqIterator.next();
+   Assert.assertTrue(req1 instanceof InsertRowNode);
+   Assert.assertEquals(1, ((InsertRowNode) req1).getSearchIndex());
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req2 = reqIterator.next();
+   Assert.assertTrue(req2 instanceof InsertRowsOfOneDeviceNode);
+   InsertRowsOfOneDeviceNode rowsOfOneDevice = (InsertRowsOfOneDeviceNode) req2;
+   Assert.assertEquals(2, rowsOfOneDevice.getSearchIndex());
+   Assert.assertEquals(3, rowsOfOneDevice.getInsertRowNodeIndexList().size());
+
+   // jump over index 3
+   reqIterator.skipTo(4);
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req4 = reqIterator.next();
+   Assert.assertTrue(req4 instanceof InsertMultiTabletsNode);
+   InsertMultiTabletsNode multiTablets = (InsertMultiTabletsNode) req4;
+   Assert.assertEquals(4, multiTablets.getSearchIndex());
+   Assert.assertEquals(4, multiTablets.getInsertTabletNodeList().size());
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req5 = reqIterator.next();
+   Assert.assertTrue(req5 instanceof InsertTabletNode);
+   Assert.assertEquals(5, ((InsertTabletNode) req5).getSearchIndex());
+
+   Assert.assertFalse(reqIterator.hasNext());
+ }
+
+ /** Reads index 5, skips backwards to 2, and expects 2, 3, 4 and 5 to follow, then no more. */
+ @Test
+ public void scenario01TestGetReqIterator05() throws Exception {
+   simulateFileScenario01();
+   ConsensusReqReader.ReqIterator reqIterator = walNode.getReqIterator(5);
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest firstRead = reqIterator.next();
+   Assert.assertTrue(firstRead instanceof InsertTabletNode);
+   Assert.assertEquals(5, ((InsertTabletNode) firstRead).getSearchIndex());
+
+   // move back to an index that lies before the current position
+   reqIterator.skipTo(2);
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req2 = reqIterator.next();
+   Assert.assertTrue(req2 instanceof InsertRowsOfOneDeviceNode);
+   InsertRowsOfOneDeviceNode rowsOfOneDevice = (InsertRowsOfOneDeviceNode) req2;
+   Assert.assertEquals(2, rowsOfOneDevice.getSearchIndex());
+   Assert.assertEquals(3, rowsOfOneDevice.getInsertRowNodeIndexList().size());
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req3 = reqIterator.next();
+   Assert.assertTrue(req3 instanceof InsertRowsNode);
+   InsertRowsNode rows = (InsertRowsNode) req3;
+   Assert.assertEquals(3, rows.getSearchIndex());
+   Assert.assertEquals(3, rows.getInsertRowNodeIndexList().size());
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req4 = reqIterator.next();
+   Assert.assertTrue(req4 instanceof InsertMultiTabletsNode);
+   InsertMultiTabletsNode multiTablets = (InsertMultiTabletsNode) req4;
+   Assert.assertEquals(4, multiTablets.getSearchIndex());
+   Assert.assertEquals(4, multiTablets.getInsertTabletNodeList().size());
+
+   Assert.assertTrue(reqIterator.hasNext());
+   IConsensusRequest req5 = reqIterator.next();
+   Assert.assertTrue(req5 instanceof InsertTabletNode);
+   Assert.assertEquals(5, ((InsertTabletNode) req5).getSearchIndex());
+
+   Assert.assertFalse(reqIterator.hasNext());
+ }
+
+ /**
+ * Builds a fixed single-row insertion (time 110) for the given device with six measurements
+ * s1..s6 of types DOUBLE, FLOAT, INT64, INT32, BOOLEAN and TEXT, and attaches matching
+ * measurement schemas.
+ *
+ * @param devicePath device path the row is inserted into
+ * @return the populated node; its search index is left untouched
+ * @throws IllegalPathException if {@code devicePath} is rejected by {@link PartialPath}
+ */
+ public static InsertRowNode getInsertRowNode(String devicePath) throws IllegalPathException {
+   String[] measurements = {"s1", "s2", "s3", "s4", "s5", "s6"};
+   TSDataType[] types = {
+     TSDataType.DOUBLE,
+     TSDataType.FLOAT,
+     TSDataType.INT64,
+     TSDataType.INT32,
+     TSDataType.BOOLEAN,
+     TSDataType.TEXT
+   };
+   // one value per measurement, in the same order as the types above
+   Object[] values = {1.0, 2.0f, 10000L, 100, false, new Binary("hh" + 0)};
+
+   MeasurementSchema[] schemas = new MeasurementSchema[types.length];
+   for (int i = 0; i < types.length; i++) {
+     schemas[i] = new MeasurementSchema(measurements[i], types[i]);
+   }
+
+   InsertRowNode rowNode =
+       new InsertRowNode(
+           new PlanNodeId(""),
+           new PartialPath(devicePath),
+           false,
+           measurements,
+           types,
+           110L,
+           values,
+           false);
+   rowNode.setMeasurementSchemas(schemas);
+   return rowNode;
+ }
+
+ /**
+ * Builds a tablet insertion for the given device with one row per entry of {@code times} and six
+ * measurements s1..s6 of types DOUBLE, FLOAT, INT64, INT32, BOOLEAN and TEXT. Row {@code r}
+ * holds the values 1.0 + r, 2 + r, 10000 + r, 100 + r, (r is even) and "hh" + r. Each column
+ * gets its own bitmap of {@code times.length} bits with position {@code i % times.length}
+ * marked for column {@code i}.
+ *
+ * @param devicePath device path the tablet is inserted into
+ * @param times row timestamps; must not be empty, because the bitmap position is computed
+ *     modulo {@code times.length}
+ * @return the populated node with measurement schemas attached; its search index is left
+ *     untouched
+ * @throws IllegalPathException if {@code devicePath} is rejected by {@link PartialPath}
+ */
+ private InsertTabletNode getInsertTabletNode(String devicePath, long[] times)
+     throws IllegalPathException {
+   String[] measurements = {"s1", "s2", "s3", "s4", "s5", "s6"};
+   TSDataType[] dataTypes = {
+     TSDataType.DOUBLE,
+     TSDataType.FLOAT,
+     TSDataType.INT64,
+     TSDataType.INT32,
+     TSDataType.BOOLEAN,
+     TSDataType.TEXT
+   };
+   int rowCount = times.length;
+
+   double[] doubles = new double[rowCount];
+   float[] floats = new float[rowCount];
+   long[] longs = new long[rowCount];
+   int[] ints = new int[rowCount];
+   boolean[] booleans = new boolean[rowCount];
+   Binary[] binaries = new Binary[rowCount];
+   for (int r = 0; r < rowCount; r++) {
+     doubles[r] = 1.0 + r;
+     floats[r] = 2 + r;
+     longs[r] = 10000 + r;
+     ints[r] = 100 + r;
+     booleans[r] = (r % 2 == 0);
+     binaries[r] = new Binary("hh" + r);
+   }
+   Object[] columns = {doubles, floats, longs, ints, booleans, binaries};
+
+   // a freshly allocated array holds only nulls, so every slot is filled unconditionally
+   BitMap[] bitMaps = new BitMap[dataTypes.length];
+   MeasurementSchema[] schemas = new MeasurementSchema[dataTypes.length];
+   for (int i = 0; i < dataTypes.length; i++) {
+     bitMaps[i] = new BitMap(rowCount);
+     bitMaps[i].mark(i % rowCount);
+     schemas[i] = new MeasurementSchema(measurements[i], dataTypes[i]);
+   }
+
+   InsertTabletNode insertTabletNode =
+       new InsertTabletNode(
+           new PlanNodeId(""),
+           new PartialPath(devicePath),
+           false,
+           measurements,
+           dataTypes,
+           times,
+           bitMaps,
+           columns,
+           rowCount);
+   insertTabletNode.setMeasurementSchemas(schemas);
+   return insertTabletNode;
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
index 28e6bd6004..6c72f0aeeb 100644
--- a/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/wal/node/WALNodeTest.java
@@ -29,8 +29,8 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.wal.checkpoint.MemTableInfo;
import org.apache.iotdb.db.wal.io.WALReader;
-import org.apache.iotdb.db.wal.io.WALWriter;
import org.apache.iotdb.db.wal.recover.CheckpointRecoverUtils;
+import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.apache.iotdb.db.wal.utils.WALMode;
import org.apache.iotdb.db.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -115,7 +115,7 @@ public class WALNodeTest {
}
Thread.sleep(1_000);
// check .wal files
- File[] walFiles = new File(logDirectory).listFiles(WALWriter::walFilenameFilter);
+ File[] walFiles = WALFileUtils.listAllWALFiles(new File(logDirectory));
Set<InsertTabletPlan> actualInsertTabletPlans = new HashSet<>();
if (walFiles != null) {
for (File walFile : walFiles) {
@@ -258,12 +258,16 @@ public class WALNodeTest {
}
walNode.onMemTableFlushed(memTable);
walNode.onMemTableCreated(new PrimitiveMemTable(), tsFilePath);
- // check existence of _0.wal file
- assertTrue(new File(logDirectory + File.separator + WALWriter.getLogFileName(0)).exists());
- assertTrue(new File(logDirectory + File.separator + WALWriter.getLogFileName(1)).exists());
+ // check existence of _0-0.wal file
+ assertTrue(
+ new File(logDirectory + File.separator + WALFileUtils.getLogFileName(0, 0)).exists());
+ assertTrue(
+ new File(logDirectory + File.separator + WALFileUtils.getLogFileName(1, 0)).exists());
walNode.deleteOutdatedFiles();
- assertFalse(new File(logDirectory + File.separator + WALWriter.getLogFileName(0)).exists());
- assertTrue(new File(logDirectory + File.separator + WALWriter.getLogFileName(1)).exists());
+ assertFalse(
+ new File(logDirectory + File.separator + WALFileUtils.getLogFileName(0, 0)).exists());
+ assertTrue(
+ new File(logDirectory + File.separator + WALFileUtils.getLogFileName(1, 0)).exists());
// check flush listeners
try {
for (WALFlushListener walFlushListener : walFlushListeners) {
diff --git a/server/src/test/java/org/apache/iotdb/db/wal/utils/WALFileUtilsTest.java b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALFileUtilsTest.java
new file mode 100644
index 0000000000..007b40346b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/wal/utils/WALFileUtilsTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.wal.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+public class WALFileUtilsTest {
+ @Test
+ public void binarySearchFileBySearchIndex01() {
+ // Target 0: no file's 2nd getLogFileName argument is below 0, so -1 is expected.
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 5)),
+ new File(WALFileUtils.getLogFileName(2, 5)),
+ new File(WALFileUtils.getLogFileName(3, 12)),
+ new File(WALFileUtils.getLogFileName(4, 12))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, 0);
+ Assert.assertEquals(-1, found);
+ }
+
+ @Test
+ public void binarySearchFileBySearchIndex02() {
+ // Target 2: only the first file (2nd argument 0) lies below it, so index 0 is expected.
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 5)),
+ new File(WALFileUtils.getLogFileName(2, 5)),
+ new File(WALFileUtils.getLogFileName(3, 12)),
+ new File(WALFileUtils.getLogFileName(4, 12))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, 2);
+ Assert.assertEquals(0, found);
+ }
+
+ @Test
+ public void binarySearchFileBySearchIndex03() {
+ // Target 5: files whose 2nd argument equals 5 are not expected to match; index 0 is.
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 5)),
+ new File(WALFileUtils.getLogFileName(2, 5)),
+ new File(WALFileUtils.getLogFileName(3, 12)),
+ new File(WALFileUtils.getLogFileName(4, 12))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, 5);
+ Assert.assertEquals(0, found);
+ }
+
+ @Test
+ public void binarySearchFileBySearchIndex04() {
+ // Target 6: the last file whose 2nd argument (5) is below it sits at index 2.
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 5)),
+ new File(WALFileUtils.getLogFileName(2, 5)),
+ new File(WALFileUtils.getLogFileName(3, 12)),
+ new File(WALFileUtils.getLogFileName(4, 12))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, 6);
+ Assert.assertEquals(2, found);
+ }
+
+ @Test
+ public void binarySearchFileBySearchIndex05() {
+ // Target 10: falls between 5 and 12, so the last file with 2nd argument 5 (index 2).
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 5)),
+ new File(WALFileUtils.getLogFileName(2, 5)),
+ new File(WALFileUtils.getLogFileName(3, 12)),
+ new File(WALFileUtils.getLogFileName(4, 12))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, 10);
+ Assert.assertEquals(2, found);
+ }
+
+ @Test
+ public void binarySearchFileBySearchIndex06() {
+ // Target 12: files whose 2nd argument equals 12 are not expected to match; index 2 is.
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 5)),
+ new File(WALFileUtils.getLogFileName(2, 5)),
+ new File(WALFileUtils.getLogFileName(3, 12)),
+ new File(WALFileUtils.getLogFileName(4, 12))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, 12);
+ Assert.assertEquals(2, found);
+ }
+
+ @Test
+ public void binarySearchFileBySearchIndex07() {
+ // Target Long.MAX_VALUE: exceeds every 2nd argument, so the last index (4) is expected.
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 5)),
+ new File(WALFileUtils.getLogFileName(2, 5)),
+ new File(WALFileUtils.getLogFileName(3, 12)),
+ new File(WALFileUtils.getLogFileName(4, 12))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, Long.MAX_VALUE);
+ Assert.assertEquals(4, found);
+ }
+
+ @Test
+ public void binarySearchFileBySearchIndex08() {
+ // Target 10 with distinct 2nd arguments: only the first file (0) is below it -> index 0.
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 100)),
+ new File(WALFileUtils.getLogFileName(2, 200)),
+ new File(WALFileUtils.getLogFileName(3, 300)),
+ new File(WALFileUtils.getLogFileName(4, 400))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, 10);
+ Assert.assertEquals(0, found);
+ }
+
+ @Test
+ public void binarySearchFileBySearchIndex09() {
+ // Target 250 with distinct 2nd arguments: the last one below it is 200, at index 2.
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 100)),
+ new File(WALFileUtils.getLogFileName(2, 200)),
+ new File(WALFileUtils.getLogFileName(3, 300)),
+ new File(WALFileUtils.getLogFileName(4, 400))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, 250);
+ Assert.assertEquals(2, found);
+ }
+
+ @Test
+ public void binarySearchFileBySearchIndex10() {
+ // Target 5 with runs of equal 2nd arguments: the last file of the 0-run (index 3).
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 0)),
+ new File(WALFileUtils.getLogFileName(2, 0)),
+ new File(WALFileUtils.getLogFileName(3, 0)),
+ new File(WALFileUtils.getLogFileName(4, 5)),
+ new File(WALFileUtils.getLogFileName(5, 5)),
+ new File(WALFileUtils.getLogFileName(6, 5)),
+ new File(WALFileUtils.getLogFileName(7, 5)),
+ new File(WALFileUtils.getLogFileName(8, 12)),
+ new File(WALFileUtils.getLogFileName(9, 12)),
+ new File(WALFileUtils.getLogFileName(10, 12)),
+ new File(WALFileUtils.getLogFileName(11, 12))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, 5);
+ Assert.assertEquals(3, found);
+ }
+
+ @Test
+ public void binarySearchFileBySearchIndex11() {
+ // Target 6 with runs of equal 2nd arguments: the last file of the 5-run (index 7).
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 0)),
+ new File(WALFileUtils.getLogFileName(2, 0)),
+ new File(WALFileUtils.getLogFileName(3, 0)),
+ new File(WALFileUtils.getLogFileName(4, 5)),
+ new File(WALFileUtils.getLogFileName(5, 5)),
+ new File(WALFileUtils.getLogFileName(6, 5)),
+ new File(WALFileUtils.getLogFileName(7, 5)),
+ new File(WALFileUtils.getLogFileName(8, 12)),
+ new File(WALFileUtils.getLogFileName(9, 12)),
+ new File(WALFileUtils.getLogFileName(10, 12)),
+ new File(WALFileUtils.getLogFileName(11, 12))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, 6);
+ Assert.assertEquals(7, found);
+ }
+
+ @Test
+ public void binarySearchFileBySearchIndex12() {
+ // Target 12: the run with 2nd argument 12 is not expected to match, so index 7.
+ File[] walFiles = {
+ new File(WALFileUtils.getLogFileName(0, 0)),
+ new File(WALFileUtils.getLogFileName(1, 0)),
+ new File(WALFileUtils.getLogFileName(2, 0)),
+ new File(WALFileUtils.getLogFileName(3, 0)),
+ new File(WALFileUtils.getLogFileName(4, 5)),
+ new File(WALFileUtils.getLogFileName(5, 5)),
+ new File(WALFileUtils.getLogFileName(6, 5)),
+ new File(WALFileUtils.getLogFileName(7, 5)),
+ new File(WALFileUtils.getLogFileName(8, 12)),
+ new File(WALFileUtils.getLogFileName(9, 12)),
+ new File(WALFileUtils.getLogFileName(10, 12)),
+ new File(WALFileUtils.getLogFileName(11, 12))
+ };
+ int found = WALFileUtils.binarySearchFileBySearchIndex(walFiles, 12);
+ Assert.assertEquals(7, found);
+ }
+}