You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/12/19 12:21:09 UTC
[iotdb] branch master updated: [IOTDB-2112] Do not use .tsfile when compacting the target file (#4549)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 385f34b [IOTDB-2112] Do not use .tsfile when compacting the target file (#4549)
385f34b is described below
commit 385f34b7ff266dd4468d140269bf47a3959a8baf
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Sun Dec 19 20:20:39 2021 +0800
[IOTDB-2112] Do not use .tsfile when compacting the target file (#4549)
---
.../resources/conf/iotdb-engine.properties | 4 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +-
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 3 +
.../db/engine/compaction/TsFileIdentifier.java | 12 +
.../InnerSpaceCompactionExceptionHandler.java | 63 +--
.../SizeTieredCompactionRecoverTask.java | 210 ++++++----
.../inner/sizetiered/SizeTieredCompactionTask.java | 46 +--
.../inner/utils/InnerSpaceCompactionUtils.java | 87 +++-
.../engine/storagegroup/TsFileNameGenerator.java | 4 +-
.../db/engine/storagegroup/TsFileResource.java | 1 +
.../inner/AbstractInnerSpaceCompactionTest.java | 42 +-
.../compaction/inner/InnerSeqCompactionTest.java | 3 +
.../inner/InnerSpaceCompactionExceptionTest.java | 9 +-
.../inner/InnerSpaceCompactionUtilsTest.java | 3 +-
.../compaction/inner/InnerUnseqCompactionTest.java | 1 +
.../SizeTieredCompactionRecoverTest.java | 448 ++++++++++++++++++++-
.../recover/SizeTieredCompactionRecoverTest.java | 47 ++-
.../utils/CompactionFileGeneratorUtils.java | 2 +-
18 files changed, 790 insertions(+), 199 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 6d21884..c748c5b 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -440,13 +440,13 @@ timestamp_precision=ms
# The max file when selecting compaction candidate file
# Datatype: int
-# max_compaction_candidate_file_num=10
+# max_compaction_candidate_file_num=30
# The max open file num in each cross space compaction task.
# We use the unseq file num as the open file num
# This parameters have to be much smaller than the permitted max open file num of each process controlled by operator system(65535 in most system)
# Datatype: int
-# max_open_file_num_in_cross_space_compaction=2000
+# max_open_file_num_in_cross_space_compaction=100
# When the average point number of chunks in the target file reaches this, merge the file to the top level.
# During a merge, if a chunk with less number of points than this parameter, the chunk will be
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 5f58f0b..27f99bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -377,7 +377,7 @@ public class IoTDBConfig {
private long targetCompactionFileSize = 2147483648L;
/** The max candidate file num in compaction */
- private int maxCompactionCandidateFileNum = 10;
+ private int maxCompactionCandidateFileNum = 30;
/**
* When merge point number reaches this, merge the files to the last level. During a merge, if a
* chunk with less number of chunks than this parameter, the chunk will be merged with its
@@ -402,7 +402,7 @@ public class IoTDBConfig {
* num # This parameters have to be much smaller than the permitted max open file num of each
* process controlled by operator system(65535 in most system).
*/
- private int maxOpenFileNumInCrossSpaceCompaction = 2000;
+ private int maxOpenFileNumInCrossSpaceCompaction = 100;
/** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
private boolean metaDataCacheEnable = true;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 5ced5eb..6a99068 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -169,4 +169,7 @@ public class IoTDBConstant {
public static final int FILE_NAME_SUFFIX_MERGECNT_INDEX = 2;
public static final int FILE_NAME_SUFFIX_UNSEQMERGECNT_INDEX = 3;
public static final String FILE_NAME_SUFFIX_SEPARATOR = "\\.";
+
+ // compaction
+ public static final String COMPACTION_TMP_FILE_SUFFIX = ".target";
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileIdentifier.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileIdentifier.java
index 6cf9121..daf8808 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileIdentifier.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileIdentifier.java
@@ -166,6 +166,18 @@ public class TsFileIdentifier {
return filename;
}
+ public String getFilePath() {
+ return (sequence ? IoTDBConstant.SEQUENCE_FLODER_NAME : IoTDBConstant.UNSEQUENCE_FLODER_NAME)
+ + File.separator
+ + logicalStorageGroupName
+ + File.separator
+ + virtualStorageGroupId
+ + File.separator
+ + timePartitionId
+ + File.separator
+ + filename;
+ }
+
public String getLogicalStorageGroupName() {
return logicalStorageGroupName;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionHandler.java
index abcb489..002723c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionHandler.java
@@ -18,13 +18,13 @@
*/
package org.apache.iotdb.db.engine.compaction.inner;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
-import org.apache.iotdb.db.engine.modification.Modification;
-import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.commons.io.FileUtils;
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
/**
@@ -71,7 +70,7 @@ public class InnerSpaceCompactionExceptionHandler {
if (allSourceFileExist) {
handleSuccess =
handleWhenAllSourceFilesExist(
- fullStorageGroupName, targetTsFile, selectedTsFileResourceList, tsFileResourceList);
+ fullStorageGroupName, targetTsFile, selectedTsFileResourceList);
} else {
// some source file does not exists
// it means we start to delete source file
@@ -134,17 +133,44 @@ public class InnerSpaceCompactionExceptionHandler {
return allSourceFileExist;
}
- private static boolean handleWhenAllSourceFilesExist(
+ public static boolean handleWhenAllSourceFilesExist(
String fullStorageGroupName,
TsFileResource targetTsFile,
- List<TsFileResource> selectedTsFileResourceList,
- TsFileResourceList tsFileResourceList) {
+ List<TsFileResource> selectedTsFileResourceList) {
// all source file exists, delete the target file
LOGGER.info(
"{} [Compaction][ExceptionHandler] all source files {} exists, delete target file {}",
fullStorageGroupName,
selectedTsFileResourceList,
targetTsFile);
+ TsFileResource tmpTargetTsFile;
+ if (targetTsFile.getTsFilePath().endsWith(IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)) {
+ tmpTargetTsFile = targetTsFile;
+ targetTsFile =
+ new TsFileResource(
+ new File(
+ tmpTargetTsFile
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX)));
+ } else {
+ tmpTargetTsFile =
+ new TsFileResource(
+ new File(
+ targetTsFile
+ .getTsFilePath()
+ .replace(
+ TsFileConstant.TSFILE_SUFFIX, IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)));
+ }
+ if (!tmpTargetTsFile.remove()) {
+ // failed to remove tmp target tsfile
+ // system should not carry out the subsequent compaction in case of data redundant
+ LOGGER.warn(
+ "{} [Compaction][ExceptionHandler] failed to remove target file {}",
+ fullStorageGroupName,
+ tmpTargetTsFile);
+ return false;
+ }
if (!targetTsFile.remove()) {
// failed to remove target tsfile
// system should not carry out the subsequent compaction in case of data redundant
@@ -156,22 +182,7 @@ public class InnerSpaceCompactionExceptionHandler {
}
// deal with compaction modification
try {
- for (TsFileResource sourceFile : selectedTsFileResourceList) {
- if (sourceFile.getCompactionModFile().exists()) {
- ModificationFile compactionModificationFile =
- ModificationFile.getCompactionMods(sourceFile);
- Collection<Modification> newModification = compactionModificationFile.getModifications();
- compactionModificationFile.close();
- // write the modifications to a new modification file
- sourceFile.resetModFile();
- try (ModificationFile newModificationFile = sourceFile.getModFile()) {
- for (Modification modification : newModification) {
- newModificationFile.write(modification);
- }
- }
- FileUtils.delete(new File(ModificationFile.getCompactionMods(sourceFile).getFilePath()));
- }
- }
+ InnerSpaceCompactionUtils.appendNewModificationsToOldModsFile(selectedTsFileResourceList);
} catch (Throwable e) {
LOGGER.error(
"{} Exception occurs while handling exception, set allowCompaction to false",
@@ -210,12 +221,6 @@ public class InnerSpaceCompactionExceptionHandler {
}
}
- if (targetTsFile.getModFile().exists()) {
- // if origin mods file exists, remove it, and generate a new mods file
- FileUtils.delete(new File(targetTsFile.getModFile().getFilePath()));
- }
-
- InnerSpaceCompactionUtils.combineModsInCompaction(selectedTsFileResourceList, targetTsFile);
InnerSpaceCompactionUtils.deleteModificationForSourceFile(
selectedTsFileResourceList, fullStorageGroupName);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
index 21b359f..8e4fe4c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
@@ -18,14 +18,18 @@
*/
package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.TsFileIdentifier;
+import org.apache.iotdb.db.engine.compaction.inner.InnerSpaceCompactionExceptionHandler;
import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
import org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogAnalyzer;
import org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,23 +70,24 @@ public class SizeTieredCompactionRecoverTask extends SizeTieredCompactionTask {
}
/**
- * Clear unfinished compaction task, there are several situations:
+ * The tmp target file is xxx.target, the target file is xxx.tsfile and the resource file is
+ * xxx.tsfile.resource. To clear an unfinished compaction task, there are several situations:
*
* <ol>
- * <li><b>Target file is uncompleted</b>: delete the target file and compaction log.
- * <li><b>Target file is completed, not all source files have been deleted</b>: delete the
- * source files and compaction logs
- * <li><b>Target file is completed, all source files have been deleted, compaction log file
- * exists</b>: delete the compaction log
- * <li><b>No compaction log file exists</b>: do nothing
+ * <li>Compaction log is incomplete, then delete it.
+ * <li>All source files exist, then delete tmp target file, target file, resource file, mods
+ * file of target file and compaction log if exist. Also append new modifications of all
+ * source files to corresponding mods file.
+ * <li>Not all source files exist, then delete the remaining source files, all mods files of
+ * each source file and compaction log.
* </ol>
*/
@Override
public void doCompaction() {
- // read log -> Set<Device> -> doCompaction -> clear
+ boolean handleSuccess = true;
+ LOGGER.info(
+ "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, compactionLogFile);
try {
- LOGGER.info(
- "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, compactionLogFile);
if (compactionLogFile.exists()) {
LOGGER.info(
"{}-{} [Compaction][Recover] compaction log file {} exists, start to recover it",
@@ -94,6 +99,8 @@ public class SizeTieredCompactionRecoverTask extends SizeTieredCompactionTask {
logAnalyzer.analyze();
List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos();
TsFileIdentifier targetFileIdentifier = logAnalyzer.getTargetFileInfo();
+
+ // compaction log file is incomplete
if (targetFileIdentifier == null || sourceFileIdentifiers.isEmpty()) {
LOGGER.info(
"{}-{} [Compaction][Recover] incomplete log file, abort recover",
@@ -101,86 +108,68 @@ public class SizeTieredCompactionRecoverTask extends SizeTieredCompactionTask {
virtualStorageGroup);
return;
}
- File targetFile = targetFileIdentifier.getFileFromDataDirs();
- if (targetFile == null) {
- // cannot find target file from data dirs
- LOGGER.info(
- "{}-{} [Compaction][Recover] cannot find target file {} from data dirs, abort recover",
- logicalStorageGroupName,
- virtualStorageGroup,
- targetFileIdentifier);
- return;
- }
- File resourceFile = new File(targetFile.getPath() + ".resource");
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetFile, false);
- writer.close();
- if (writer.hasCrashed()) {
- LOGGER.info(
- "{}-{} [Compaction][Recover] target file {} crash, start to delete it",
- logicalStorageGroupName,
- virtualStorageGroup,
- targetFile);
- // the target tsfile is crashed, it is not completed
- if (!targetFile.delete()) {
- LOGGER.error(
- "{}-{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
- logicalStorageGroupName,
- virtualStorageGroup,
- targetFile);
+ // check whether all source files exist
+ boolean isAllSourcesFileExisted = true;
+ for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+ File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+ if (sourceFile == null) {
+ isAllSourcesFileExisted = false;
+ break;
}
- if (resourceFile.exists() && !resourceFile.delete()) {
- LOGGER.error(
- "{}-{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
- logicalStorageGroupName,
- virtualStorageGroup,
- resourceFile);
+ }
+
+ if (isAllSourcesFileExisted) {
+ // xxx.target
+ File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+ // xxx.tsfile
+ File targetFile =
+ getFileFromDataDirs(
+ targetFileIdentifier
+ .getFilePath()
+ .replace(
+ IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX));
+ TsFileResource targetResource;
+ if (tmpTargetFile != null) {
+ targetResource = new TsFileResource(tmpTargetFile);
+ } else {
+ targetResource = new TsFileResource(targetFile);
}
- } else {
- // the target tsfile is completed
- LOGGER.info(
- "{}-{} [Compaction][Recover] target file {} is completed, delete source files {}",
- logicalStorageGroupName,
- virtualStorageGroup,
- targetFile,
- sourceFileIdentifiers);
- TsFileResource targetResource = new TsFileResource(targetFile);
- List<TsFileResource> sourceTsFileResources = new ArrayList<>();
+ List<TsFileResource> sourceResources = new ArrayList<>();
for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
- File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
- if (sourceFile != null) {
- sourceTsFileResources.add(new TsFileResource(sourceFile));
- }
+ sourceResources.add(new TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
}
- ModificationFile modificationFileForTargetFile =
- ModificationFile.getNormalMods(targetResource);
- if (!modificationFileForTargetFile.exists()) {
- InnerSpaceCompactionUtils.combineModsInCompaction(
- sourceTsFileResources, targetResource);
- }
-
- InnerSpaceCompactionUtils.deleteTsFilesInDisk(
- sourceTsFileResources, fullStorageGroupName);
- InnerSpaceCompactionUtils.deleteModificationForSourceFile(
- sourceTsFileResources, logicalStorageGroupName + "-" + virtualStorageGroup);
+ handleSuccess =
+ InnerSpaceCompactionExceptionHandler.handleWhenAllSourceFilesExist(
+ fullStorageGroupName, targetResource, sourceResources);
+ } else {
+ handleSuccess = handleWithoutAllSourceFilesExist(sourceFileIdentifiers);
}
}
} catch (IOException e) {
LOGGER.error("recover inner space compaction error", e);
} finally {
- if (compactionLogFile.exists()) {
- if (!compactionLogFile.delete()) {
- LOGGER.warn(
- "{}-{} [Compaction][Recover] fail to delete {}",
- logicalStorageGroupName,
- virtualStorageGroup,
- compactionLogFile);
- } else {
- LOGGER.info(
- "{}-{} [Compaction][Recover] delete compaction log {}",
- logicalStorageGroupName,
- virtualStorageGroup,
- compactionLogFile);
+ if (!handleSuccess) {
+ LOGGER.error(
+ "{} [Compaction][Recover] Failed to recover compaction, set allowCompaction to false",
+ fullStorageGroupName);
+ tsFileManager.setAllowCompaction(false);
+ } else {
+ if (compactionLogFile.exists()) {
+ try {
+ LOGGER.info(
+ "{} [Compaction][Recover] Recover compaction successfully, delete log file {}",
+ fullStorageGroupName,
+ compactionLogFile);
+ FileUtils.delete(compactionLogFile);
+ } catch (IOException e) {
+ LOGGER.error(
+ "{} [Compaction][Recover] Exception occurs while deleting log file {}, set allowCompaction to false",
+ fullStorageGroupName,
+ compactionLogFile,
+ e);
+ tsFileManager.setAllowCompaction(false);
+ }
}
}
}
@@ -203,4 +192,63 @@ public class SizeTieredCompactionRecoverTask extends SizeTieredCompactionTask {
public boolean checkValidAndSetMerging() {
return compactionLogFile.exists();
}
+
+ private boolean handleWithoutAllSourceFilesExist(List<TsFileIdentifier> sourceFileIdentifiers) {
+ // some source files have been deleted, while .tsfile and .tsfile.resource must exist.
+ boolean handleSuccess = true;
+ List<TsFileResource> remainSourceTsFileResources = new ArrayList<>();
+ for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+ File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+ if (sourceFile != null) {
+ remainSourceTsFileResources.add(new TsFileResource(sourceFile));
+ }
+ // delete .compaction.mods file and .mods file of all source files
+ File compactionModFile =
+ getFileFromDataDirs(
+ sourceFileIdentifier.getFilePath() + ModificationFile.COMPACTION_FILE_SUFFIX);
+ File modFile =
+ getFileFromDataDirs(sourceFileIdentifier.getFilePath() + ModificationFile.FILE_SUFFIX);
+ if (compactionModFile != null && !compactionModFile.delete()) {
+ LOGGER.error(
+ "{}-{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
+ logicalStorageGroupName,
+ virtualStorageGroup,
+ compactionModFile);
+ handleSuccess = false;
+ }
+ if (modFile != null && !modFile.delete()) {
+ LOGGER.error(
+ "{}-{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
+ logicalStorageGroupName,
+ virtualStorageGroup,
+ modFile);
+ handleSuccess = false;
+ }
+ }
+ // delete remaining source files
+ if (!InnerSpaceCompactionUtils.deleteTsFilesInDisk(
+ remainSourceTsFileResources, fullStorageGroupName)) {
+ LOGGER.error(
+ "{}-{} [Compaction][Recover] fail to delete remaining source files.",
+ logicalStorageGroupName,
+ virtualStorageGroup);
+ handleSuccess = false;
+ }
+ return handleSuccess;
+ }
+
+ /**
+ * This method finds the File object of the given filePath by searching every data directory. If
+ * the file is not found, it returns null.
+ */
+ private File getFileFromDataDirs(String filePath) {
+ String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+ for (String dataDir : dataDirs) {
+ File f = new File(dataDir, filePath);
+ if (f.exists()) {
+ return f;
+ }
+ }
+ return null;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index e2ab204..fb9fe11 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -74,6 +74,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
long startTime = System.currentTimeMillis();
// get resource of target file
String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent();
+ // Here is tmpTargetFile, which is xxx.target
String targetFileName =
TsFileNameGenerator.getInnerCompactionFileName(selectedTsFileResourceList, sequence)
.getName();
@@ -119,6 +120,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
// carry out the compaction
InnerSpaceCompactionUtils.compact(
targetTsFileResource, selectedTsFileResourceList, fullStorageGroupName, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, fullStorageGroupName);
LOGGER.info(
"{} [SizeTiredCompactionTask] compact finish, close the logger", fullStorageGroupName);
sizeTieredCompactionLogger.close();
@@ -140,10 +142,16 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
selectedTsFileResourceList.get(i).writeLock();
isHoldingWriteLock[i] = true;
}
+
LOGGER.info(
- "{} [Compaction] Get the write lock of files, try to get the write lock of TsFileResourceList",
+ "{} [SizeTiredCompactionTask] old file deleted, start to rename mods file",
fullStorageGroupName);
+ InnerSpaceCompactionUtils.combineModsInCompaction(
+ selectedTsFileResourceList, targetTsFileResource);
+ LOGGER.info(
+ "{} [Compaction] Get the write lock of files, try to get the write lock of TsFileResourceList",
+ fullStorageGroupName);
// get write lock for TsFileResource list with timeout
try {
tsFileManager.writeLockWithTimeout("size-tired compaction", 60_000);
@@ -161,29 +169,20 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
fullStorageGroupName));
}
- try {
- LOGGER.info(
- "{} [SizeTiredCompactionTask] old file deleted, start to rename mods file",
- fullStorageGroupName);
- InnerSpaceCompactionUtils.combineModsInCompaction(
- selectedTsFileResourceList, targetTsFileResource);
+ // delete the old files
+ InnerSpaceCompactionUtils.deleteTsFilesInDisk(
+ selectedTsFileResourceList, fullStorageGroupName);
+ InnerSpaceCompactionUtils.deleteModificationForSourceFile(
+ selectedTsFileResourceList, fullStorageGroupName);
- // delete the old files
- InnerSpaceCompactionUtils.deleteTsFilesInDisk(
- selectedTsFileResourceList, fullStorageGroupName);
- InnerSpaceCompactionUtils.deleteModificationForSourceFile(
- selectedTsFileResourceList, fullStorageGroupName);
- // replace the old files with new file, the new is in same position as the old
- for (TsFileResource resource : selectedTsFileResourceList) {
- TsFileResourceManager.getInstance().removeTsFileResource(resource);
- }
- tsFileResourceList.insertBefore(selectedTsFileResourceList.get(0), targetTsFileResource);
- TsFileResourceManager.getInstance().registerSealedTsFileResource(targetTsFileResource);
- for (TsFileResource resource : selectedTsFileResourceList) {
- tsFileResourceList.remove(resource);
- }
- } finally {
- tsFileManager.writeUnlock();
+ // replace the old files with new file, the new is in same position as the old
+ for (TsFileResource resource : selectedTsFileResourceList) {
+ TsFileResourceManager.getInstance().removeTsFileResource(resource);
+ }
+ tsFileResourceList.insertBefore(selectedTsFileResourceList.get(0), targetTsFileResource);
+ TsFileResourceManager.getInstance().registerSealedTsFileResource(targetTsFileResource);
+ for (TsFileResource resource : selectedTsFileResourceList) {
+ tsFileResourceList.remove(resource);
}
long costTime = System.currentTimeMillis() - startTime;
@@ -213,6 +212,7 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
tsFileManager,
tsFileResourceList);
} finally {
+ tsFileManager.writeUnlock();
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
if (isHoldingReadLock[i]) {
selectedTsFileResourceList.get(i).readUnlock();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index af2a114..4dcb7c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.compaction.inner.utils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.CrossSpaceMergeResource;
import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
@@ -35,7 +36,9 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
@@ -48,6 +51,7 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.collections4.keyvalue.DefaultMapEntry;
+import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -296,7 +300,7 @@ public class InnerSpaceCompactionUtils {
}
while (hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
String lastSensor = null;
- Set<String> allSensors = new HashSet<>();
+ Set<String> candidateSensors = new HashSet<>();
for (Entry<TsFileSequenceReader, Map<String, List<ChunkMetadata>>>
chunkMetadataListCacheForMergeEntry : chunkMetadataListCacheForMerge.entrySet()) {
TsFileSequenceReader reader = chunkMetadataListCacheForMergeEntry.getKey();
@@ -320,15 +324,15 @@ public class InnerSpaceCompactionUtils {
}
}
// get all sensor used later
- allSensors.addAll(sensorChunkMetadataListMap.keySet());
+ candidateSensors.addAll(sensorChunkMetadataListMap.keySet());
}
// if there is no more chunkMetaData, merge all the sensors
if (!hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) {
- lastSensor = Collections.max(allSensors);
+ lastSensor = Collections.max(candidateSensors);
}
- for (String sensor : allSensors) {
+ for (String sensor : candidateSensors) {
if (sensor.compareTo(lastSensor) <= 0) {
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap =
new TreeMap<>(
@@ -454,10 +458,7 @@ public class InnerSpaceCompactionUtils {
for (TsFileResource tsFileResource : tsFileResources) {
targetResource.updatePlanIndexes(tsFileResource);
}
- targetResource.serialize();
writer.endFile();
- targetResource.close();
-
} finally {
for (TsFileSequenceReader reader : tsFileSequenceReaderMap.values()) {
reader.close();
@@ -540,14 +541,18 @@ public class InnerSpaceCompactionUtils {
new ModificationFile(fileName + ModificationFile.FILE_SUFFIX).getModifications()));
}
- public static void deleteTsFilesInDisk(
+ public static boolean deleteTsFilesInDisk(
Collection<TsFileResource> mergeTsFiles, String storageGroupName) {
logger.info("{} [Compaction] Compaction starts to delete real file ", storageGroupName);
+ boolean result = true;
for (TsFileResource mergeTsFile : mergeTsFiles) {
- deleteTsFile(mergeTsFile);
+ if (!deleteTsFile(mergeTsFile)) {
+ result = false;
+ }
logger.info(
"{} [Compaction] delete TsFile {}", storageGroupName, mergeTsFile.getTsFilePath());
}
+ return result;
}
/** Delete all modification files for source files */
@@ -569,6 +574,35 @@ public class InnerSpaceCompactionUtils {
}
/**
+ * This method is called to recover modifications when an exception occurs during compaction. It
+ * appends the new modifications of each selected tsfile to its old mods file and deletes
+ * the compaction mods file.
+ *
+ * @param selectedTsFileResources
+ * @throws IOException
+ */
+ public static void appendNewModificationsToOldModsFile(
+ List<TsFileResource> selectedTsFileResources) throws IOException {
+ for (TsFileResource sourceFile : selectedTsFileResources) {
+ // if there are modifications to this seqFile during compaction
+ if (sourceFile.getCompactionModFile().exists()) {
+ ModificationFile compactionModificationFile =
+ ModificationFile.getCompactionMods(sourceFile);
+ Collection<Modification> newModification = compactionModificationFile.getModifications();
+ compactionModificationFile.close();
+ sourceFile.resetModFile();
+ // write the new modifications to its old modification file
+ try (ModificationFile oldModificationFile = sourceFile.getModFile()) {
+ for (Modification modification : newModification) {
+ oldModificationFile.write(modification);
+ }
+ }
+ FileUtils.delete(new File(ModificationFile.getCompactionMods(sourceFile).getFilePath()));
+ }
+ }
+ }
+
+ /**
* Collect all the compaction modification files of source files, and combines them as the
* modification file of target file.
*/
@@ -593,14 +627,16 @@ public class InnerSpaceCompactionUtils {
}
}
- public static void deleteTsFile(TsFileResource seqFile) {
+ public static boolean deleteTsFile(TsFileResource seqFile) {
try {
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
seqFile.setDeleted(true);
seqFile.delete();
} catch (IOException e) {
logger.error(e.getMessage(), e);
+ return false;
}
+ return true;
}
public static ICrossSpaceMergeFileSelector getCrossSpaceFileSelector(
@@ -625,4 +661,35 @@ public class InnerSpaceCompactionUtils {
return new File[0];
}
}
+
+ /**
+ * Update the targetResource: move xxx.target to xxx.tsfile, then serialize xxx.tsfile.resource.
+ *
+ * @param targetResource the old tsfile to be moved, which is xxx.target
+ */
+ public static void moveTargetFile(TsFileResource targetResource, String fullStorageGroupName)
+ throws IOException {
+ if (!targetResource.getTsFilePath().endsWith(IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)) {
+ logger.warn(
+ "{} [Compaction] Tmp target tsfile {} should be end with {}",
+ fullStorageGroupName,
+ targetResource.getTsFilePath(),
+ IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX);
+ return;
+ }
+ File oldFile = targetResource.getTsFile();
+
+ // move TsFile and delete old tsfile
+ String newFilePath =
+ targetResource
+ .getTsFilePath()
+ .replace(IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX);
+ File newFile = new File(newFilePath);
+ FSFactoryProducer.getFSFactory().moveFile(oldFile, newFile);
+
+ // serialize xxx.tsfile.resource
+ targetResource.setFile(newFile);
+ targetResource.serialize();
+ targetResource.close();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
index 2253f61..cea32c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
@@ -184,7 +184,7 @@ public class TsFileNameGenerator {
+ (maxInnerMergeCount + 1)
+ FILE_NAME_SEPARATOR
+ maxCrossMergeCount
- + TSFILE_SUFFIX)
+ + IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)
: new File(
tsFileResources.get(0).getTsFile().getParent(),
maxTime
@@ -194,7 +194,7 @@ public class TsFileNameGenerator {
+ (maxInnerMergeCount + 1)
+ FILE_NAME_SEPARATOR
+ maxCrossMergeCount
- + TSFILE_SUFFIX);
+ + IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX);
}
public static class TsFileName {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index ab2f0e1..59e83ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -479,6 +479,7 @@ public class TsFileResource {
public boolean removeResourceFile() {
try {
fsFactory.deleteIfExists(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX));
+ fsFactory.deleteIfExists(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX + TEMP_SUFFIX));
} catch (IOException e) {
logger.error("TsFileResource {} cannot be deleted: {}", file, e.getMessage());
return false;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTest.java
index d382a71..7d09e33 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/AbstractInnerSpaceCompactionTest.java
@@ -226,31 +226,31 @@ public abstract class AbstractInnerSpaceCompactionTest {
void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset)
throws IOException, WriteProcessException {
- TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getTsFile());
- for (String deviceId : deviceIds) {
- for (UnaryMeasurementSchema measurementSchema : measurementSchemas) {
- fileWriter.registerTimeseries(new Path(deviceId), measurementSchema);
- }
- }
- for (long i = timeOffset; i < timeOffset + ptNum; i++) {
- for (int j = 0; j < deviceNum; j++) {
- TSRecord record = new TSRecord(i, deviceIds[j]);
- for (int k = 0; k < measurementNum; k++) {
- record.addTuple(
- DataPoint.getDataPoint(
- measurementSchemas[k].getType(),
- measurementSchemas[k].getMeasurementId(),
- String.valueOf(i + valueOffset)));
+ try (TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getTsFile()); ) {
+ for (String deviceId : deviceIds) {
+ for (UnaryMeasurementSchema measurementSchema : measurementSchemas) {
+ fileWriter.registerTimeseries(new Path(deviceId), measurementSchema);
}
- fileWriter.write(record);
- tsFileResource.updateStartTime(deviceIds[j], i);
- tsFileResource.updateEndTime(deviceIds[j], i);
}
- if ((i + 1) % flushInterval == 0) {
- fileWriter.flushAllChunkGroups();
+ for (long i = timeOffset; i < timeOffset + ptNum; i++) {
+ for (int j = 0; j < deviceNum; j++) {
+ TSRecord record = new TSRecord(i, deviceIds[j]);
+ for (int k = 0; k < measurementNum; k++) {
+ record.addTuple(
+ DataPoint.getDataPoint(
+ measurementSchemas[k].getType(),
+ measurementSchemas[k].getMeasurementId(),
+ String.valueOf(i + valueOffset)));
+ }
+ fileWriter.write(record);
+ tsFileResource.updateStartTime(deviceIds[j], i);
+ tsFileResource.updateEndTime(deviceIds[j], i);
+ }
+ if ((i + 1) % flushInterval == 0) {
+ fileWriter.flushAllChunkGroups();
+ }
}
}
- fileWriter.close();
}
@After
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
index 15966cd..00658d5 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
@@ -225,6 +225,7 @@ public class InnerSeqCompactionTest {
new SizeTieredCompactionLogger("target", COMPACTION_TEST_SG);
InnerSpaceCompactionUtils.compact(
targetTsFileResource, sourceResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
InnerSpaceCompactionUtils.combineModsInCompaction(
sourceResources, targetTsFileResource);
List<TsFileResource> targetTsFileResources = new ArrayList<>();
@@ -452,6 +453,7 @@ public class InnerSeqCompactionTest {
new SizeTieredCompactionLogger("target", COMPACTION_TEST_SG);
InnerSpaceCompactionUtils.compact(
targetTsFileResource, toMergeResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
InnerSpaceCompactionUtils.combineModsInCompaction(
toMergeResources, targetTsFileResource);
List<TsFileResource> targetTsFileResources = new ArrayList<>();
@@ -729,6 +731,7 @@ public class InnerSeqCompactionTest {
new SizeTieredCompactionLogger("target", COMPACTION_TEST_SG);
InnerSpaceCompactionUtils.compact(
targetTsFileResource, toMergeResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
InnerSpaceCompactionUtils.combineModsInCompaction(
toMergeResources, targetTsFileResource);
List<TsFileResource> targetTsFileResources = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
index e343ae3..ffd7290 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
@@ -65,6 +65,7 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
compactionLogger.logFileInfo(
SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
InnerSpaceCompactionUtils.compact(targetResource, seqResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
try (FileOutputStream os = new FileOutputStream(targetResource.getTsFile(), true);
FileChannel channel = os.getChannel()) {
channel.truncate(targetResource.getTsFileSize() - 10);
@@ -113,6 +114,7 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
compactionLogger.logFileInfo(
SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
InnerSpaceCompactionUtils.compact(targetResource, seqResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
compactionLogger.close();
InnerSpaceCompactionExceptionHandler.handleException(
COMPACTION_TEST_SG,
@@ -157,6 +159,7 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
compactionLogger.logFileInfo(
SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
InnerSpaceCompactionUtils.compact(targetResource, seqResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
seqResources.get(0).remove();
compactionLogger.close();
InnerSpaceCompactionExceptionHandler.handleException(
@@ -205,6 +208,7 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
compactionLogger.logFileInfo(
SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
InnerSpaceCompactionUtils.compact(targetResource, seqResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
seqResources.get(0).remove();
try (FileOutputStream os = new FileOutputStream(targetResource.getTsFile(), true);
FileChannel channel = os.getChannel()) {
@@ -257,6 +261,7 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
compactionLogger.logFileInfo(
SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
InnerSpaceCompactionUtils.compact(targetResource, seqResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
for (int i = 0; i < seqResources.size(); i++) {
Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
deleteMap.put(
@@ -264,6 +269,7 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
new Pair<>(i * ptNum, i * ptNum + 10));
CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(i), true);
}
+ InnerSpaceCompactionUtils.combineModsInCompaction(seqResources, targetResource);
seqResources.get(0).remove();
compactionLogger.close();
@@ -329,7 +335,7 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(i), false);
}
InnerSpaceCompactionUtils.compact(targetResource, seqResources, COMPACTION_TEST_SG, true);
-
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
seqResources.get(0).remove();
compactionLogger.close();
@@ -387,6 +393,7 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(i), false);
}
InnerSpaceCompactionUtils.compact(targetResource, seqResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
for (int i = 0; i < seqResources.size(); i++) {
Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
deleteMap.put(
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsTest.java
index 4191a40..1dfd83e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionUtilsTest.java
@@ -81,7 +81,7 @@ public class InnerSpaceCompactionUtilsTest extends InnerCompactionTest {
+ 1
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
- + ".tsfile")));
+ + IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)));
File targetFile =
new File(
TestConstant.getTestTsFileDir("root.compactionTest", 0, 0)
@@ -105,6 +105,7 @@ public class InnerSpaceCompactionUtilsTest extends InnerCompactionTest {
}
sizeTieredCompactionLogger.logSequence(true);
InnerSpaceCompactionUtils.compact(targetTsFileResource, seqResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
sizeTieredCompactionLogger.close();
Path path = new Path(deviceIds[0], measurementSchemas[0].getMeasurementId());
try (TsFileSequenceReader reader =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
index cbe519c..ed76b1a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
@@ -353,6 +353,7 @@ public class InnerUnseqCompactionTest {
new SizeTieredCompactionLogger("target", COMPACTION_TEST_SG);
InnerSpaceCompactionUtils.compact(
targetTsFileResource, toMergeResources, COMPACTION_TEST_SG, false);
+ InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
InnerSpaceCompactionUtils.combineModsInCompaction(
toMergeResources, targetTsFileResource);
List<TsFileResource> targetTsFileResources = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
index 58c9935..aed040d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
@@ -24,12 +24,15 @@ import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionTest;
import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
import org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.SchemaTestUtils;
@@ -38,9 +41,11 @@ import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +56,9 @@ import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME;
import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.SOURCE_INFO;
@@ -101,6 +108,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
tsFilesReader.close();
+ closeTsFileSequenceReader();
assertEquals(500, count);
TsFileResource targetTsFileResource =
@@ -115,7 +123,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
+ 1
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
- + ".tsfile")));
+ + IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)));
File compactionLogFile =
new File(
seqResources.get(0).getTsFile().getParent()
@@ -136,7 +144,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
COMPACTION_TEST_SG,
true);
compactionLogger.close();
-
+ InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
BufferedReader logReader = new BufferedReader(new FileReader(compactionLogFile));
List<String> logs = new ArrayList<>();
String line;
@@ -196,9 +204,417 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
tsFilesReader.close();
+ closeTsFileSequenceReader();
+ assertEquals(500, count);
+ }
+
+ @Test
+ public void testRecoverWithAllSourceFilesExisted() throws Exception {
+ TsFileManager tsFileManager =
+ new TsFileManager(COMPACTION_TEST_SG, "0", tempSGDir.getAbsolutePath());
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ MeasurementPath path =
+ SchemaTestUtils.getMeasurementPath(
+ deviceIds[0]
+ + TsFileConstant.PATH_SEPARATOR
+ + measurementSchemas[0].getMeasurementId());
+ IBatchReader tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ measurementSchemas[0].getType(),
+ EnvironmentUtils.TEST_QUERY_CONTEXT,
+ tsFileManager.getTsFileList(true),
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ int count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ count++;
+ }
+ }
+ tsFilesReader.close();
+ closeTsFileSequenceReader();
+ assertEquals(500, count);
+
+ TsFileResource targetTsFileResource =
+ new TsFileResource(
+ new File(
+ SEQ_DIRS
+ + File.separator.concat(
+ 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)));
+ File compactionLogFile =
+ new File(
+ seqResources.get(0).getTsFile().getParent()
+ + File.separator
+ + targetTsFileResource.getTsFile().getName()
+ + COMPACTION_LOG_NAME);
+ SizeTieredCompactionLogger compactionLogger =
+ new SizeTieredCompactionLogger(compactionLogFile.getPath());
+ compactionLogger.logFileInfo(SOURCE_INFO, seqResources.get(0).getTsFile());
+ compactionLogger.logFileInfo(SOURCE_INFO, seqResources.get(1).getTsFile());
+ compactionLogger.logFileInfo(SOURCE_INFO, seqResources.get(2).getTsFile());
+ compactionLogger.logSequence(true);
+ deleteFileIfExists(targetTsFileResource.getTsFile());
+ compactionLogger.logFileInfo(TARGET_INFO, targetTsFileResource.getTsFile());
+ InnerSpaceCompactionUtils.compact(
+ targetTsFileResource,
+ new ArrayList<>(seqResources.subList(0, 3)),
+ COMPACTION_TEST_SG,
+ true);
+ compactionLogger.close();
+ new SizeTieredCompactionRecoverTask(
+ COMPACTION_LOG_NAME,
+ "0",
+ 0,
+ compactionLogFile,
+ tempSGDir.getAbsolutePath(),
+ true,
+ CompactionTaskManager.currentTaskNum)
+ .call();
+ // all source files should still exist
+ Assert.assertTrue(seqResources.get(0).getTsFile().exists());
+ Assert.assertTrue(seqResources.get(1).getTsFile().exists());
+ Assert.assertTrue(seqResources.get(2).getTsFile().exists());
+ // tmp target file, target file and target resource file should be deleted
+ Assert.assertFalse(targetTsFileResource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(
+ targetTsFileResource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX))
+ .exists());
+ Assert.assertFalse(
+ new File(targetTsFileResource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+
+ path =
+ SchemaTestUtils.getMeasurementPath(
+ deviceIds[0]
+ + TsFileConstant.PATH_SEPARATOR
+ + measurementSchemas[0].getMeasurementId());
+ System.out.println(tsFileManager.getTsFileList(true));
+ tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ measurementSchemas[0].getType(),
+ EnvironmentUtils.TEST_QUERY_CONTEXT,
+ tsFileManager.getTsFileList(true),
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ count++;
+ }
+ }
+ tsFilesReader.close();
+ closeTsFileSequenceReader();
+ assertEquals(500, count);
+ }
+
+ @Test
+ public void testRecoverWithoutAllSourceFilesExisted() throws Exception {
+ TsFileManager tsFileManager =
+ new TsFileManager(COMPACTION_TEST_SG, "0", tempSGDir.getAbsolutePath());
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ MeasurementPath path =
+ SchemaTestUtils.getMeasurementPath(
+ deviceIds[0]
+ + TsFileConstant.PATH_SEPARATOR
+ + measurementSchemas[0].getMeasurementId());
+ IBatchReader tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ measurementSchemas[0].getType(),
+ EnvironmentUtils.TEST_QUERY_CONTEXT,
+ tsFileManager.getTsFileList(true),
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ int count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ count++;
+ }
+ }
+ tsFilesReader.close();
+ closeTsFileSequenceReader();
+ assertEquals(500, count);
+
+ TsFileResource targetTsFileResource =
+ new TsFileResource(
+ new File(
+ SEQ_DIRS
+ + File.separator.concat(
+ 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)));
+ File compactionLogFile =
+ new File(
+ seqResources.get(0).getTsFile().getParent()
+ + File.separator
+ + targetTsFileResource.getTsFile().getName()
+ + COMPACTION_LOG_NAME);
+ SizeTieredCompactionLogger compactionLogger =
+ new SizeTieredCompactionLogger(compactionLogFile.getPath());
+ compactionLogger.logFileInfo(SOURCE_INFO, seqResources.get(0).getTsFile());
+ compactionLogger.logFileInfo(SOURCE_INFO, seqResources.get(1).getTsFile());
+ compactionLogger.logFileInfo(SOURCE_INFO, seqResources.get(2).getTsFile());
+ compactionLogger.logSequence(true);
+ deleteFileIfExists(targetTsFileResource.getTsFile());
+ compactionLogger.logFileInfo(TARGET_INFO, targetTsFileResource.getTsFile());
+ InnerSpaceCompactionUtils.compact(
+ targetTsFileResource,
+ new ArrayList<>(seqResources.subList(0, 3)),
+ COMPACTION_TEST_SG,
+ true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
+ // delete one source file
+ seqResources.get(0).remove();
+ compactionLogger.close();
+ new SizeTieredCompactionRecoverTask(
+ COMPACTION_LOG_NAME,
+ "0",
+ 0,
+ compactionLogFile,
+ tempSGDir.getAbsolutePath(),
+ true,
+ CompactionTaskManager.currentTaskNum)
+ .call();
+ // all source files should be deleted
+ Assert.assertFalse(seqResources.get(0).getTsFile().exists());
+ Assert.assertFalse(seqResources.get(1).getTsFile().exists());
+ Assert.assertFalse(seqResources.get(2).getTsFile().exists());
+ // target file and target resource file should exist
+ Assert.assertTrue(targetTsFileResource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(targetTsFileResource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ // tmp target file should be deleted
+ Assert.assertFalse(
+ new File(
+ targetTsFileResource
+ .getTsFilePath()
+ .replace(
+ TsFileConstant.TSFILE_SUFFIX, IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX))
+ .exists());
+
+ tsFileManager.add(targetTsFileResource, true);
+ path =
+ SchemaTestUtils.getMeasurementPath(
+ deviceIds[0]
+ + TsFileConstant.PATH_SEPARATOR
+ + measurementSchemas[0].getMeasurementId());
+ System.out.println(tsFileManager.getTsFileList(true));
+ tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ measurementSchemas[0].getType(),
+ EnvironmentUtils.TEST_QUERY_CONTEXT,
+ tsFileManager.getTsFileList(true),
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ count++;
+ }
+ }
+ tsFilesReader.close();
+ closeTsFileSequenceReader();
assertEquals(500, count);
}
+ /**
+ * All source files exist, and each source file has a compaction mods file which has been combined
+ * into the new mods file of the target file.
+ */
+ @Test
+ public void testRecoverWithAllSourcesFileAndCompactonModFileExist() throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ String targetFileName =
+ TsFileNameGenerator.getInnerCompactionFileName(seqResources, true).getName();
+ TsFileResource targetResource =
+ new TsFileResource(new File(seqResources.get(0).getTsFile().getParent(), targetFileName));
+ File logFile =
+ new File(
+ targetResource.getTsFile().getPath() + SizeTieredCompactionLogger.COMPACTION_LOG_NAME);
+ SizeTieredCompactionLogger compactionLogger = new SizeTieredCompactionLogger(logFile.getPath());
+ for (TsFileResource source : seqResources) {
+ compactionLogger.logFileInfo(SizeTieredCompactionLogger.SOURCE_INFO, source.getTsFile());
+ }
+ compactionLogger.logSequence(true);
+ compactionLogger.logFileInfo(
+ SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
+ InnerSpaceCompactionUtils.compact(targetResource, seqResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
+ for (int i = 0; i < seqResources.size(); i++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ deviceIds[0] + "." + measurementSchemas[0].getMeasurementId(),
+ new Pair<>(i * ptNum, i * ptNum + 10));
+ CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(i), true);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(i), false);
+ }
+ InnerSpaceCompactionUtils.combineModsInCompaction(seqResources, targetResource);
+ compactionLogger.close();
+
+ new SizeTieredCompactionRecoverTask(
+ COMPACTION_LOG_NAME,
+ "0",
+ 0,
+ logFile,
+ tempSGDir.getAbsolutePath(),
+ true,
+ CompactionTaskManager.currentTaskNum)
+ .call();
+ // all source files should exist
+ for (int i = 0; i < seqResources.size(); i++) {
+ Assert.assertTrue(seqResources.get(i).getTsFile().exists());
+ Assert.assertTrue(seqResources.get(i).resourceFileExists());
+ }
+
+ // tmp target file, target file and target resource file should be deleted
+ Assert.assertFalse(targetResource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(
+ targetResource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX))
+ .exists());
+ Assert.assertFalse(
+ new File(targetResource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+
+ // the compaction mods file of each source file should not exist
+ for (int i = 0; i < seqResources.size(); i++) {
+ Assert.assertFalse(seqResources.get(i).getCompactionModFile().exists());
+ }
+
+ // the mods file of each source file should exist
+ for (int i = 0; i < seqResources.size(); i++) {
+ seqResources.get(i).resetModFile();
+ Assert.assertTrue(seqResources.get(i).getModFile().exists());
+ Assert.assertEquals(2, seqResources.get(i).getModFile().getModifications().size());
+ }
+
+ // mods file of the target file should not exist
+ Assert.assertFalse(targetResource.getModFile().exists());
+
+ // compaction log file should not exist
+ Assert.assertFalse(logFile.exists());
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+ }
+
+ /**
+ * Some source files have been deleted; each source file has an old mods file and a new compaction
+ * mods file.
+ */
+ @Test
+ public void testRecoverWithoutAllSourceFilesExistAndModFiles() throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+ String targetFileName =
+ TsFileNameGenerator.getInnerCompactionFileName(seqResources, true).getName();
+ TsFileResource targetResource =
+ new TsFileResource(new File(seqResources.get(0).getTsFile().getParent(), targetFileName));
+ File logFile =
+ new File(
+ targetResource.getTsFile().getPath() + SizeTieredCompactionLogger.COMPACTION_LOG_NAME);
+ SizeTieredCompactionLogger compactionLogger = new SizeTieredCompactionLogger(logFile.getPath());
+ for (TsFileResource source : seqResources) {
+ compactionLogger.logFileInfo(SizeTieredCompactionLogger.SOURCE_INFO, source.getTsFile());
+ }
+ compactionLogger.logSequence(true);
+ compactionLogger.logFileInfo(
+ SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
+ InnerSpaceCompactionUtils.compact(targetResource, seqResources, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
+ for (int i = 0; i < seqResources.size(); i++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ deviceIds[0] + "." + measurementSchemas[0].getMeasurementId(),
+ new Pair<>(i * ptNum, i * ptNum + 10));
+ CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(i), true);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(i), false);
+ }
+ InnerSpaceCompactionUtils.combineModsInCompaction(seqResources, targetResource);
+ seqResources.get(0).remove();
+ compactionLogger.close();
+
+ new SizeTieredCompactionRecoverTask(
+ COMPACTION_LOG_NAME,
+ "0",
+ 0,
+ logFile,
+ tempSGDir.getAbsolutePath(),
+ true,
+ CompactionTaskManager.currentTaskNum)
+ .call();
+ // all source files should not exist
+ for (int i = 0; i < seqResources.size(); i++) {
+ Assert.assertFalse(seqResources.get(i).getTsFile().exists());
+ Assert.assertFalse(seqResources.get(i).resourceFileExists());
+ }
+
+ // target file and target resource file should exist
+ Assert.assertTrue(targetResource.getTsFile().exists());
+ Assert.assertTrue(targetResource.resourceFileExists());
+
+ // tmp target file should be deleted
+ Assert.assertFalse(
+ new File(
+ targetResource
+ .getTsFilePath()
+ .replace(
+ TsFileConstant.TSFILE_SUFFIX, IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX))
+ .exists());
+
+ // neither the compaction mods file nor the old mods file of any source file should exist
+ for (int i = 0; i < seqResources.size(); i++) {
+ Assert.assertFalse(seqResources.get(i).getCompactionModFile().exists());
+ Assert.assertFalse(seqResources.get(i).getModFile().exists());
+ }
+
+ // mods file of the target file should exist
+ Assert.assertTrue(targetResource.getModFile().exists());
+
+ // compaction log file should not exist
+ Assert.assertFalse(logFile.exists());
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+ }
+
/** compaction recover merge finished, delete one offset */
@Test
public void testRecoverCompleteTargetFileAndCompactionLog() throws Exception {
@@ -230,6 +646,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
tsFilesReader.close();
+ closeTsFileSequenceReader();
assertEquals(500, count);
TsFileResource targetTsFileResource =
@@ -244,7 +661,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
+ 1
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
- + ".tsfile")));
+ + IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)));
File compactionLogFile =
new File(
seqResources.get(0).getTsFile().getParent()
@@ -265,6 +682,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
COMPACTION_TEST_SG,
true);
compactionLogger.close();
+ InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
tsFileManager.add(targetTsFileResource, true);
new SizeTieredCompactionRecoverTask(
COMPACTION_LOG_NAME,
@@ -300,6 +718,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
tsFilesReader.close();
+ closeTsFileSequenceReader();
assertEquals(500, count);
}
@@ -333,6 +752,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
tsFilesReader.close();
+ closeTsFileSequenceReader();
assertEquals(500, count);
TsFileResource targetTsFileResource =
@@ -347,7 +767,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
+ 1
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
- + ".tsfile")));
+ + IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)));
File compactionLogFile =
new File(
seqResources.get(0).getTsFile().getParent()
@@ -367,10 +787,11 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
new ArrayList<>(seqResources.subList(0, 3)),
COMPACTION_TEST_SG,
true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
compactionLogger.close();
for (TsFileResource resource : new ArrayList<>(seqResources.subList(0, 3))) {
- tsFileManager.remove(resource, true);
deleteFileIfExists(resource.getTsFile());
+ tsFileManager.remove(resource, true);
}
tsFileManager.add(targetTsFileResource, true);
new SizeTieredCompactionRecoverTask(
@@ -406,6 +827,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
tsFilesReader.close();
+ closeTsFileSequenceReader();
assertEquals(500, count);
}
@@ -440,6 +862,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
tsFilesReader.close();
+ closeTsFileSequenceReader();
assertEquals(500, count);
TsFileResource targetTsFileResource =
@@ -454,7 +877,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
+ 1
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
- + ".tsfile")));
+ + IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)));
File compactionLogFile =
new File(
seqResources.get(0).getTsFile().getParent()
@@ -475,7 +898,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
COMPACTION_TEST_SG,
true);
compactionLogger.close();
- long totalWaitingTime = 0;
+ InnerSpaceCompactionUtils.moveTargetFile(targetTsFileResource, COMPACTION_TEST_SG);
deleteFileIfExists(compactionLogFile);
for (TsFileResource resource : new ArrayList<>(seqResources.subList(0, 3))) {
tsFileManager.remove(resource, true);
@@ -516,6 +939,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
tsFilesReader.close();
+ closeTsFileSequenceReader();
assertEquals(500, count);
}
@@ -555,6 +979,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
tsFilesReader.close();
+ closeTsFileSequenceReader();
assertEquals(500, count);
}
@@ -595,6 +1020,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
tsFilesReader.close();
+ closeTsFileSequenceReader();
assertEquals(500, count);
}
@@ -621,7 +1047,7 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
+ 1
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
- + ".tsfile")));
+ + IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)));
sizeTieredCompactionLogger.logFileInfo(TARGET_INFO, targetTsFileResource.getTsFile());
tsFileManager.addForRecover(targetTsFileResource, true);
sizeTieredCompactionLogger.close();
@@ -668,4 +1094,10 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
}
+
+ private void closeTsFileSequenceReader() throws IOException {
+ for (TsFileResource tsFileResource : seqResources) {
+ FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java
index 3b8d798..1ef63a0 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.commons.io.FileUtils;
import org.junit.After;
@@ -150,7 +151,8 @@ public class SizeTieredCompactionRecoverTest {
}
/**
- * Test a compaction task in finished. The compaction log use file info to record files.
+ * Test a compaction task that has finished. The compaction log uses file info to record files.
+ * All the source files still exist.
*
* @throws Exception
*/
@@ -191,15 +193,16 @@ public class SizeTieredCompactionRecoverTest {
logger.logFileInfo(SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
logger.close();
InnerSpaceCompactionUtils.compact(targetResource, sourceFiles, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
SizeTieredCompactionRecoverTask recoverTask =
new SizeTieredCompactionRecoverTask(
COMPACTION_TEST_SG, "0", 0, new File(logFilePath), "", true, new AtomicInteger(0));
recoverTask.doCompaction();
- // all the source file should be deleted
+ // all the source files should still exist
for (TsFileResource resource : sourceFiles) {
- Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertTrue(resource.getTsFile().exists());
}
- Assert.assertTrue(targetResource.getTsFile().exists());
+ Assert.assertFalse(targetResource.getTsFile().exists());
}
/**
@@ -244,6 +247,7 @@ public class SizeTieredCompactionRecoverTest {
logger.logFileInfo(SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
logger.close();
InnerSpaceCompactionUtils.compact(targetResource, sourceFiles, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
FileOutputStream targetStream = new FileOutputStream(targetResource.getTsFile(), true);
FileChannel channel = targetStream.getChannel();
channel.truncate(targetResource.getTsFile().length() - 100);
@@ -260,7 +264,8 @@ public class SizeTieredCompactionRecoverTest {
}
/**
- * Test a compaction task in finished. The compaction log use file path to record files.
+ * Test a compaction task that has finished. The compaction log uses file paths to record files.
+ * All the source files still exist.
*
* @throws Exception
*/
@@ -301,15 +306,16 @@ public class SizeTieredCompactionRecoverTest {
logger.logFile(SizeTieredCompactionLogger.TARGET_NAME, targetResource.getTsFile());
logger.close();
InnerSpaceCompactionUtils.compact(targetResource, sourceFiles, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
SizeTieredCompactionRecoverTask recoverTask =
new SizeTieredCompactionRecoverTask(
COMPACTION_TEST_SG, "0", 0, new File(logFilePath), "", true, new AtomicInteger(0));
recoverTask.doCompaction();
- // all the source file should be deleted
+ // all the source files should still exist
for (TsFileResource resource : sourceFiles) {
- Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertTrue(resource.getTsFile().exists());
}
- Assert.assertTrue(targetResource.getTsFile().exists());
+ Assert.assertFalse(targetResource.getTsFile().exists());
}
/**
@@ -354,6 +360,7 @@ public class SizeTieredCompactionRecoverTest {
logger.logFile(SizeTieredCompactionLogger.TARGET_NAME, targetResource.getTsFile());
logger.close();
InnerSpaceCompactionUtils.compact(targetResource, sourceFiles, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
FileOutputStream targetStream = new FileOutputStream(targetResource.getTsFile(), true);
FileChannel channel = targetStream.getChannel();
channel.truncate(targetResource.getTsFile().length() - 100);
@@ -415,6 +422,7 @@ public class SizeTieredCompactionRecoverTest {
logger.logFileInfo(SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
logger.close();
InnerSpaceCompactionUtils.compact(targetResource, sourceFiles, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
long sizeOfTargetFile = targetResource.getTsFileSize();
FileUtils.moveDirectory(
new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
@@ -424,9 +432,9 @@ public class SizeTieredCompactionRecoverTest {
new SizeTieredCompactionRecoverTask(
COMPACTION_TEST_SG, "0", 0, new File(logFilePath), "", true, new AtomicInteger(0));
recoverTask.doCompaction();
- // all the source file should be deleted
+ // all the source files should exist
for (String sourceFileName : sourceFileNames) {
- Assert.assertFalse(
+ Assert.assertTrue(
new File(
TestConstant.BASE_OUTPUT_PATH
+ File.separator
@@ -455,9 +463,9 @@ public class SizeTieredCompactionRecoverTest {
+ "0"
+ File.separator
+ "0",
- targetFileName);
- Assert.assertTrue(targetFileAfterMoved.exists());
- Assert.assertEquals(targetFileAfterMoved.length(), sizeOfTargetFile);
+ targetFileName.replace(
+ IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX));
+ Assert.assertFalse(targetFileAfterMoved.exists());
} finally {
FileUtils.deleteDirectory(new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
}
@@ -510,6 +518,7 @@ public class SizeTieredCompactionRecoverTest {
logger.logFileInfo(SizeTieredCompactionLogger.TARGET_INFO, targetResource.getTsFile());
logger.close();
InnerSpaceCompactionUtils.compact(targetResource, sourceFiles, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
FileOutputStream targetStream = new FileOutputStream(targetResource.getTsFile(), true);
FileChannel channel = targetStream.getChannel();
channel.truncate(targetResource.getTsFile().length() - 100);
@@ -606,6 +615,7 @@ public class SizeTieredCompactionRecoverTest {
logger.logFile(SizeTieredCompactionLogger.TARGET_NAME, targetResource.getTsFile());
logger.close();
InnerSpaceCompactionUtils.compact(targetResource, sourceFiles, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
long sizeOfTargetFile = targetResource.getTsFileSize();
FileUtils.moveDirectory(
new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
@@ -615,9 +625,9 @@ public class SizeTieredCompactionRecoverTest {
new SizeTieredCompactionRecoverTask(
COMPACTION_TEST_SG, "0", 0, new File(logFilePath), "", true, new AtomicInteger(0));
recoverTask.doCompaction();
- // all the source file should be deleted
+ // all the source files should exist
for (String sourceFileName : sourceFileNames) {
- Assert.assertFalse(
+ Assert.assertTrue(
new File(
TestConstant.BASE_OUTPUT_PATH
+ File.separator
@@ -646,9 +656,9 @@ public class SizeTieredCompactionRecoverTest {
+ "0"
+ File.separator
+ "0",
- targetFileName);
- Assert.assertTrue(targetFileAfterMoved.exists());
- Assert.assertEquals(targetFileAfterMoved.length(), sizeOfTargetFile);
+ targetFileName.replace(
+ IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX));
+ Assert.assertFalse(targetFileAfterMoved.exists());
} finally {
FileUtils.deleteDirectory(new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
}
@@ -701,6 +711,7 @@ public class SizeTieredCompactionRecoverTest {
logger.logFile(SizeTieredCompactionLogger.TARGET_NAME, targetResource.getTsFile());
logger.close();
InnerSpaceCompactionUtils.compact(targetResource, sourceFiles, COMPACTION_TEST_SG, true);
+ InnerSpaceCompactionUtils.moveTargetFile(targetResource, COMPACTION_TEST_SG);
FileOutputStream targetStream = new FileOutputStream(targetResource.getTsFile(), true);
FileChannel channel = targetStream.getChannel();
channel.truncate(targetResource.getTsFile().length() - 100);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
index 57b4b6b..dd2c541 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileGeneratorUtils.java
@@ -59,7 +59,7 @@ public class CompactionFileGeneratorUtils {
+ (tsFileName.getInnerCompactionCnt() + 1)
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ tsFileName.getCrossCompactionCnt()
- + ".tsfile")));
+ + IoTDBConstant.COMPACTION_TMP_FILE_SUFFIX)));
}
public static TsFileResource generateTsFileResource(boolean sequence, int index) {