You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/09/05 09:27:50 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-1611] Fix
tsfile recover bug (#3895)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 871c483 [To rel/0.12] [IOTDB-1611] Fix tsfile recover bug (#3895)
871c483 is described below
commit 871c4835b260487207c8ce11f49cd382a2a469a3
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Sun Sep 5 17:27:19 2021 +0800
[To rel/0.12] [IOTDB-1611] Fix tsfile recover bug (#3895)
---
.../level/LevelCompactionTsFileManagement.java | 133 +++------
.../compaction/LevelCompactionRecoverTest.java | 319 +++++++++++++++++----
2 files changed, 296 insertions(+), 156 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 0701b00..03fa8da 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -29,11 +29,9 @@ import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -430,9 +428,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
logAnalyzer.analyze();
Set<String> deviceSet = logAnalyzer.getDeviceSet();
List<String> sourceFileList = logAnalyzer.getSourceFiles();
- long offset = logAnalyzer.getOffset();
String targetFile = logAnalyzer.getTargetFile();
- boolean fullMerge = logAnalyzer.isFullMerge();
boolean isSeq = logAnalyzer.isSeq();
if (targetFile == null || sourceFileList.isEmpty()) {
return;
@@ -445,118 +441,55 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
return;
}
- if (fullMerge) {
- // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
- TsFileResource targetTsFileResource = getRecoverTsFileResource(targetFile, isSeq);
- if (targetTsFileResource == null) {
- targetTsFileResource = getTsFileResource(targetFile, isSeq);
- if (targetTsFileResource == null) {
- logger.warn("get null targetTsFileResource");
- return;
- }
- }
- long timePartition = targetTsFileResource.getTimePartition();
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target, false);
- // if not complete compaction, resume merge
- if (writer.hasCrashed()) {
- if (offset > 0) {
- writer.getIOWriterOut().truncate(offset);
- writer.dropLastChunkGroupMetadata();
- }
- CompactionLogger compactionLogger =
- new CompactionLogger(storageGroupDir, storageGroupName);
- List<Modification> modifications = new ArrayList<>();
- CompactionUtils.merge(
- targetTsFileResource,
- getTsFileList(isSeq),
- storageGroupName,
- compactionLogger,
- deviceSet,
- isSeq,
- modifications,
- writer);
- compactionLogger.close();
+ // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
+ TsFileResource targetResource = getRecoverTsFileResource(targetFile, isSeq);
+ if (targetResource != null) {
+ // target tsfile is not completed
+ targetResource.remove();
+ if (isSeq) {
+ sequenceRecoverTsFileResources.clear();
} else {
- writer.close();
- }
- // complete compaction and delete source file
- deleteAllSubLevelFiles(isSeq, timePartition);
- } else {
- // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
- TsFileResource targetResource = getRecoverTsFileResource(targetFile, isSeq);
- if (targetResource == null) {
- // new file already merged but old file not deleted
- targetResource = getTsFileResource(targetFile, isSeq);
- if (targetResource == null) {
- throw new IOException();
- }
+ unSequenceRecoverTsFileResources.clear();
}
+ } else if ((targetResource = getTsFileResource(targetFile, isSeq)) != null) {
+ // complete compaction, delete source files
long timePartition = targetResource.getTimePartition();
List<TsFileResource> sourceTsFileResources = new ArrayList<>();
for (String file : sourceFileList) {
// get tsfile resource from list, as they have been recovered in StorageGroupProcessor
TsFileResource sourceTsFileResource = getTsFileResource(file, isSeq);
if (sourceTsFileResource == null) {
- throw new IOException();
+ // if sourceTsFileResource is null, it has been deleted
+ continue;
}
sourceTsFileResources.add(sourceTsFileResource);
}
- int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target, false);
- List<Modification> modifications = new ArrayList<>();
- // if not complete compaction, resume merge
- if (writer.hasCrashed()) {
- if (offset > 0) {
- writer.getIOWriterOut().truncate(offset);
- writer.dropLastChunkGroupMetadata();
- }
- CompactionLogger compactionLogger =
- new CompactionLogger(storageGroupDir, storageGroupName);
- CompactionUtils.merge(
- targetResource,
- sourceTsFileResources,
- storageGroupName,
- compactionLogger,
- deviceSet,
- isSeq,
- modifications,
- writer);
- compactionLogger.close();
- // complete compaction and add target tsfile
- int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
- if (isSeq) {
- sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
- sequenceRecoverTsFileResources.clear();
- } else {
- unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
- unSequenceRecoverTsFileResources.clear();
+ if (sourceFileList.size() != 0) {
+ List<Modification> modifications = new ArrayList<>();
+ // if not complete compaction, remove target file
+ writeLock();
+ try {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException(
+ String.format("%s [Compaction] abort", storageGroupName));
+ }
+ int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
+ deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
+ } finally {
+ writeUnlock();
}
- } else {
- // complete compaction, just close writer
- writer.close();
- }
- // complete compaction, delete source files
- writeLock();
- try {
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException(
- String.format("%s [Compaction] abort", storageGroupName));
+ for (TsFileResource tsFileResource : sourceTsFileResources) {
+ logger.info(
+ "{} recover storage group delete source file {}",
+ storageGroupName,
+ tsFileResource.getTsFile().getName());
}
- deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
- } finally {
- writeUnlock();
- }
- for (TsFileResource tsFileResource : sourceTsFileResources) {
- logger.warn(
- "{} recover storage group delete source file {}",
- storageGroupName,
- tsFileResource.getTsFile().getName());
+ deleteLevelFilesInDisk(sourceTsFileResources);
+ renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
}
- deleteLevelFilesInDisk(sourceTsFileResources);
- renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
}
}
- } catch (IOException | IllegalPathException | InterruptedException e) {
+ } catch (IOException | InterruptedException e) {
logger.error("recover level tsfile management error ", e);
restoreCompaction();
} finally {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index 35ddc01..ee63ced 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -59,6 +59,7 @@ import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPA
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
public class LevelCompactionRecoverTest extends LevelCompactionTest {
@@ -79,9 +80,11 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
FileUtils.deleteDirectory(tempSGDir);
}
- /** compaction recover merge finished */
+ // incomplete target file and log
+ /** compaction recover merge finished, delete one device - offset */
@Test
- public void testCompactionMergeRecoverMergeFinished() throws IOException, IllegalPathException {
+ public void testCompactionRecoverWithUncompletedTargetFileAndLog()
+ throws IOException, IllegalPathException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
@@ -110,6 +113,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
CompactionLogger compactionLogger =
@@ -118,6 +122,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
compactionLogger.logSequence(true);
+ deleteFileIfExists(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
@@ -141,7 +156,37 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
new ArrayList<>(),
null);
compactionLogger.close();
- levelCompactionTsFileManagement.add(targetTsFileResource, true);
+
+ BufferedReader logReader =
+ new BufferedReader(
+ new FileReader(
+ SystemFileFactory.INSTANCE.getFile(
+ tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME)));
+ List<String> logs = new ArrayList<>();
+ String line;
+ while ((line = logReader.readLine()) != null) {
+ logs.add(line);
+ }
+ logReader.close();
+ BufferedWriter logStream =
+ new BufferedWriter(
+ new FileWriter(
+ SystemFileFactory.INSTANCE.getFile(
+ tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME),
+ false));
+ for (int i = 0; i < logs.size() - 1; i++) {
+ logStream.write(logs.get(i));
+ logStream.newLine();
+ }
+ logStream.close();
+
+ TsFileOutput out =
+ FSFactoryProducer.getFileOutputFactory()
+ .getTsFileOutput(targetTsFileResource.getTsFile().getPath(), true);
+ out.truncate(Long.parseLong(logs.get(logs.size() - 1).split(" ")[1]) - 1);
+ out.close();
+
+ levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path =
@@ -167,12 +212,13 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
}
- /** compaction recover merge finished, delete one offset */
+ /** compaction recover merge finished */
@Test
- public void testCompactionMergeRecoverMergeFinishedAndDeleteOneOffset()
+ public void testRecoverCompleteTargetFileAndCompactionLog()
throws IOException, IllegalPathException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
@@ -202,6 +248,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
CompactionLogger compactionLogger =
@@ -210,6 +257,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
compactionLogger.logSequence(true);
+ deleteFileIfExists(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
@@ -233,30 +291,115 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
new ArrayList<>(),
null);
compactionLogger.close();
-
- BufferedReader logReader =
- new BufferedReader(
- new FileReader(
- SystemFileFactory.INSTANCE.getFile(
- tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME)));
- List<String> logs = new ArrayList<>();
- String line;
- while ((line = logReader.readLine()) != null) {
- logs.add(line);
+ levelCompactionTsFileManagement.add(targetTsFileResource, true);
+ levelCompactionTsFileManagement.recover();
+ context = new QueryContext();
+ path =
+ new PartialPath(
+ deviceIds[0]
+ + TsFileConstant.PATH_SEPARATOR
+ + measurementSchemas[0].getMeasurementId());
+ tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ measurementSchemas[0].getType(),
+ context,
+ levelCompactionTsFileManagement.getTsFileList(true),
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ count++;
+ }
}
- logReader.close();
- BufferedWriter logStream =
- new BufferedWriter(
- new FileWriter(
- SystemFileFactory.INSTANCE.getFile(
- tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME),
- false));
- for (int i = 0; i < logs.size() - 1; i++) {
- logStream.write(logs.get(i));
- logStream.newLine();
+ tsFilesReader.close();
+ assertEquals(500, count);
+ }
+
+ /** completed target file, source files already deleted, compaction log exists */
+ @Test
+ public void testCompactionRecoverWithCompletedTargetFileAndLog()
+ throws IOException, IllegalPathException {
+ LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ levelCompactionTsFileManagement.addAll(seqResources, true);
+ levelCompactionTsFileManagement.addAll(unseqResources, false);
+ QueryContext context = new QueryContext();
+ PartialPath path =
+ new PartialPath(
+ deviceIds[0]
+ + TsFileConstant.PATH_SEPARATOR
+ + measurementSchemas[0].getMeasurementId());
+ IBatchReader tsFilesReader =
+ new SeriesRawDataBatchReader(
+ path,
+ measurementSchemas[0].getType(),
+ context,
+ levelCompactionTsFileManagement.getTsFileList(true),
+ new ArrayList<>(),
+ null,
+ null,
+ true);
+ int count = 0;
+ while (tsFilesReader.hasNextBatch()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+ count++;
+ }
}
- logStream.close();
+ tsFilesReader.close();
+ assertEquals(500, count);
+ CompactionLogger compactionLogger =
+ new CompactionLogger(tempSGDir.getPath(), COMPACTION_TEST_SG);
+ compactionLogger.logFile(SOURCE_NAME, seqResources.get(0).getTsFile());
+ compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
+ compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
+ compactionLogger.logSequence(true);
+ deleteFileIfExists(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
+ TsFileResource targetTsFileResource =
+ new TsFileResource(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
+ compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
+ CompactionUtils.merge(
+ targetTsFileResource,
+ new ArrayList<>(seqResources.subList(0, 3)),
+ COMPACTION_TEST_SG,
+ compactionLogger,
+ new HashSet<>(),
+ true,
+ new ArrayList<>(),
+ null);
+ compactionLogger.close();
+ for (TsFileResource resource : new ArrayList<>(seqResources.subList(0, 3))) {
+ levelCompactionTsFileManagement.remove(resource, true);
+ deleteFileIfExists(resource.getTsFile());
+ }
levelCompactionTsFileManagement.add(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
@@ -283,12 +426,13 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
}
- /** compaction recover merge finished, delete one device - offset */
+ /** completed target file, source files already deleted, compaction log does not exist */
@Test
- public void testCompactionMergeRecoverMergeFinishedAndDeleteOneDeviceWithOffset()
+ public void testCompactionRecoverWithCompletedTargetFile()
throws IOException, IllegalPathException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
@@ -318,6 +462,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
CompactionLogger compactionLogger =
@@ -326,6 +471,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
compactionLogger.logSequence(true);
+ deleteFileIfExists(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
@@ -349,37 +505,29 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
new ArrayList<>(),
null);
compactionLogger.close();
+ File logFile =
+ SystemFileFactory.INSTANCE.getFile(
+ tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME);
+ long totalWaitingTime = 0;
+ while (logFile.exists()) {
+ logFile.delete();
+ System.gc();
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
- BufferedReader logReader =
- new BufferedReader(
- new FileReader(
- SystemFileFactory.INSTANCE.getFile(
- tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME)));
- List<String> logs = new ArrayList<>();
- String line;
- while ((line = logReader.readLine()) != null) {
- logs.add(line);
+ }
+ totalWaitingTime += 100;
+ if (totalWaitingTime > 10_000) {
+ System.out.println("failed to delete " + logFile);
+ fail();
+ }
}
- logReader.close();
- BufferedWriter logStream =
- new BufferedWriter(
- new FileWriter(
- SystemFileFactory.INSTANCE.getFile(
- tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME),
- false));
- for (int i = 0; i < logs.size() - 1; i++) {
- logStream.write(logs.get(i));
- logStream.newLine();
+ for (TsFileResource resource : new ArrayList<>(seqResources.subList(0, 3))) {
+ levelCompactionTsFileManagement.remove(resource, true);
+ deleteFileIfExists(resource.getTsFile());
}
- logStream.close();
-
- TsFileOutput out =
- FSFactoryProducer.getFileOutputFactory()
- .getTsFileOutput(targetTsFileResource.getTsFile().getPath(), true);
- out.truncate(Long.parseLong(logs.get(logs.size() - 1).split(" ")[1]) - 1);
- out.close();
-
- levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
+ levelCompactionTsFileManagement.add(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
context = new QueryContext();
path =
@@ -405,6 +553,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
}
@@ -443,6 +592,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
CompactionLogger compactionLogger =
@@ -451,6 +601,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
compactionLogger.logSequence(false);
+ deleteFileIfExists(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
@@ -500,10 +661,12 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(prevUnseqLevelNum);
}
+ // log exists, target file not exists
/** compaction recover merge start just log source file */
@Test
public void testCompactionMergeRecoverMergeStartSourceLog()
@@ -543,6 +706,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
}
@@ -586,6 +750,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
}
@@ -602,6 +767,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
compactionLogger.logSequence(true);
+ deleteFileIfExists(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
@@ -642,6 +818,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
}
@@ -659,6 +836,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
compactionLogger.logSequence(true);
+ deleteFileIfExists(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
@@ -708,6 +896,25 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
count++;
}
}
+ tsFilesReader.close();
assertEquals(500, count);
}
+
+ /**
+  * Best-effort deletion of {@code file}: retries every 100 ms until the file no longer exists.
+  *
+  * <p>Gives up after roughly 20 seconds of waiting, or as soon as the calling thread is
+  * interrupted, and never throws; on timeout it only prints a message to standard output.
+  *
+  * @param file the file to delete; nothing happens if it does not exist
+  */
+ public void deleteFileIfExists(File file) {
+   long waitingTime = 0L;
+   while (file.exists()) {
+     file.delete();
+     // NOTE(review): presumably the gc() call is meant to release lingering handles that
+     // keep the file from being deleted on some platforms - confirm before removing
+     System.gc();
+     try {
+       Thread.sleep(100);
+     } catch (InterruptedException e) {
+       // restore the interrupt flag for the caller and stop retrying instead of
+       // silently swallowing the interruption
+       Thread.currentThread().interrupt();
+       break;
+     }
+     waitingTime += 100;
+     if (waitingTime > 20_000) {
+       System.out.println("fail to delete " + file);
+       break;
+     }
+   }
+ }
}