You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/01/09 09:51:04 UTC
[iotdb] branch rel/0.13 updated: set empty target resource to DELETED and then delete it after compaction (#8774)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 8310fd100c set empty target resource to DELETED and then delete it after compaction (#8774)
8310fd100c is described below
commit 8310fd100ccbeae9a87da2d56dad02f130470b1d
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Mon Jan 9 17:50:54 2023 +0800
set empty target resource to DELETED and then delete it after compaction (#8774)
---
.../db/engine/compaction/CompactionUtils.java | 60 +++----
.../task/RewriteCrossSpaceCompactionTask.java | 15 +-
.../inner/sizetiered/SizeTieredCompactionTask.java | 11 +-
.../task/CompactionExceptionHandler.java | 8 +
.../writer/CrossSpaceCompactionWriter.java | 5 +-
.../writer/InnerSpaceCompactionWriter.java | 3 +-
.../db/engine/storagegroup/TsFileManager.java | 12 +-
.../db/engine/compaction/CompactionUtilsTest.java | 8 +-
.../cross/CrossSpaceCompactionExceptionTest.java | 190 ++++++++++++++++++++
.../RewriteCrossSpaceCompactionRecoverTest.java | 164 +++++++++++++++++
.../cross/RewriteCrossSpaceCompactionTest.java | 4 +-
.../inner/InnerSpaceCompactionExceptionTest.java | 194 +++++++++++++++++++++
.../SizeTieredCompactionRecoverTest.java | 156 +++++++++++++++++
13 files changed, 778 insertions(+), 52 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 7c94873747..53ecedc87a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.engine.compaction.writer.InnerSpaceCompactionWriter;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -57,7 +56,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -274,11 +273,6 @@ public class CompactionUtils {
// in the new file
for (int i = 0; i < targetResources.size(); i++) {
TsFileResource targetResource = targetResources.get(i);
- // remove the target file that has been deleted from list
- if (!targetResource.getTsFile().exists()) {
- targetResources.remove(i--);
- continue;
- }
for (TsFileResource unseqResource : unseqResources) {
targetResource.updatePlanIndexes(unseqResource);
}
@@ -339,40 +333,38 @@ public class CompactionUtils {
List<TsFileResource> unseqResources,
List<TsFileResource> targetResources)
throws IOException {
- // target file may less than source seq files, so we should find each target file with its
- // corresponding source seq file.
- Map<String, TsFileResource> seqFileInfoMap = new HashMap<>();
- for (TsFileResource tsFileResource : seqResources) {
- seqFileInfoMap.put(
- TsFileNameGenerator.increaseCrossCompactionCnt(tsFileResource.getTsFile()).getName(),
- tsFileResource);
+ Set<Modification> modifications = new HashSet<>();
+ // get compaction mods from all source unseq files
+ for (TsFileResource unseqFile : unseqResources) {
+ modifications.addAll(ModificationFile.getCompactionMods(unseqFile).getModifications());
}
- // update each target mods file.
- for (TsFileResource tsFileResource : targetResources) {
- updateOneTargetMods(
- tsFileResource, seqFileInfoMap.get(tsFileResource.getTsFile().getName()), unseqResources);
+
+ // write target mods file
+ for (int i = 0; i < targetResources.size(); i++) {
+ TsFileResource targetResource = targetResources.get(i);
+ if (targetResource == null) {
+ continue;
+ }
+ Set<Modification> seqModifications =
+ new HashSet<>(ModificationFile.getCompactionMods(seqResources.get(i)).getModifications());
+ modifications.addAll(seqModifications);
+ updateOneTargetMods(targetResource, modifications);
+ modifications.removeAll(seqModifications);
}
}
private static void updateOneTargetMods(
- TsFileResource targetFile, TsFileResource seqFile, List<TsFileResource> unseqFiles)
- throws IOException {
- // write mods in the seq file
- if (seqFile != null) {
- ModificationFile seqCompactionModificationFile = ModificationFile.getCompactionMods(seqFile);
- for (Modification modification : seqCompactionModificationFile.getModifications()) {
- targetFile.getModFile().write(modification);
- }
- }
- // write mods in all unseq files
- for (TsFileResource unseqFile : unseqFiles) {
- ModificationFile compactionUnseqModificationFile =
- ModificationFile.getCompactionMods(unseqFile);
- for (Modification modification : compactionUnseqModificationFile.getModifications()) {
- targetFile.getModFile().write(modification);
+ TsFileResource targetFile, Set<Modification> modifications) throws IOException {
+ if (!modifications.isEmpty()) {
+ try (ModificationFile modificationFile = ModificationFile.getNormalMods(targetFile)) {
+ for (Modification modification : modifications) {
+ // we have to set modification offset to MAX_VALUE, as the offset of source chunk may
+ // change after compaction
+ modification.setFileOffset(Long.MAX_VALUE);
+ modificationFile.write(modification);
+ }
}
}
- targetFile.getModFile().close();
}
public static void deleteCompactionModsFile(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
index 8d9f5c0cf4..fe79edf826 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/RewriteCrossSpaceCompactionTask.java
@@ -194,12 +194,21 @@ public class RewriteCrossSpaceCompactionTask extends AbstractCrossSpaceCompactio
CompactionUtils.deleteCompactionModsFile(
selectedSeqTsFileResourceList, selectedUnSeqTsFileResourceList);
- // set target resources to CLOSED, so that they can be selected to compact
- targetTsfileResourceList.forEach(x -> x.setStatus(TsFileResourceStatus.CLOSED));
-
if (logFile.exists()) {
FileUtils.delete(logFile);
}
+
+ targetTsfileResourceList.forEach(
+ x -> {
+ if (x.isDeleted()) {
+ // target resource is empty after compaction, then delete it
+ x.remove();
+ } else {
+ // set target resources to CLOSED, so that they can be selected to compact
+ x.setStatus(TsFileResourceStatus.CLOSED);
+ }
+ });
+
double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
logger.info(
"{} [Compaction] CrossSpaceCompactionTask Costs {} s, compaction speed is {} MB/s",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index 489bccf2f2..aba0a9adec 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -192,12 +192,17 @@ public class SizeTieredCompactionTask extends AbstractInnerSpaceCompactionTask {
costTime,
((double) selectedFileSize) / 1024.0d / 1024.0d / costTime);
- // set target resource to CLOSED, so that it can be selected to compact
- targetTsFileResource.setStatus(TsFileResourceStatus.CLOSED);
-
if (logFile.exists()) {
FileUtils.delete(logFile);
}
+
+ if (targetTsFileResource.isDeleted()) {
+ // target resource is empty after compaction, then delete it
+ targetTsFileResource.remove();
+ } else {
+ // set target resource to CLOSED, so that it can be selected to compact
+ targetTsFileResource.setStatus(TsFileResourceStatus.CLOSED);
+ }
} catch (Throwable throwable) {
LOGGER.warn("{} [Compaction] Start to handle exception", fullStorageGroupName);
if (sizeTieredCompactionLogger != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionExceptionHandler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionExceptionHandler.java
index 6bb67d7493..566238813f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionExceptionHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionExceptionHandler.java
@@ -232,6 +232,11 @@ public class CompactionExceptionHandler {
String fullStorageGroupName)
throws IOException {
for (TsFileResource targetResource : targetResources) {
+ if (targetResource.isDeleted()) {
+ // target resource is empty after compaction, then delete it
+ targetResource.remove();
+ continue;
+ }
if (!TsFileUtils.isTsFileComplete(targetResource.getTsFile())) {
LOGGER.error(
"{} [Compaction][ExceptionHandler] target file {} is not complete, and some source files {} is lost, do nothing. Set allowCompaction to false",
@@ -240,6 +245,9 @@ public class CompactionExceptionHandler {
lostSourceResources);
return false;
}
+
+ // set target resources to CLOSED, so that they can be selected to compact
+ targetResource.setStatus(TsFileResourceStatus.CLOSED);
}
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
index e57ce14d1b..79d7511ec1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/CrossSpaceCompactionWriter.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction.writer;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -156,9 +157,9 @@ public class CrossSpaceCompactionWriter extends AbstractCompactionWriter {
public void endFile() throws IOException {
for (int i = 0; i < isEmptyFile.length; i++) {
fileWriterList.get(i).endFile();
- // delete empty target file
+ // set empty target file to DELETED
if (isEmptyFile[i]) {
- fileWriterList.get(i).getFile().delete();
+ targetTsFileResources.get(i).setStatus(TsFileResourceStatus.DELETED);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
index 4ef3286d83..b5d6a0642c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/InnerSpaceCompactionWriter.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction.writer;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -96,7 +97,7 @@ public class InnerSpaceCompactionWriter extends AbstractCompactionWriter {
public void endFile() throws IOException {
fileWriter.endFile();
if (isEmptyFile) {
- fileWriter.getFile().delete();
+ resource.setStatus(TsFileResourceStatus.DELETED);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index c40149c05d..e090ff145b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -222,14 +222,18 @@ public class TsFileManager {
if (isTargetSequence) {
// seq inner space compaction or cross space compaction
for (TsFileResource resource : targetFileResources) {
- TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
- sequenceFiles.get(timePartition).keepOrderInsert(resource);
+ if (!resource.isDeleted()) {
+ TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
+ sequenceFiles.get(timePartition).keepOrderInsert(resource);
+ }
}
} else {
// unseq inner space compaction
for (TsFileResource resource : targetFileResources) {
- TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
- unsequenceFiles.get(timePartition).keepOrderInsert(resource);
+ if (!resource.isDeleted()) {
+ TsFileResourceManager.getInstance().registerSealedTsFileResource(resource);
+ unsequenceFiles.get(timePartition).keepOrderInsert(resource);
+ }
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
index fef8640e3a..338f76a363 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionUtilsTest.java
@@ -2423,7 +2423,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
-
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(2, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
deviceIdList.add(COMPACTION_TEST_SG + PATH_SEPARATOR + "d3");
@@ -3443,6 +3443,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(2, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
@@ -3969,6 +3970,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(2, targetResources.size());
@@ -4832,7 +4834,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
- targetResources.removeIf(resource -> resource == null);
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(3, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
@@ -4960,7 +4962,7 @@ public class CompactionUtilsTest extends AbstractCompactionTest {
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
CompactionUtils.compact(seqResources, unseqResources, targetResources);
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
- targetResources.removeIf(resource -> resource == null);
+ targetResources.removeIf(x -> x.isDeleted());
Assert.assertEquals(3, targetResources.size());
List<String> deviceIdList = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
index 1463cbda62..4cf05a51f8 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionExceptionTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -522,4 +523,193 @@ public class CrossSpaceCompactionExceptionTest extends AbstractCompactionTest {
Assert.assertEquals(5, tsFileManager.getUnsequenceListByTimePartition(0).size());
Assert.assertTrue(tsFileManager.isAllowCompaction());
}
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndSomeSourceFilesLost() throws Exception {
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+ TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", SEQ_DIRS.getPath());
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the first target file should be deleted after compaction
+ for (int device = 0; device < 3; device++) {
+ for (int measurement = 0; measurement < 4; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + device + PATH_SEPARATOR + "s" + measurement,
+ new Pair(0L, 300L));
+ CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(0), false);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(0), false);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(1), false);
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ CompactionUtils.compact(seqResources, unseqResources, targetResources);
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCompaction(seqResources, unseqResources, targetResources);
+ seqResources.get(0).remove();
+
+ // meet errors and handle exception
+ CompactionExceptionHandler.handleException(
+ COMPACTION_TEST_SG,
+ compactionLogFile,
+ targetResources,
+ seqResources,
+ unseqResources,
+ tsFileManager,
+ 0,
+ false,
+ true);
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should not exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ for (TsFileResource resource : unseqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // the first target file should be deleted after compaction, the others still exist
+ for (TsFileResource resource : targetResources) {
+ if (resource.getVersion() == 0) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(resource.resourceFileExists());
+ } else {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(resource.resourceFileExists());
+ }
+ }
+ }
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndAllSourceFilesExisted()
+ throws Exception {
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+ TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", SEQ_DIRS.getPath());
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the first target file should be deleted after compaction
+ for (int device = 0; device < 3; device++) {
+ for (int measurement = 0; measurement < 4; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + device + PATH_SEPARATOR + "s" + measurement,
+ new Pair(0L, 300L));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ unseqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ CompactionUtils.compact(seqResources, unseqResources, targetResources);
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCompaction(seqResources, unseqResources, targetResources);
+
+ // meet errors and handle exception
+ CompactionExceptionHandler.handleException(
+ COMPACTION_TEST_SG,
+ compactionLogFile,
+ targetResources,
+ seqResources,
+ unseqResources,
+ tsFileManager,
+ 0,
+ false,
+ true);
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ for (TsFileResource resource : unseqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // tmp target file, target file and target resource file should be deleted after compaction
+ for (TsFileResource resource : targetResources) {
+ if (resource == null) {
+ continue;
+ }
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX))
+ .exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX)
+ + TsFileResource.RESOURCE_SUFFIX)
+ .exists());
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java
index 4e83f3c295..ef689733df 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionRecoverTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -629,4 +630,167 @@ public class RewriteCrossSpaceCompactionRecoverTest extends AbstractCompactionTe
Assert.assertTrue(tsFileManager.isAllowCompaction());
}
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndSomeSourceFilesLost() throws Exception {
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+ TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", SEQ_DIRS.getPath());
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the first target file should be deleted after compaction
+ for (int device = 0; device < 3; device++) {
+ for (int measurement = 0; measurement < 4; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + device + PATH_SEPARATOR + "s" + measurement,
+ new Pair(0L, 300L));
+ CompactionFileGeneratorUtils.generateMods(deleteMap, seqResources.get(0), false);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(0), false);
+ CompactionFileGeneratorUtils.generateMods(deleteMap, unseqResources.get(1), false);
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ CompactionUtils.compact(seqResources, unseqResources, targetResources);
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCompaction(seqResources, unseqResources, targetResources);
+ seqResources.get(0).remove();
+
+ new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, false)
+ .doCompaction();
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should not exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ for (TsFileResource resource : unseqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // although the first target file is empty after compaction, it will still exist after recovery.
+ for (TsFileResource resource : targetResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(resource.resourceFileExists());
+ }
+ }
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndAllSourceFilesExisted()
+ throws Exception {
+ createFiles(2, 2, 3, 300, 0, 0, 50, 50, false, true);
+ createFiles(2, 4, 5, 300, 700, 700, 50, 50, false, true);
+ createFiles(3, 3, 4, 200, 20, 10020, 30, 30, false, false);
+ createFiles(2, 1, 5, 100, 450, 20450, 0, 0, false, false);
+ TsFileManager tsFileManager = new TsFileManager(COMPACTION_TEST_SG, "0", SEQ_DIRS.getPath());
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the first target file should be deleted after compaction
+ for (int device = 0; device < 3; device++) {
+ for (int measurement = 0; measurement < 4; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG + PATH_SEPARATOR + "d" + device + PATH_SEPARATOR + "s" + measurement,
+ new Pair(0L, 300L));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ unseqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.logFiles(unseqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ CompactionUtils.compact(seqResources, unseqResources, targetResources);
+ CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCompaction(seqResources, unseqResources, targetResources);
+
+ new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, false)
+ .doCompaction();
+
+ // all source file should not exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ for (TsFileResource resource : unseqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // tmp target file, target file and target resource file should be deleted after compaction
+ for (TsFileResource resource : targetResources) {
+ if (resource == null) {
+ continue;
+ }
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX))
+ .exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX)
+ + TsFileResource.RESOURCE_SUFFIX)
+ .exists());
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
index 9aa3d9b76f..5cdf206a19 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
@@ -247,7 +247,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
.replace(CROSS_COMPACTION_TMP_FILE_SUFFIX, TsFileConstant.TSFILE_SUFFIX)));
resource.resetModFile();
Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(24, resource.getModFile().getModifications().size());
+ Assert.assertEquals(4, resource.getModFile().getModifications().size());
}
FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -485,7 +485,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
continue;
}
Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(180, resource.getModFile().getModifications().size());
+ Assert.assertEquals(30, resource.getModFile().getModifications().size());
}
FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
index c9b49a6138..7ff51b6583 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionExceptionTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.engine.compaction.inner;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
import org.apache.iotdb.db.engine.compaction.task.CompactionExceptionHandler;
import org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
@@ -26,22 +28,49 @@ import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.utils.Pair;
import org.h2.store.fs.FileUtils;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import static org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger.STR_SOURCE_FILES;
+import static org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger.STR_TARGET_FILES;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompactionTest {
+ private final String oldThreadName = Thread.currentThread().getName();
+
+ @Before
+ public void setUp() throws IOException, WriteProcessException, MetadataException {
+ super.setUp();
+ Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ Thread.currentThread().setName(oldThreadName);
+ }
+
/**
* Test when all source files exist, and target file is not complete. System should delete target
* file and its resource at this time.
@@ -435,4 +464,169 @@ public class InnerSpaceCompactionExceptionTest extends AbstractInnerSpaceCompact
Assert.assertTrue(tsFileManager.isAllowCompaction());
}
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndSomeSourceFilesLost() throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the target file should be deleted after compaction
+ for (int device = 0; device < deviceNum; device++) {
+ for (int measurement = 0; measurement < measurementNum; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "device"
+ + device
+ + PATH_SEPARATOR
+ + "sensor"
+ + measurement,
+ new Pair(Long.MIN_VALUE, Long.MAX_VALUE));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ CompactionUtils.compact(seqResources, Collections.emptyList(), targetResources);
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCompaction(seqResources, Collections.emptyList(), targetResources);
+ seqResources.get(0).remove();
+
+ // meet errors and handle exception
+ CompactionExceptionHandler.handleException(
+ COMPACTION_TEST_SG,
+ compactionLogFile,
+ targetResources,
+ seqResources,
+ Collections.emptyList(),
+ tsFileManager,
+ 0,
+ true,
+ true);
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should not exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // the target file should be deleted after compaction
+ Assert.assertFalse(targetResources.get(0).getTsFile().exists());
+ Assert.assertFalse(targetResources.get(0).resourceFileExists());
+ }
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndAllSourceFilesExisted()
+ throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the target file should be deleted after compaction
+ for (int device = 0; device < deviceNum; device++) {
+ for (int measurement = 0; measurement < measurementNum; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "device"
+ + device
+ + PATH_SEPARATOR
+ + "sensor"
+ + measurement,
+ new Pair(Long.MIN_VALUE, Long.MAX_VALUE));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ CompactionUtils.compact(seqResources, Collections.emptyList(), targetResources);
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCompaction(seqResources, Collections.emptyList(), targetResources);
+
+ // meet errors and handle exception
+ CompactionExceptionHandler.handleException(
+ COMPACTION_TEST_SG,
+ compactionLogFile,
+ targetResources,
+ seqResources,
+ Collections.emptyList(),
+ tsFileManager,
+ 0,
+ true,
+ true);
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // tmp target file, target file and target resource file should be deleted after compaction
+ for (TsFileResource resource : targetResources) {
+ if (resource == null) {
+ continue;
+ }
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX))
+ .exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX)
+ + TsFileResource.RESOURCE_SUFFIX)
+ .exists());
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
index b6bd953080..a47b5b1bfa 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import org.apache.iotdb.db.engine.compaction.inner.AbstractInnerSpaceCompactionTest;
import org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
import org.apache.iotdb.db.engine.compaction.task.CompactionRecoverTask;
@@ -33,6 +34,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
@@ -67,19 +69,24 @@ import java.util.Map;
import static org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX;
import static org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger.STR_SOURCE_FILES;
import static org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger.STR_TARGET_FILES;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
import static org.junit.Assert.assertEquals;
public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactionTest {
+ private final String oldThreadName = Thread.currentThread().getName();
+
@Before
public void setUp() throws IOException, WriteProcessException, MetadataException {
super.setUp();
+ Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
}
@After
public void tearDown() throws IOException, StorageEngineException {
new CompactionConfigRestorer().restoreCompactionConfig();
super.tearDown();
+ Thread.currentThread().setName(oldThreadName);
}
/** Target file uncompleted, source files and log exists */
@@ -1121,6 +1128,155 @@ public class SizeTieredCompactionRecoverTest extends AbstractInnerSpaceCompactio
}
}
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndSomeSourceFilesLost() throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the target file should be deleted after compaction
+ for (int device = 0; device < deviceNum; device++) {
+ for (int measurement = 0; measurement < measurementNum; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "device"
+ + device
+ + PATH_SEPARATOR
+ + "sensor"
+ + measurement,
+ new Pair(Long.MIN_VALUE, Long.MAX_VALUE));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ CompactionUtils.compact(seqResources, Collections.emptyList(), targetResources);
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCompaction(seqResources, Collections.emptyList(), targetResources);
+ seqResources.get(0).remove();
+
+ // recover compaction
+ new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, true)
+ .doCompaction();
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should not exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // the target file will be existed although it is empty after recover
+ Assert.assertTrue(targetResources.get(0).getTsFile().exists());
+ Assert.assertTrue(targetResources.get(0).resourceFileExists());
+ }
+
+ @Test
+ public void testWhenTargetFileIsDeletedAfterCompactionAndAllSourceFilesExisted()
+ throws Exception {
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ // generate mods file, the target file should be deleted after compaction
+ for (int device = 0; device < deviceNum; device++) {
+ for (int measurement = 0; measurement < measurementNum; measurement++) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ deleteMap.put(
+ COMPACTION_TEST_SG
+ + PATH_SEPARATOR
+ + "device"
+ + device
+ + PATH_SEPARATOR
+ + "sensor"
+ + measurement,
+ new Pair(Long.MIN_VALUE, Long.MAX_VALUE));
+ seqResources.forEach(
+ x -> {
+ try {
+ CompactionFileGeneratorUtils.generateMods(deleteMap, x, false);
+ } catch (IllegalPathException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+
+ List<TsFileResource> targetResources =
+ CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true);
+ File compactionLogFile =
+ new File(
+ SEQ_DIRS,
+ targetResources.get(0).getTsFile().getName()
+ + CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX);
+ CompactionLogger compactionLogger = new CompactionLogger(compactionLogFile);
+ compactionLogger.logFiles(targetResources, STR_TARGET_FILES);
+ compactionLogger.logFiles(seqResources, STR_SOURCE_FILES);
+ compactionLogger.close();
+ CompactionUtils.compact(seqResources, Collections.emptyList(), targetResources);
+ CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
+ CompactionUtils.combineModsInCompaction(seqResources, Collections.emptyList(), targetResources);
+
+ // recover compaction
+ new CompactionRecoverTask(COMPACTION_TEST_SG, "0", tsFileManager, compactionLogFile, true)
+ .doCompaction();
+
+ Assert.assertTrue(tsFileManager.isAllowCompaction());
+
+ // all source file should exist
+ for (TsFileResource resource : seqResources) {
+ Assert.assertTrue(resource.getTsFile().exists());
+ Assert.assertTrue(
+ new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+ // tmp target file, target file and target resource file should be deleted after compaction
+ for (TsFileResource resource : targetResources) {
+ if (resource == null) {
+ continue;
+ }
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX))
+ .exists());
+ Assert.assertFalse(
+ new File(
+ resource
+ .getTsFilePath()
+ .replace(
+ IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX,
+ TsFileConstant.TSFILE_SUFFIX)
+ + TsFileResource.RESOURCE_SUFFIX)
+ .exists());
+ }
+ }
+
private void closeTsFileSequenceReader() throws IOException {
for (TsFileResource tsFileResource : seqResources) {
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());