You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/09/05 09:27:50 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-1611] Fix tsfile recover bug (#3895)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 871c483  [To rel/0.12] [IOTDB-1611] Fix tsfile recover bug (#3895)
871c483 is described below

commit 871c4835b260487207c8ce11f49cd382a2a469a3
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Sun Sep 5 17:27:19 2021 +0800

    [To rel/0.12] [IOTDB-1611] Fix tsfile recover bug (#3895)
---
 .../level/LevelCompactionTsFileManagement.java     | 133 +++------
 .../compaction/LevelCompactionRecoverTest.java     | 319 +++++++++++++++++----
 2 files changed, 296 insertions(+), 156 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 0701b00..03fa8da 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -29,11 +29,9 @@ import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -430,9 +428,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         logAnalyzer.analyze();
         Set<String> deviceSet = logAnalyzer.getDeviceSet();
         List<String> sourceFileList = logAnalyzer.getSourceFiles();
-        long offset = logAnalyzer.getOffset();
         String targetFile = logAnalyzer.getTargetFile();
-        boolean fullMerge = logAnalyzer.isFullMerge();
         boolean isSeq = logAnalyzer.isSeq();
         if (targetFile == null || sourceFileList.isEmpty()) {
           return;
@@ -445,118 +441,55 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           }
           return;
         }
-        if (fullMerge) {
-          // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
-          TsFileResource targetTsFileResource = getRecoverTsFileResource(targetFile, isSeq);
-          if (targetTsFileResource == null) {
-            targetTsFileResource = getTsFileResource(targetFile, isSeq);
-            if (targetTsFileResource == null) {
-              logger.warn("get null targetTsFileResource");
-              return;
-            }
-          }
-          long timePartition = targetTsFileResource.getTimePartition();
-          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target, false);
-          // if not complete compaction, resume merge
-          if (writer.hasCrashed()) {
-            if (offset > 0) {
-              writer.getIOWriterOut().truncate(offset);
-              writer.dropLastChunkGroupMetadata();
-            }
-            CompactionLogger compactionLogger =
-                new CompactionLogger(storageGroupDir, storageGroupName);
-            List<Modification> modifications = new ArrayList<>();
-            CompactionUtils.merge(
-                targetTsFileResource,
-                getTsFileList(isSeq),
-                storageGroupName,
-                compactionLogger,
-                deviceSet,
-                isSeq,
-                modifications,
-                writer);
-            compactionLogger.close();
+        // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
+        TsFileResource targetResource = getRecoverTsFileResource(targetFile, isSeq);
+        if (targetResource != null) {
+          // target tsfile is not compeleted
+          targetResource.remove();
+          if (isSeq) {
+            sequenceRecoverTsFileResources.clear();
           } else {
-            writer.close();
-          }
-          // complete compaction and delete source file
-          deleteAllSubLevelFiles(isSeq, timePartition);
-        } else {
-          // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
-          TsFileResource targetResource = getRecoverTsFileResource(targetFile, isSeq);
-          if (targetResource == null) {
-            // new file already merged but old file not deleted
-            targetResource = getTsFileResource(targetFile, isSeq);
-            if (targetResource == null) {
-              throw new IOException();
-            }
+            unSequenceRecoverTsFileResources.clear();
           }
+        } else if ((targetResource = getTsFileResource(targetFile, isSeq)) != null) {
+          // complete compaction, delete source files
           long timePartition = targetResource.getTimePartition();
           List<TsFileResource> sourceTsFileResources = new ArrayList<>();
           for (String file : sourceFileList) {
             // get tsfile resource from list, as they have been recovered in StorageGroupProcessor
             TsFileResource sourceTsFileResource = getTsFileResource(file, isSeq);
             if (sourceTsFileResource == null) {
-              throw new IOException();
+              // if sourceTsFileResource is null, it has been deleted
+              continue;
             }
             sourceTsFileResources.add(sourceTsFileResource);
           }
-          int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
-          RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target, false);
-          List<Modification> modifications = new ArrayList<>();
-          // if not complete compaction, resume merge
-          if (writer.hasCrashed()) {
-            if (offset > 0) {
-              writer.getIOWriterOut().truncate(offset);
-              writer.dropLastChunkGroupMetadata();
-            }
-            CompactionLogger compactionLogger =
-                new CompactionLogger(storageGroupDir, storageGroupName);
-            CompactionUtils.merge(
-                targetResource,
-                sourceTsFileResources,
-                storageGroupName,
-                compactionLogger,
-                deviceSet,
-                isSeq,
-                modifications,
-                writer);
-            compactionLogger.close();
-            // complete compaction and add target tsfile
-            int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
-            if (isSeq) {
-              sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
-              sequenceRecoverTsFileResources.clear();
-            } else {
-              unSequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
-              unSequenceRecoverTsFileResources.clear();
+          if (sourceFileList.size() != 0) {
+            List<Modification> modifications = new ArrayList<>();
+            // if not complete compaction, remove target file
+            writeLock();
+            try {
+              if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException(
+                    String.format("%s [Compaction] abort", storageGroupName));
+              }
+              int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
+              deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
+            } finally {
+              writeUnlock();
             }
-          } else {
-            // complete compaction, just close writer
-            writer.close();
-          }
-          // complete compaction, delete source files
-          writeLock();
-          try {
-            if (Thread.currentThread().isInterrupted()) {
-              throw new InterruptedException(
-                  String.format("%s [Compaction] abort", storageGroupName));
+            for (TsFileResource tsFileResource : sourceTsFileResources) {
+              logger.info(
+                  "{} recover storage group delete source file {}",
+                  storageGroupName,
+                  tsFileResource.getTsFile().getName());
             }
-            deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
-          } finally {
-            writeUnlock();
-          }
-          for (TsFileResource tsFileResource : sourceTsFileResources) {
-            logger.warn(
-                "{} recover storage group delete source file {}",
-                storageGroupName,
-                tsFileResource.getTsFile().getName());
+            deleteLevelFilesInDisk(sourceTsFileResources);
+            renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
           }
-          deleteLevelFilesInDisk(sourceTsFileResources);
-          renameLevelFilesMods(modifications, sourceTsFileResources, targetResource);
         }
       }
-    } catch (IOException | IllegalPathException | InterruptedException e) {
+    } catch (IOException | InterruptedException e) {
       logger.error("recover level tsfile management error ", e);
       restoreCompaction();
     } finally {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index 35ddc01..ee63ced 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -59,6 +59,7 @@ import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPA
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
 import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class LevelCompactionRecoverTest extends LevelCompactionTest {
 
@@ -79,9 +80,11 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
     FileUtils.deleteDirectory(tempSGDir);
   }
 
-  /** compaction recover merge finished */
+  // uncompleted target file and log
+  /** compaction recover merge finished, delete one device - offset */
   @Test
-  public void testCompactionMergeRecoverMergeFinished() throws IOException, IllegalPathException {
+  public void testCompactionRecoverWithUncompletedTargetFileAndLog()
+      throws IOException, IllegalPathException {
     LevelCompactionTsFileManagement levelCompactionTsFileManagement =
         new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
     levelCompactionTsFileManagement.addAll(seqResources, true);
@@ -110,6 +113,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
 
     CompactionLogger compactionLogger =
@@ -118,6 +122,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
     compactionLogger.logSequence(true);
+    deleteFileIfExists(
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile")));
     TsFileResource targetTsFileResource =
         new TsFileResource(
             new File(
@@ -141,7 +156,37 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         new ArrayList<>(),
         null);
     compactionLogger.close();
-    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+
+    BufferedReader logReader =
+        new BufferedReader(
+            new FileReader(
+                SystemFileFactory.INSTANCE.getFile(
+                    tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME)));
+    List<String> logs = new ArrayList<>();
+    String line;
+    while ((line = logReader.readLine()) != null) {
+      logs.add(line);
+    }
+    logReader.close();
+    BufferedWriter logStream =
+        new BufferedWriter(
+            new FileWriter(
+                SystemFileFactory.INSTANCE.getFile(
+                    tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME),
+                false));
+    for (int i = 0; i < logs.size() - 1; i++) {
+      logStream.write(logs.get(i));
+      logStream.newLine();
+    }
+    logStream.close();
+
+    TsFileOutput out =
+        FSFactoryProducer.getFileOutputFactory()
+            .getTsFileOutput(targetTsFileResource.getTsFile().getPath(), true);
+    out.truncate(Long.parseLong(logs.get(logs.size() - 1).split(" ")[1]) - 1);
+    out.close();
+
+    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path =
@@ -167,12 +212,13 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
   }
 
-  /** compaction recover merge finished, delete one offset */
+  /** compaction recover merge finished */
   @Test
-  public void testCompactionMergeRecoverMergeFinishedAndDeleteOneOffset()
+  public void testRecoverCompleteTargetFileAndCompactionLog()
       throws IOException, IllegalPathException {
     LevelCompactionTsFileManagement levelCompactionTsFileManagement =
         new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
@@ -202,6 +248,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
 
     CompactionLogger compactionLogger =
@@ -210,6 +257,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
     compactionLogger.logSequence(true);
+    deleteFileIfExists(
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile")));
     TsFileResource targetTsFileResource =
         new TsFileResource(
             new File(
@@ -233,30 +291,115 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         new ArrayList<>(),
         null);
     compactionLogger.close();
-
-    BufferedReader logReader =
-        new BufferedReader(
-            new FileReader(
-                SystemFileFactory.INSTANCE.getFile(
-                    tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME)));
-    List<String> logs = new ArrayList<>();
-    String line;
-    while ((line = logReader.readLine()) != null) {
-      logs.add(line);
+    levelCompactionTsFileManagement.add(targetTsFileResource, true);
+    levelCompactionTsFileManagement.recover();
+    context = new QueryContext();
+    path =
+        new PartialPath(
+            deviceIds[0]
+                + TsFileConstant.PATH_SEPARATOR
+                + measurementSchemas[0].getMeasurementId());
+    tsFilesReader =
+        new SeriesRawDataBatchReader(
+            path,
+            measurementSchemas[0].getType(),
+            context,
+            levelCompactionTsFileManagement.getTsFileList(true),
+            new ArrayList<>(),
+            null,
+            null,
+            true);
+    count = 0;
+    while (tsFilesReader.hasNextBatch()) {
+      BatchData batchData = tsFilesReader.nextBatch();
+      for (int i = 0; i < batchData.length(); i++) {
+        assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+        count++;
+      }
     }
-    logReader.close();
-    BufferedWriter logStream =
-        new BufferedWriter(
-            new FileWriter(
-                SystemFileFactory.INSTANCE.getFile(
-                    tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME),
-                false));
-    for (int i = 0; i < logs.size() - 1; i++) {
-      logStream.write(logs.get(i));
-      logStream.newLine();
+    tsFilesReader.close();
+    assertEquals(500, count);
+  }
+
+  /** completed target file, source files deleted, compaction log exists */
+  @Test
+  public void testCompactionRecoverWithCompletedTargetFileAndLog()
+      throws IOException, IllegalPathException {
+    LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+        new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+    levelCompactionTsFileManagement.addAll(seqResources, true);
+    levelCompactionTsFileManagement.addAll(unseqResources, false);
+    QueryContext context = new QueryContext();
+    PartialPath path =
+        new PartialPath(
+            deviceIds[0]
+                + TsFileConstant.PATH_SEPARATOR
+                + measurementSchemas[0].getMeasurementId());
+    IBatchReader tsFilesReader =
+        new SeriesRawDataBatchReader(
+            path,
+            measurementSchemas[0].getType(),
+            context,
+            levelCompactionTsFileManagement.getTsFileList(true),
+            new ArrayList<>(),
+            null,
+            null,
+            true);
+    int count = 0;
+    while (tsFilesReader.hasNextBatch()) {
+      BatchData batchData = tsFilesReader.nextBatch();
+      for (int i = 0; i < batchData.length(); i++) {
+        assertEquals(batchData.getTimeByIndex(i), batchData.getDoubleByIndex(i), 0.001);
+        count++;
+      }
     }
-    logStream.close();
+    tsFilesReader.close();
+    assertEquals(500, count);
 
+    CompactionLogger compactionLogger =
+        new CompactionLogger(tempSGDir.getPath(), COMPACTION_TEST_SG);
+    compactionLogger.logFile(SOURCE_NAME, seqResources.get(0).getTsFile());
+    compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
+    compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
+    compactionLogger.logSequence(true);
+    deleteFileIfExists(
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile")));
+    TsFileResource targetTsFileResource =
+        new TsFileResource(
+            new File(
+                TestConstant.BASE_OUTPUT_PATH.concat(
+                    0
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 0
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 1
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 0
+                        + ".tsfile")));
+    compactionLogger.logFile(TARGET_NAME, targetTsFileResource.getTsFile());
+    CompactionUtils.merge(
+        targetTsFileResource,
+        new ArrayList<>(seqResources.subList(0, 3)),
+        COMPACTION_TEST_SG,
+        compactionLogger,
+        new HashSet<>(),
+        true,
+        new ArrayList<>(),
+        null);
+    compactionLogger.close();
+    for (TsFileResource resource : new ArrayList<>(seqResources.subList(0, 3))) {
+      levelCompactionTsFileManagement.remove(resource, true);
+      deleteFileIfExists(resource.getTsFile());
+    }
     levelCompactionTsFileManagement.add(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
@@ -283,12 +426,13 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
   }
 
-  /** compaction recover merge finished, delete one device - offset */
+  /** completed target file, source files deleted, compaction log does not exist */
   @Test
-  public void testCompactionMergeRecoverMergeFinishedAndDeleteOneDeviceWithOffset()
+  public void testCompactionRecoverWithCompletedTargetFile()
       throws IOException, IllegalPathException {
     LevelCompactionTsFileManagement levelCompactionTsFileManagement =
         new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
@@ -318,6 +462,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
 
     CompactionLogger compactionLogger =
@@ -326,6 +471,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
     compactionLogger.logSequence(true);
+    deleteFileIfExists(
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile")));
     TsFileResource targetTsFileResource =
         new TsFileResource(
             new File(
@@ -349,37 +505,29 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         new ArrayList<>(),
         null);
     compactionLogger.close();
+    File logFile =
+        SystemFileFactory.INSTANCE.getFile(
+            tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME);
+    long totalWaitingTime = 0;
+    while (logFile.exists()) {
+      logFile.delete();
+      System.gc();
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
 
-    BufferedReader logReader =
-        new BufferedReader(
-            new FileReader(
-                SystemFileFactory.INSTANCE.getFile(
-                    tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME)));
-    List<String> logs = new ArrayList<>();
-    String line;
-    while ((line = logReader.readLine()) != null) {
-      logs.add(line);
+      }
+      totalWaitingTime += 100;
+      if (totalWaitingTime > 10_000) {
+        System.out.println("failed to delete " + logFile);
+        fail();
+      }
     }
-    logReader.close();
-    BufferedWriter logStream =
-        new BufferedWriter(
-            new FileWriter(
-                SystemFileFactory.INSTANCE.getFile(
-                    tempSGDir.getPath(), COMPACTION_TEST_SG + COMPACTION_LOG_NAME),
-                false));
-    for (int i = 0; i < logs.size() - 1; i++) {
-      logStream.write(logs.get(i));
-      logStream.newLine();
+    for (TsFileResource resource : new ArrayList<>(seqResources.subList(0, 3))) {
+      levelCompactionTsFileManagement.remove(resource, true);
+      deleteFileIfExists(resource.getTsFile());
     }
-    logStream.close();
-
-    TsFileOutput out =
-        FSFactoryProducer.getFileOutputFactory()
-            .getTsFileOutput(targetTsFileResource.getTsFile().getPath(), true);
-    out.truncate(Long.parseLong(logs.get(logs.size() - 1).split(" ")[1]) - 1);
-    out.close();
-
-    levelCompactionTsFileManagement.addRecover(targetTsFileResource, true);
+    levelCompactionTsFileManagement.add(targetTsFileResource, true);
     levelCompactionTsFileManagement.recover();
     context = new QueryContext();
     path =
@@ -405,6 +553,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
   }
 
@@ -443,6 +592,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
 
     CompactionLogger compactionLogger =
@@ -451,6 +601,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
     compactionLogger.logSequence(false);
+    deleteFileIfExists(
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile")));
     TsFileResource targetTsFileResource =
         new TsFileResource(
             new File(
@@ -500,10 +661,12 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
     IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(prevUnseqLevelNum);
   }
 
+  // log exists, target file does not exist
   /** compaction recover merge start just log source file */
   @Test
   public void testCompactionMergeRecoverMergeStartSourceLog()
@@ -543,6 +706,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
   }
 
@@ -586,6 +750,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
   }
 
@@ -602,6 +767,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
     compactionLogger.logSequence(true);
+    deleteFileIfExists(
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile")));
     TsFileResource targetTsFileResource =
         new TsFileResource(
             new File(
@@ -642,6 +818,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
   }
 
@@ -659,6 +836,17 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(1).getTsFile());
     compactionLogger.logFile(SOURCE_NAME, seqResources.get(2).getTsFile());
     compactionLogger.logSequence(true);
+    deleteFileIfExists(
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile")));
     TsFileResource targetTsFileResource =
         new TsFileResource(
             new File(
@@ -708,6 +896,25 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
         count++;
       }
     }
+    tsFilesReader.close();
     assertEquals(500, count);
   }
+
+  /**
+   * Deletes {@code file}, retrying until it no longer exists or roughly 20 seconds have elapsed.
+   *
+   * <p>Each failed attempt is followed by a {@code System.gc()} hint and a 100 ms pause before
+   * the next try. If the file still exists once the time budget is used up, a message is printed
+   * and the method returns without throwing. If the thread is interrupted while waiting, the
+   * interrupt flag is restored and the method stops retrying.
+   *
+   * @param file the file to delete; nothing happens if it does not exist
+   */
+  public void deleteFileIfExists(File file) {
+    long waitingTime = 0L;
+    while (file.exists()) {
+      if (file.delete()) {
+        // deleted on this attempt, no need to pause
+        return;
+      }
+      // NOTE(review): presumably the GC hint is meant to release lingering handles on the file
+      System.gc();
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // keep the interrupt visible to the caller instead of silently swallowing it
+        Thread.currentThread().interrupt();
+        break;
+      }
+      waitingTime += 100;
+      if (waitingTime > 20_000) {
+        System.out.println("fail to delete " + file);
+        break;
+      }
+    }
+  }
 }