You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/01/31 05:39:33 UTC
[iotdb] branch rel/1.0 updated: [IOTDB-5206] Set empty target resource to DELETED and then delete it after compaction (#8736) (#8937)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new da492cf306 [IOTDB-5206] Set empty target resource to DELETED and then delete it after compaction (#8736) (#8937)
da492cf306 is described below
commit da492cf3063c9349d78839e2067845ca7572336a
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Tue Jan 31 13:39:28 2023 +0800
[IOTDB-5206] Set empty target resource to DELETED and then delete it after compaction (#8736) (#8937)
---
.../exception/CompactionExceptionHandler.java | 11 +-
.../execute/recover/CompactionRecoverTask.java | 110 +++++----
.../execute/task/CrossSpaceCompactionTask.java | 17 +-
.../execute/task/InnerSpaceCompactionTask.java | 19 +-
.../compaction/execute/utils/CompactionUtils.java | 8 -
.../execute/utils/log/CompactionLogAnalyzer.java | 26 +-
.../execute/utils/log/CompactionLogger.java | 12 +
.../execute/utils/log/TsFileIdentifier.java | 6 +-
.../writer/AbstractCrossCompactionWriter.java | 5 +-
.../writer/AbstractInnerCompactionWriter.java | 4 +-
.../db/engine/storagegroup/TsFileManager.java | 4 +-
.../db/engine/storagegroup/TsFileResource.java | 7 +-
.../FastCrossCompactionPerformerTest.java | 6 +-
.../ReadPointCompactionPerformerTest.java | 13 +-
.../cross/CrossSpaceCompactionExceptionTest.java | 200 ++++++++++++++++
.../RewriteCrossSpaceCompactionRecoverTest.java | 266 +++++++++++++++++++++
.../inner/InnerSpaceCompactionExceptionTest.java | 178 ++++++++++++++
.../SizeTieredCompactionRecoverTest.java | 235 ++++++++++++++++++
18 files changed, 1031 insertions(+), 96 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/exception/CompactionExceptionHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/exception/CompactionExceptionHandler.java
index 5254497550..70dcebf3f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/exception/CompactionExceptionHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/exception/CompactionExceptionHandler.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceList;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.rescon.TsFileResourceManager;
import org.apache.iotdb.tsfile.utils.TsFileUtils;
@@ -233,7 +234,15 @@ public class CompactionExceptionHandler {
String fullStorageGroupName)
throws IOException {
for (TsFileResource targetResource : targetResources) {
- if (targetResource != null && !TsFileUtils.isTsFileComplete(targetResource.getTsFile())) {
+ if (targetResource.isDeleted()) {
+ // target resource is empty after compaction, then delete it
+ targetResource.remove();
+ continue;
+ } else {
+ // set target resources to CLOSED, so that they can be selected to compact
+ targetResource.setStatus(TsFileResourceStatus.CLOSED);
+ }
+ if (!TsFileUtils.isTsFileComplete(targetResource.getTsFile())) {
LOGGER.error(
"{} [Compaction][ExceptionHandler] target file {} is not complete, and some source files {} is lost, do nothing. Set allowCompaction to false",
fullStorageGroupName,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverTask.java
index ebd5ac5177..6f45ba2a7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverTask.java
@@ -91,6 +91,8 @@ public class CompactionRecoverTask {
}
List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos();
List<TsFileIdentifier> targetFileIdentifiers = logAnalyzer.getTargetFileInfos();
+ List<TsFileIdentifier> deletedTargetFileIdentifiers =
+ logAnalyzer.getDeletedTargetFileInfos();
// compaction log file is incomplete
if (targetFileIdentifiers.isEmpty() || sourceFileIdentifiers.isEmpty()) {
@@ -125,7 +127,8 @@ public class CompactionRecoverTask {
targetFileIdentifiers, sourceFileIdentifiers);
} else {
recoverSuccess =
- handleWithSomeSourceFilesLost(targetFileIdentifiers, sourceFileIdentifiers);
+ handleWithSomeSourceFilesLost(
+ targetFileIdentifiers, deletedTargetFileIdentifiers, sourceFileIdentifiers);
}
}
}
@@ -222,48 +225,18 @@ public class CompactionRecoverTask {
* file and compaction mods file.
*/
private boolean handleWithSomeSourceFilesLost(
- List<TsFileIdentifier> targetFileIdentifiers, List<TsFileIdentifier> sourceFileIdentifiers)
+ List<TsFileIdentifier> targetFileIdentifiers,
+ List<TsFileIdentifier> deletedTargetFileIdentifiers,
+ List<TsFileIdentifier> sourceFileIdentifiers)
throws IOException {
// some source files have been deleted, while target file must exist and complete.
- if (!checkIsTargetFilesComplete(targetFileIdentifiers)) {
+ if (!checkIsTargetFilesComplete(targetFileIdentifiers, deletedTargetFileIdentifiers)) {
return false;
}
boolean handleSuccess = true;
for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
- File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
- if (sourceFile != null) {
- // delete source tsfile, resource file and mods file
- if (!new TsFileResource(sourceFile).remove()) {
- LOGGER.error(
- "{} [Compaction][Recover] fail to delete remaining source file {}.",
- fullStorageGroupName,
- sourceFile);
- handleSuccess = false;
- }
- } else {
- // if source file does not exist, its resource file may still exist, so delete it.
- File resourceFile =
- getFileFromDataDirs(
- sourceFileIdentifier.getFilePath() + TsFileResource.RESOURCE_SUFFIX);
-
- if (!checkAndDeleteFile(resourceFile)) {
- handleSuccess = false;
- }
-
- // delete .mods file of source tsfile
- File modFile =
- getFileFromDataDirs(sourceFileIdentifier.getFilePath() + ModificationFile.FILE_SUFFIX);
- if (!checkAndDeleteFile(modFile)) {
- handleSuccess = false;
- }
- }
-
- // delete .compaction.mods file of all source files
- File compactionModFile =
- getFileFromDataDirs(
- sourceFileIdentifier.getFilePath() + ModificationFile.COMPACTION_FILE_SUFFIX);
- if (!checkAndDeleteFile(compactionModFile)) {
+ if (!deleteFile(sourceFileIdentifier)) {
handleSuccess = false;
}
}
@@ -285,19 +258,28 @@ public class CompactionRecoverTask {
return null;
}
- private boolean checkIsTargetFilesComplete(List<TsFileIdentifier> targetFileIdentifiers)
+ private boolean checkIsTargetFilesComplete(
+ List<TsFileIdentifier> targetFileIdentifiers,
+ List<TsFileIdentifier> deletedTargetFileIdentifiers)
throws IOException {
for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+ targetFileIdentifier.setFilename(
+ targetFileIdentifier
+ .getFilename()
+ .replace(
+ isInnerSpace
+ ? IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX
+ : IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX));
+ boolean isTargetFileDeleted = deletedTargetFileIdentifiers.contains(targetFileIdentifier);
+ if (isTargetFileDeleted) {
+ if (!deleteFile(targetFileIdentifier)) {
+ return false;
+ }
+ continue;
+ }
// xxx.tsfile
- File targetFile =
- getFileFromDataDirs(
- targetFileIdentifier
- .getFilePath()
- .replace(
- isInnerSpace
- ? IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX
- : IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
- TsFileConstant.TSFILE_SUFFIX));
+ File targetFile = getFileFromDataDirs(targetFileIdentifier.getFilePath());
if (targetFile == null
|| !TsFileUtils.isTsFileComplete(new TsFileResource(targetFile).getTsFile())) {
LOGGER.error(
@@ -311,6 +293,42 @@ public class CompactionRecoverTask {
return true;
}
+ /**
+ * Delete tsfile and its corresponding files, including resource file, mods file and compaction
+ * mods file. Return true if the file is not existed or if the file is existed and has been
+ * deleted correctly. Otherwise, return false.
+ */
+ private boolean deleteFile(TsFileIdentifier tsFileIdentifier) {
+ boolean success = true;
+ // delete tsfile
+ File file = tsFileIdentifier.getFileFromDataDirs();
+ if (!checkAndDeleteFile(file)) {
+ success = false;
+ }
+
+ // delete resource file
+ file = getFileFromDataDirs(tsFileIdentifier.getFilePath() + TsFileResource.RESOURCE_SUFFIX);
+ if (!checkAndDeleteFile(file)) {
+ success = false;
+ }
+
+ // delete mods file
+ file = getFileFromDataDirs(tsFileIdentifier.getFilePath() + ModificationFile.FILE_SUFFIX);
+ if (!checkAndDeleteFile(file)) {
+ success = false;
+ }
+
+ // delete compaction mods file
+ file =
+ getFileFromDataDirs(
+ tsFileIdentifier.getFilePath() + ModificationFile.COMPACTION_FILE_SUFFIX);
+ if (!checkAndDeleteFile(file)) {
+ success = false;
+ }
+
+ return success;
+ }
+
/**
* Return true if the file is not existed or if the file is existed and has been deleted
* correctly. Otherwise, return false.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
index 00dd2c00d2..b7ec8741e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -46,6 +46,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_DELETED_TARGET_FILES;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_TARGET_FILES;
@@ -148,9 +149,6 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
compactionLogger.logFiles(selectedSequenceFiles, STR_SOURCE_FILES);
compactionLogger.logFiles(selectedUnsequenceFiles, STR_SOURCE_FILES);
compactionLogger.logFiles(targetTsfileResourceList, STR_TARGET_FILES);
- // indicates that the cross compaction is complete and the result can be reused during a
- // restart recovery
- compactionLogger.close();
performer.setSourceFiles(selectedSequenceFiles, selectedUnsequenceFiles);
performer.setTargetFiles(targetTsfileResourceList);
@@ -170,6 +168,13 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
timePartition,
true);
+ // find empty target files and add log
+ for (TsFileResource targetResource : targetTsfileResourceList) {
+ if (targetResource.isDeleted()) {
+ compactionLogger.logFile(targetResource, STR_DELETED_TARGET_FILES);
+ }
+ }
+
if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionValidation()
&& !CompactionUtils.validateTsFileResources(
tsFileManager, storageGroupName, timePartition)) {
@@ -186,7 +191,6 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
long sequenceFileSize = deleteOldFiles(selectedSequenceFiles);
long unsequenceFileSize = deleteOldFiles(selectedUnsequenceFiles);
-
CompactionUtils.deleteCompactionModsFile(selectedSequenceFiles, selectedUnsequenceFiles);
if (logFile.exists()) {
@@ -195,11 +199,14 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
// update the metrics finally in case of any exception occurs
for (TsFileResource targetResource : targetTsfileResourceList) {
- if (targetResource != null) {
+ if (!targetResource.isDeleted()) {
TsFileMetricManager.getInstance().addFile(targetResource.getTsFileSize(), true);
// set target resources to CLOSED, so that they can be selected to compact
targetResource.setStatus(TsFileResourceStatus.CLOSED);
+ } else {
+ // target resource is empty after compaction, then delete it
+ targetResource.remove();
}
}
TsFileMetricManager.getInstance()
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
index 12525a6610..f65c3799b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -131,9 +131,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
targetTsFileList = new ArrayList<>(Collections.singletonList(targetTsFileResource));
compactionLogger.logFiles(selectedTsFileResourceList, CompactionLogger.STR_SOURCE_FILES);
compactionLogger.logFiles(targetTsFileList, CompactionLogger.STR_TARGET_FILES);
- LOGGER.info(
- "{}-{} [InnerSpaceCompactionTask] Close the logger", storageGroupName, dataRegionId);
- compactionLogger.close();
+
LOGGER.info(
"{}-{} [Compaction] compaction with {}",
storageGroupName,
@@ -179,6 +177,10 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
false);
}
+ if (targetTsFileResource.isDeleted()) {
+ compactionLogger.logFile(targetTsFileResource, CompactionLogger.STR_DELETED_TARGET_FILES);
+ }
+
if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionValidation()
&& !CompactionUtils.validateTsFileResources(
tsFileManager, storageGroupName, timePartition)) {
@@ -231,12 +233,15 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
}
// inner space compaction task has only one target file
- if (targetTsFileList.get(0) != null) {
+ if (!targetTsFileResource.isDeleted()) {
TsFileMetricManager.getInstance()
- .addFile(targetTsFileList.get(0).getTsFile().length(), sequence);
+ .addFile(targetTsFileResource.getTsFile().length(), sequence);
- // set target resources to CLOSED, so that they can be selected to compact
- targetTsFileList.get(0).setStatus(TsFileResourceStatus.CLOSED);
+ // set target resource to CLOSED, so that it can be selected to compact
+ targetTsFileResource.setStatus(TsFileResourceStatus.CLOSED);
+ } else {
+ // target resource is empty after compaction, then delete it
+ targetTsFileResource.remove();
}
TsFileMetricManager.getInstance()
.deleteFile(totalSizeOfDeletedFile, sequence, selectedTsFileResourceList.size());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java
index a4bf898447..697b4090ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/CompactionUtils.java
@@ -232,14 +232,6 @@ public class CompactionUtils {
// in the new file
for (int i = 0; i < targetResources.size(); i++) {
TsFileResource targetResource = targetResources.get(i);
- // remove the target file been deleted from list
- if (!targetResource.getTsFile().exists()) {
- logger.info(
- "[Compaction] target file {} has been deleted after compaction.",
- targetResource.getTsFilePath());
- targetResources.set(i, null);
- continue;
- }
for (TsFileResource unseqResource : unseqResources) {
targetResource.updatePlanIndexes(unseqResource);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/CompactionLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/CompactionLogAnalyzer.java
index 4115b7d2eb..6c05a78ad0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/CompactionLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/CompactionLogAnalyzer.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.List;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.SEQUENCE_NAME_FROM_OLD;
+import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_DELETED_TARGET_FILES;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_MERGE_START_FROM_OLD;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_SEQ_FILES_FROM_OLD;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES;
@@ -43,13 +44,14 @@ public class CompactionLogAnalyzer {
private final File logFile;
private final List<TsFileIdentifier> sourceFileInfos = new ArrayList<>();
private final List<TsFileIdentifier> targetFileInfos = new ArrayList<>();
+ private final List<TsFileIdentifier> deletedTargetFileInfos = new ArrayList<>();
private boolean isLogFromOld = false;
public CompactionLogAnalyzer(File logFile) {
this.logFile = logFile;
}
- /** @return analyze (source file list, target file) */
+ /** @return analyze (source files, target files, deleted target files) */
public void analyze() throws IOException {
String currLine;
try (BufferedReader bufferedReader = new BufferedReader(new FileReader(logFile))) {
@@ -57,10 +59,14 @@ public class CompactionLogAnalyzer {
String fileInfo;
if (currLine.startsWith(STR_SOURCE_FILES)) {
fileInfo = currLine.replace(STR_SOURCE_FILES + TsFileIdentifier.INFO_SEPARATOR, "");
- analyzeFilePath(false, fileInfo);
- } else {
+ sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(fileInfo));
+ } else if (currLine.startsWith(STR_TARGET_FILES)) {
fileInfo = currLine.replace(STR_TARGET_FILES + TsFileIdentifier.INFO_SEPARATOR, "");
- analyzeFilePath(true, fileInfo);
+ targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(fileInfo));
+ } else {
+ fileInfo =
+ currLine.replace(STR_DELETED_TARGET_FILES + TsFileIdentifier.INFO_SEPARATOR, "");
+ deletedTargetFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(fileInfo));
}
}
}
@@ -120,14 +126,6 @@ public class CompactionLogAnalyzer {
}
}
- private void analyzeFilePath(boolean isTargetFile, String filePath) {
- if (isTargetFile) {
- targetFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(filePath));
- } else {
- sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromInfoString(filePath));
- }
- }
-
private void analyzeOldFilePath(boolean isSeqSource, String oldFilePath) {
if (oldFilePath.startsWith("root")) {
sourceFileInfos.add(TsFileIdentifier.getFileIdentifierFromOldInfoString(oldFilePath));
@@ -156,6 +154,10 @@ public class CompactionLogAnalyzer {
return targetFileInfos;
}
+ public List<TsFileIdentifier> getDeletedTargetFileInfos() {
+ return deletedTargetFileInfos;
+ }
+
public boolean isLogFromOld() {
return isLogFromOld;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/CompactionLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/CompactionLogger.java
index 5b7828a969..e6cda9f099 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/CompactionLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/CompactionLogger.java
@@ -38,6 +38,8 @@ public class CompactionLogger implements AutoCloseable {
public static final String STR_SOURCE_FILES = "source";
public static final String STR_TARGET_FILES = "target";
+ public static final String STR_DELETED_TARGET_FILES = "empty";
+
public static final String STR_SOURCE_FILES_FROM_OLD = "info-source";
public static final String STR_TARGET_FILES_FROM_OLD = "info-target";
public static final String STR_SEQ_FILES_FROM_OLD = "seqFiles";
@@ -70,6 +72,16 @@ public class CompactionLogger implements AutoCloseable {
logStream.flush();
}
+ public void logFile(TsFileResource tsFile, String flag) throws IOException {
+ logStream.write(
+ flag
+ + TsFileIdentifier.INFO_SEPARATOR
+ + TsFileIdentifier.getFileIdentifierFromFilePath(tsFile.getTsFile().getAbsolutePath())
+ .toString());
+ logStream.newLine();
+ logStream.flush();
+ }
+
public static File[] findCompactionLogs(boolean isInnerSpace, String directory) {
String compactionLogSuffix =
isInnerSpace ? INNER_COMPACTION_LOG_NAME_SUFFIX : CROSS_COMPACTION_LOG_NAME_SUFFIX;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/TsFileIdentifier.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/TsFileIdentifier.java
index 8897a811fc..8cac122399 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/TsFileIdentifier.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/TsFileIdentifier.java
@@ -34,7 +34,7 @@ public class TsFileIdentifier {
private final String dataRegionId;
private final String timePartitionId;
private final boolean sequence;
- private final String filename;
+ private String filename;
public static final String INFO_SEPARATOR = " ";
// Notice: Do not change the offset of info
public static final int FILE_NAME_OFFSET_IN_PATH = 1;
@@ -200,6 +200,10 @@ public class TsFileIdentifier {
return null;
}
+ public void setFilename(String filename) {
+ this.filename = filename;
+ }
+
public String getFilename() {
return filename;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
index c781363966..e1d29296af 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -144,9 +145,9 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr
public void endFile() throws IOException {
for (int i = 0; i < isEmptyFile.length; i++) {
targetFileWriters.get(i).endFile();
- // delete empty target file
+ // set empty target file to DELETED
if (isEmptyFile[i]) {
- targetFileWriters.get(i).getFile().delete();
+ targetResources.get(i).setStatus(TsFileResourceStatus.DELETED);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
index e8d816b2ef..37f3f996a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.writer;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -89,7 +90,8 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr
public void endFile() throws IOException {
fileWriter.endFile();
if (isEmptyFile) {
- fileWriter.getFile().delete();
+ // set target file to DELETED
+ targetResource.setStatus(TsFileResourceStatus.DELETED);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index c44bbabbff..96d0060c95 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -223,7 +223,7 @@ public class TsFileManager {
if (isTargetSequence) {
// seq inner space compaction or cross space compaction
for (TsFileResource resource : targetFileResources) {
- if (resource != null) {
+ if (!resource.isDeleted()) {
TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
sequenceFiles.get(timePartition).keepOrderInsert(resource);
}
@@ -231,7 +231,7 @@ public class TsFileManager {
} else {
// unseq inner space compaction
for (TsFileResource resource : targetFileResources) {
- if (resource != null) {
+ if (!resource.isDeleted()) {
TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
unsequenceFiles.get(timePartition).keepOrderInsert(resource);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 6a27ff1d54..2e454504c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -540,7 +540,7 @@ public class TsFileResource {
* file physically.
*/
public boolean remove() {
- this.status = TsFileResourceStatus.DELETED;
+ setStatus(TsFileResourceStatus.DELETED);
try {
fsFactory.deleteIfExists(file);
fsFactory.deleteIfExists(
@@ -627,6 +627,9 @@ public class TsFileResource {
case UNCLOSED:
this.status = TsFileResourceStatus.UNCLOSED;
break;
+ case DELETED:
+ this.status = TsFileResourceStatus.DELETED;
+ break;
case COMPACTING:
if (this.status == TsFileResourceStatus.COMPACTION_CANDIDATE) {
this.status = TsFileResourceStatus.COMPACTING;
@@ -893,7 +896,7 @@ public class TsFileResource {
.getFile(file.toPath() + TsFileResource.RESOURCE_SUFFIX)
.toPath());
}
- this.status = TsFileResourceStatus.DELETED;
+ setStatus(TsFileResourceStatus.DELETED);
}
public long getMaxPlanIndex() {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastCrossCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastCrossCompactionPerformerTest.java
index a266315792..c104d519da 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastCrossCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/FastCrossCompactionPerformerTest.java
@@ -844,7 +844,7 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
- targetResources.removeIf(resource -> resource == null);
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(2, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
@@ -2064,7 +2064,7 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
- targetResources.removeIf(resource -> resource == null);
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(2, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
@@ -2754,7 +2754,7 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest {
Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
- targetResources.removeIf(resource -> resource == null);
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(3, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index 616dff44ee..6916f6d895 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.engine.compaction.execute.utils.reader.IDataBlockRead
import org.apache.iotdb.db.engine.compaction.execute.utils.reader.SeriesDataBlockReader;
import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.query.control.FileReaderManager;
@@ -2829,12 +2830,12 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
for (int i = 0; i < targetResources.size(); i++) {
TsFileResource resource = targetResources.get(i);
if (i < 2) {
- Assert.assertEquals(null, resource);
+ Assert.assertEquals(TsFileResourceStatus.DELETED, resource.getStatus());
} else {
Assert.assertTrue(resource.getTsFile().exists());
}
}
- targetResources.removeIf(resource -> resource == null);
+ targetResources.removeIf(x -> x.isDeleted());
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
for (int i = 0; i < 2; i++) {
@@ -3871,7 +3872,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
- targetResources.removeIf(resource -> resource == null);
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(2, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
@@ -4555,7 +4556,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
- targetResources.removeIf(resource -> resource == null);
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(2, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
@@ -5847,7 +5848,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
- targetResources.removeIf(resource -> resource == null);
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(3, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
@@ -5978,7 +5979,7 @@ public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
Assert.assertEquals(0, FileReaderManager.getInstance().getClosedFileReaderMap().size());
Assert.assertEquals(0, FileReaderManager.getInstance().getUnclosedFileReaderMap().size());
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
- targetResources.removeIf(resource -> resource == null);
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(3, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
index b02915db9e..2a4ac6a860 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
@@ -19,13 +19,16 @@
package org.apache.iotdb.db.engine.compaction.cross;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest;
import org.apache.iotdb.db.engine.compaction.execute.exception.CompactionExceptionHandler;
import org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.execute.performer.impl.ReadPointCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
@@ -33,6 +36,7 @@ import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -541,4 +545,200 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
Assert.assertEquals(5, tsFileManager.getUnsequenceListByTimePartition(0).size());
Assert.assertTrue(tsFileManager.isAllowCompaction());
}
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndSomeSourceFilesLost() throws Exception {
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+ TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", SEQ_DIRS.getPath());
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the first target file should be deleted after compaction
+ for (int device = 0; device < 3; device++) {
+ for (int measurement = 0; measurement < 4; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + device + PATH_SEPARATOR + "s" + measurement,
+ new Pair(0L, 300L));
+ CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(0), false);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(0), false);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(1), false);
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ ICompactionPerformer performer =
+ new FastCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCrossCompaction(seqResources, unseqResources, targetResources);
+ seqResources.get(0).remove();
+
+ // meet errors and handle exception
+ CompactionExceptionHandler.handleException(
+ COMPACTION_TEST_SG,
+ compactionLogFile,
+ targetResources,
+ seqResources,
+ unseqResources,
+ tsFileManager,
+ 0,
+ false,
+ true);
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should not exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ for (TsFileResource resource : unseqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // the first target file should be deleted after compaction, the others still exist
+ for (TsFileResource resource : targetResources) {
+ if (resource.getVersion() == 0) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(resource.resourceFileExists());
+ } else {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(resource.resourceFileExists());
+ Assert.assertEquals(TsFileResourceStatus.CLOSED, resource.getStatus());
+ }
+ }
+ }
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndAllSourceFilesExisted()
+ throws Exception {
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+ TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", SEQ_DIRS.getPath());
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the first target file should be deleted after compaction
+ for (int device = 0; device < 3; device++) {
+ for (int measurement = 0; measurement < 4; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + device + PATH_SEPARATOR + "s" + measurement,
+ new Pair(0L, 300L));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ unseqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ ICompactionPerformer performer =
+ new FastCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCrossCompaction(seqResources, unseqResources, targetResources);
+
+ // meet errors and handle exception
+ CompactionExceptionHandler.handleException(
+ COMPACTION_TEST_SG,
+ compactionLogFile,
+ targetResources,
+ seqResources,
+ unseqResources,
+ tsFileManager,
+ 0,
+ false,
+ true);
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ for (TsFileResource resource : unseqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // tmp target file, target file and target resource file should be deleted after compaction
+ for (TsFileResource resource : targetResources) {
+ if (resource == null) {
+ continue;
+ }
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX))
+ .exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX)
+ + TsFileResource.RESOURCE_SUFFIX)
+ .exists());
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java
index 8139c74fc7..b54b57d7dd 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java
@@ -19,13 +19,16 @@
package org.apache.iotdb.db.engine.compaction.cross;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest;
import org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer;
+import org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.execute.performer.impl.ReadPointCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.execute.recover.CompactionRecoverTask;
import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
+import org.apache.iotdb.db.engine.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.engine.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger;
import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
@@ -48,6 +51,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_DELETED_TARGET_FILES;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_TARGET_FILES;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
@@ -654,4 +658,266 @@ public class RewriteCrossSpaceCompactionRecoverTest extends AbstractCompactionTe
Assert.assertTrue(tsFileManager.isAllowCompaction());
}
+
+ @Test
+ public void testWhenTargetFileShouldBeDeletedAfterCompactionAndSomeSourceFilesLost()
+ throws Exception {
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+ TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", SEQ_DIRS.getPath());
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the first target file should be deleted after compaction
+ for (int device = 0; device < 3; device++) {
+ for (int measurement = 0; measurement < 4; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + device + PATH_SEPARATOR + "s" + measurement,
+ new Pair(0L, 300L));
+ CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(0), false);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(0), false);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(1), false);
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES);
+ ICompactionPerformer performer =
+ new FastCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCrossCompaction(seqResources, unseqResources, targetResources);
+ compactionLogger.logFile(targetResources.get(0), STR_DELETED_TARGET_FILES);
+ compactionLogger.close();
+ seqResources.get(0).remove();
+
+ new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, false)
+ .doCompaction();
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should not exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ for (TsFileResource resource : unseqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // the first target file should be deleted after recovery
+ for (TsFileResource resource : targetResources) {
+ if (resource.getVersion() == 0) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(resource.resourceFileExists());
+ } else {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(resource.resourceFileExists());
+ }
+ }
+ }
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndSomeSourceFilesLost() throws Exception {
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+ TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", SEQ_DIRS.getPath());
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the first target file should be deleted after compaction
+ for (int device = 0; device < 3; device++) {
+ for (int measurement = 0; measurement < 4; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + device + PATH_SEPARATOR + "s" + measurement,
+ new Pair(0L, 300L));
+ CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(0), false);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(0), false);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(1), false);
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES);
+ ICompactionPerformer performer =
+ new FastCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCrossCompaction(seqResources, unseqResources, targetResources);
+ compactionLogger.logFile(targetResources.get(0), CompactionLogger.STR_DELETED_TARGET_FILES);
+ compactionLogger.close();
+ CompactionUtils.deleteTsFilesInDisk(seqResources, COMPACTION_TEST_SG);
+ CompactionUtils.deleteModificationForSourceFile(seqResources, COMPACTION_TEST_SG);
+ CompactionUtils.deleteTsFilesInDisk(unseqResources, COMPACTION_TEST_SG);
+ CompactionUtils.deleteModificationForSourceFile(unseqResources, COMPACTION_TEST_SG);
+
+ if (targetResources.get(0).isDeleted()) {
+ targetResources.get(0).remove();
+ }
+
+ new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, false)
+ .doCompaction();
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should not exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ for (TsFileResource resource : unseqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // the first target file should be deleted after recovery
+ for (TsFileResource resource : targetResources) {
+ if (resource.getVersion() == 0) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(resource.resourceFileExists());
+ } else {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(resource.resourceFileExists());
+ }
+ }
+ }
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndAllSourceFilesExisted()
+ throws Exception {
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+ TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", SEQ_DIRS.getPath());
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the first target file should be deleted after compaction
+ for (int device = 0; device < 3; device++) {
+ for (int measurement = 0; measurement < 4; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + device + PATH_SEPARATOR + "s" + measurement,
+ new Pair(0L, 300L));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ unseqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ ICompactionPerformer performer =
+ new FastCompactionPerformer(seqResources, unseqResources, targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCrossCompaction(seqResources, unseqResources, targetResources);
+
+ new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, false)
+ .doCompaction();
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ for (TsFileResource resource : unseqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // tmp target file, target file and target resource file should be deleted after compaction
+ for (TsFileResource resource : targetResources) {
+ if (resource == null) {
+ continue;
+ }
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX))
+ .exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX)
+ + TsFileResource.RESOURCE_SUFFIX)
+ .exists());
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
index cdff206b84..c53bab30e7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.engine.compaction.inner;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.engine.compaction.execute.exception.CompactionExceptionHandler;
import org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer;
import org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompactionPerformer;
@@ -29,6 +31,7 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.utils.Pair;
import org.h2.store.fs.FileUtils;
@@ -37,12 +40,16 @@ import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompactionTest {
ICompactionPerformer performer = new FastCompactionPerformer(false);
@@ -469,4 +476,175 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
Assert.assertTrue(tsFileManager.isAllowCompaction());
}
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndSomeSourceFilesLost() throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the target file should be deleted after compaction
+ for (int device = 0; device < deviceNum; device++) {
+ for (int measurement = 0; measurement < measurementNum; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "device"
+ + device
+ + PATH_SEPARATOR
+ + "sensor"
+ + measurement,
+ new Pair(Long.MIN_VALUE, Long.MAX_VALUE));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IOException | IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, CompactionLogger.STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, CompactionLogger.STR_SOURCE_FILES);
+ compactionLogger.close();
+ ICompactionPerformer performer =
+ new FastCompactionPerformer(seqResources, Collections.emptyList(), targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInInnerCompaction(seqResources, targetResources.get(0));
+ seqResources.get(0).remove();
+
+ // meet errors and handle exception
+ CompactionExceptionHandler.handleException(
+ COMPACTION_TEST_SG,
+ compactionLogFile,
+ targetResources,
+ seqResources,
+ Collections.emptyList(),
+ tsFileManager,
+ 0,
+ true,
+ true);
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should not exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // the target file will be deleted
+ Assert.assertFalse(targetResources.get(0).getTsFile().exists());
+ Assert.assertFalse(targetResources.get(0).resourceFileExists());
+ }
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndAllSourceFilesExisted()
+ throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the target file should be deleted after compaction
+ for (int device = 0; device < deviceNum; device++) {
+ for (int measurement = 0; measurement < measurementNum; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "device"
+ + device
+ + PATH_SEPARATOR
+ + "sensor"
+ + measurement,
+ new Pair(Long.MIN_VALUE, Long.MAX_VALUE));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IOException | IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, CompactionLogger.STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, CompactionLogger.STR_SOURCE_FILES);
+ compactionLogger.close();
+ ICompactionPerformer performer =
+ new FastCompactionPerformer(seqResources, Collections.emptyList(), targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInInnerCompaction(seqResources, targetResources.get(0));
+
+ // meet errors and handle exception
+ CompactionExceptionHandler.handleException(
+ COMPACTION_TEST_SG,
+ compactionLogFile,
+ targetResources,
+ seqResources,
+ Collections.emptyList(),
+ tsFileManager,
+ 0,
+ true,
+ true);
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // tmp target file, target file and target resource file should be deleted after compaction
+ for (TsFileResource resource : targetResources) {
+ if (resource == null) {
+ continue;
+ }
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX))
+ .exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX)
+ + TsFileResource.RESOURCE_SUFFIX)
+ .exists());
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
index d802b325ae..b3f3dd9566 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.MeasurementPath;
@@ -68,8 +69,10 @@ import java.util.List;
import java.util.Map;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX;
+import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_DELETED_TARGET_FILES;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_SOURCE_FILES;
import static org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger.STR_TARGET_FILES;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
import static org.junit.Assert.assertEquals;
public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactionTest {
@@ -1215,6 +1218,238 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
+ @Test
+ public void testWhenTargetFileShouldBeDeletedAfterCompactionAndSomeSourceFilesLost()
+ throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the target file should be deleted after compaction
+ for (int device = 0; device < deviceNum; device++) {
+ for (int measurement = 0; measurement < measurementNum; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "device"
+ + device
+ + PATH_SEPARATOR
+ + "sensor"
+ + measurement,
+ new Pair(Long.MIN_VALUE, Long.MAX_VALUE));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+
+ ICompactionPerformer performer =
+ new FastCompactionPerformer(seqResources, Collections.emptyList(), targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInInnerCompaction(seqResources, targetResources.get(0));
+ compactionLogger.logFile(targetResources.get(0), STR_DELETED_TARGET_FILES);
+ compactionLogger.close();
+ seqResources.get(0).remove();
+
+ // recover compaction
+ new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, true)
+ .doCompaction();
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should not exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // the target file should be deleted
+ Assert.assertFalse(targetResources.get(0).getTsFile().exists());
+ Assert.assertFalse(targetResources.get(0).resourceFileExists());
+ }
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndSomeSourceFilesLost() throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the target file should be deleted after compaction
+ for (int device = 0; device < deviceNum; device++) {
+ for (int measurement = 0; measurement < measurementNum; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "device"
+ + device
+ + PATH_SEPARATOR
+ + "sensor"
+ + measurement,
+ new Pair(Long.MIN_VALUE, Long.MAX_VALUE));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+
+ ICompactionPerformer performer =
+ new FastCompactionPerformer(seqResources, Collections.emptyList(), targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInInnerCompaction(seqResources, targetResources.get(0));
+ compactionLogger.logFile(targetResources.get(0), STR_DELETED_TARGET_FILES);
+ compactionLogger.close();
+ CompactionUtils.deleteTsFilesInDisk(seqResources, COMPACTION_TEST_SG);
+ CompactionUtils.deleteModificationForSourceFile(seqResources, COMPACTION_TEST_SG);
+
+ if (targetResources.get(0).isDeleted()) {
+ targetResources.get(0).remove();
+ }
+
+ // recover compaction
+ new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, true)
+ .doCompaction();
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should not exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // the target file should be deleted
+ Assert.assertFalse(targetResources.get(0).getTsFile().exists());
+ Assert.assertFalse(targetResources.get(0).resourceFileExists());
+ }
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndAllSourceFilesExisted()
+ throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the target file should be deleted after compaction
+ for (int device = 0; device < deviceNum; device++) {
+ for (int measurement = 0; measurement < measurementNum; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "device"
+ + device
+ + PATH_SEPARATOR
+ + "sensor"
+ + measurement,
+ new Pair(Long.MIN_VALUE, Long.MAX_VALUE));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IOException | IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ ICompactionPerformer performer =
+ new FastCompactionPerformer(seqResources, Collections.emptyList(), targetResources);
+ performer.setSummary(new FastCompactionTaskSummary());
+ performer.perform();
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInInnerCompaction(seqResources, targetResources.get(0));
+
+ // recover compaction
+ new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, true)
+ .doCompaction();
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // tmp target file, target file and target resource file should be deleted after compaction
+ for (TsFileResource resource : targetResources) {
+ if (resource == null) {
+ continue;
+ }
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX))
+ .exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX)
+ + TsFileResource.RESOURCE_SUFFIX)
+ .exists());
+ }
+ }
+
private void closeTsFileSequenceReader() throws IOException {
for (TsFileResource tsFileResource : seqResources) {
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());