You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/09/16 08:53:15 UTC

[incubator-iotdb] branch dev_TTL updated: add timed TTL check task

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_TTL
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_TTL by this push:
     new afbd649  add timed TTL check task
afbd649 is described below

commit afbd64942a625f10a00c3659a28a9f5610fea20d
Author: jt <jt...@163.com>
AuthorDate: Mon Sep 16 16:42:12 2019 +0800

    add timed TTL check task
---
 .../iotdb/jdbc/IoTDBDatabaseMetadataTest.java      |   2 +-
 .../org/apache/iotdb/jdbc/IoTDBStatementTest.java  |   2 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  40 +++--
 .../db/engine/merge/manage/MergeResource.java      |  46 +++---
 .../iotdb/db/engine/merge/task/MergeTask.java      |   4 +
 .../engine/storagegroup/StorageGroupProcessor.java | 164 +++++++++++++++------
 .../db/engine/storagegroup/TsFileResource.java     |  51 +++++--
 .../iotdb/db/query/control/JobFileManager.java     |  25 +++-
 .../fileRelated/UnSealedTsFileIterateReader.java   |   2 +-
 .../UnSealedTsFileReaderByTimestamp.java           |   2 +-
 .../resourceRelated/UnseqResourceMergeReader.java  |   2 +-
 .../UnseqResourceReaderByTimestamp.java            |   2 +-
 .../db/engine/memtable/PrimitiveMemTableTest.java  |   4 +-
 .../engine/storagegroup/TsFileProcessorTest.java   |  20 +--
 .../iotdb/db/metadata/MManagerAdvancedTest.java    |   2 +-
 .../db/query/control/FileReaderManagerTest.java    |   6 +-
 .../iotdb/db/writelog/recover/LogReplayerTest.java |   2 +-
 17 files changed, 256 insertions(+), 120 deletions(-)

diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java
index f0a20a1..8cdbce4 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadataTest.java
@@ -361,7 +361,7 @@ public class IoTDBDatabaseMetadataTest {
   @SuppressWarnings("resource")
   @Test
   public void ShowStorageGroup() throws Exception {
-    Set<String> sgSet = new HashSet<>();
+    List<String> sgSet = new ArrayList<>();
     sgSet.add("root.vehicle");
     when(fetchMetadataResp.getShowStorageGroups()).thenReturn(sgSet);
 
diff --git a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
index 44fd2f4..5bd2cc4 100644
--- a/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
+++ b/jdbc/src/test/java/org/apache/iotdb/jdbc/IoTDBStatementTest.java
@@ -141,7 +141,7 @@ public class IoTDBStatementTest {
   @Test
   public void testExecuteSQL3() throws SQLException, TException {
     IoTDBStatement stmt = new IoTDBStatement(connection, client, sessHandle, zoneID);
-    Set<String> sgSet = new HashSet<>();
+    List<String> sgSet = new ArrayList<>();
     sgSet.add("root.vehicle");
     when(fetchMetadataResp.getShowStorageGroups()).thenReturn(sgSet);
     String standard = "Storage Group,\nroot.vehicle,\n";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 76c6072..8df7cc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -21,17 +21,20 @@ package org.apache.iotdb.db.engine;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.exception.StorageEngineException;
@@ -57,6 +60,7 @@ public class StorageEngine implements IService {
 
   private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static final long TTL_CHECK_INTERVAL = 60 * 1000;
 
   /**
    * a folder (system/storage_groups/ by default) that persist system info. Each Storage Processor
@@ -75,6 +79,8 @@ public class StorageEngine implements IService {
     return INSTANCE;
   }
 
+  private ScheduledExecutorService ttlCheckThread;
+
   private StorageEngine() {
     systemDir = FilePathUtils.regularizePath(config.getSystemDir()) + "storage_groups";
     // create systemDir
@@ -104,11 +110,32 @@ public class StorageEngine implements IService {
   @Override
   public void start() {
     // nothing to be done
+    ttlCheckThread = Executors.newSingleThreadScheduledExecutor();
+    ttlCheckThread.scheduleAtFixedRate(this::checkTTL, TTL_CHECK_INTERVAL, TTL_CHECK_INTERVAL
+    ,TimeUnit.MILLISECONDS);
+  }
+
+  private void checkTTL() {
+    try {
+      for (StorageGroupProcessor processor : processorMap.values()) {
+        processor.checkFilesTTL();
+      }
+    } catch (ConcurrentModificationException e) {
+      // ignore
+    } catch (Exception e) {
+      logger.error("An error occurred when checking TTL", e);
+    }
   }
 
   @Override
   public void stop() {
     syncCloseAllProcessor();
+    ttlCheckThread.shutdownNow();
+    try {
+      ttlCheckThread.awaitTermination(30, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      logger.warn("TTL check thread still doesn't exist after 30s");
+    }
   }
 
   @Override
@@ -193,15 +220,6 @@ public class StorageEngine implements IService {
   }
 
   /**
-   * only for unit test
-   */
-  public void asyncFlushAndSealAllFiles() {
-    for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
-      storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
-    }
-  }
-
-  /**
    * flush command Sync asyncCloseOneProcessor all file node processors.
    */
   public void syncCloseAllProcessor() {
@@ -270,7 +288,7 @@ public class StorageEngine implements IService {
   }
 
   /**
-   * get all overlap tsfiles which are conflict with the appendFile.
+   * get all overlap TsFiles which are conflict with the appendFile.
    *
    * @param storageGroupName the seriesPath of storage group
    * @param appendFile the appended tsfile information
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index d8b6c4c..9384d1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -19,6 +19,19 @@
 
 package org.apache.iotdb.db.engine.merge.manage;
 
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.iotdb.db.engine.fileSystem.FileFactory;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.reader.IPointReader;
@@ -27,7 +40,6 @@ import org.apache.iotdb.db.utils.MergeUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.db.engine.fileSystem.FileFactory;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
@@ -36,16 +48,6 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
 
 /**
  * MergeResource manages files and caches of readers, writers, MeasurementSchemas and
@@ -53,8 +55,6 @@ import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
  */
 public class MergeResource {
 
-  private static final Logger logger = LoggerFactory.getLogger(MergeResource.class);
-
   private List<TsFileResource> seqFiles;
   private List<TsFileResource> unseqFiles;
 
@@ -67,9 +67,21 @@ public class MergeResource {
   private boolean cacheDeviceMeta = false;
 
   public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
-    this.seqFiles = seqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList());
+    this.seqFiles = seqFiles.stream().filter(res -> res.isClosed() && !res.isDeleted())
+        .collect(Collectors.toList());
     this.unseqFiles =
-        unseqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList());
+        unseqFiles.stream().filter(res -> res.isClosed() && !res.isDeleted())
+            .collect(Collectors.toList());
+  }
+
+  public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
+      long timeBound) {
+    this.seqFiles =
+        seqFiles.stream().filter(res -> res.isClosed() && !res.isDeleted()
+            && res.stillLives(timeBound)).collect(Collectors.toList());
+    this.unseqFiles =
+        unseqFiles.stream().filter(res -> res.isClosed() && !res.isDeleted()
+            && res.stillLives(timeBound)).collect(Collectors.toList());
   }
 
   public void clear() throws IOException {
@@ -229,10 +241,6 @@ public class MergeResource {
     this.unseqFiles = unseqFiles;
   }
 
-  public Map<String, MeasurementSchema> getMeasurementSchemaMap() {
-    return measurementSchemaMap;
-  }
-
   public void removeOutdatedSeqReaders() throws IOException {
     Iterator<Entry<TsFileResource, TsFileSequenceReader>> entryIterator =
         fileReaderCache.entrySet().iterator();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 9e99747..e4a6d3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -158,6 +158,10 @@ public class MergeTask implements Callable<Void> {
     for (TsFileResource seqFile : resource.getSeqFiles()) {
       File mergeFile = new File(seqFile.getFile().getPath() + MERGE_SUFFIX);
       mergeFile.delete();
+      seqFile.setMerging(false);
+    }
+    for (TsFileResource unseqFile : resource.getUnseqFiles()) {
+      unseqFile.setMerging(false);
     }
 
     File logFile = new File(storageGroupSysDir, MergeLogger.MERGE_LOG_NAME);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2ce3d42..6e7f19b 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -40,6 +41,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.fileSystem.FileFactory;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
@@ -67,18 +69,17 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.JobFileManager;
-import org.apache.iotdb.rpc.TSStatusType;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
+import org.apache.iotdb.rpc.TSStatusType;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.db.engine.fileSystem.FileFactory;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.Schema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,7 +105,7 @@ import org.slf4j.LoggerFactory;
  */
 public class StorageGroupProcessor {
 
-  private static final String MERGING_MODIFICAITON_FILE_NAME = "merge.mods";
+  private static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
   private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
   /**
    * a read write lock for guaranteeing concurrent safety when accessing all fields in this class
@@ -220,7 +221,7 @@ public class StorageGroupProcessor {
       recoverUnseqFiles(unseqTsFiles);
 
       String taskName = storageGroupName + "-" + System.currentTimeMillis();
-      File mergingMods = new File(storageGroupSysDir, MERGING_MODIFICAITON_FILE_NAME);
+      File mergingMods = new File(storageGroupSysDir, MERGING_MODIFICATION_FILE_NAME);
       if (mergingMods.exists()) {
         mergingModification = new ModificationFile(mergingMods.getPath());
       }
@@ -242,7 +243,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  private List<TsFileResource> getAllFiles(List<String> folders) throws IOException {
+  private List<TsFileResource> getAllFiles(List<String> folders) {
     List<File> tsFiles = new ArrayList<>();
     for (String baseDir : folders) {
       File fileFolder = FileFactory.INSTANCE.getFile(baseDir, storageGroupName);
@@ -565,7 +566,7 @@ public class StorageGroupProcessor {
           try {
             FileUtils.deleteDirectory(storageGroupFolder);
           } catch (IOException e) {
-            logger.error("Delete tsfiles failed", e);
+            logger.error("Delete TsFiles failed", e);
           }
         }
       }
@@ -582,6 +583,52 @@ public class StorageGroupProcessor {
     }
   }
 
+  public synchronized void checkFilesTTL() {
+    long timeBound = System.currentTimeMillis() - dataTTL;
+    logger.info("TTL removing files before {}", new Date(timeBound));
+    for (TsFileResource tsFileResource : unSequenceFileList) {
+      checkFileTTL(tsFileResource, timeBound, true);
+    }
+    for (TsFileResource tsFileResource : sequenceFileList) {
+      checkFileTTL(tsFileResource, timeBound, false);
+    }
+  }
+
+  private void checkFileTTL(TsFileResource resource, long timeBound, boolean isSeq) {
+    if (resource.isMerging() || !resource.isClosed()
+        || !resource.isDeleted() && resource.stillLives(timeBound)) {
+      return;
+    }
+
+    writeLock();
+    try {
+      // prevent new merges and queries from choosing this file
+      resource.setDeleted(true);
+      // the file may be chosen for merge after the last check and before writeLock()
+      // double check to ensure the file is not used by a merge
+      if (resource.isMerging()) {
+        return;
+      }
+      // ensure that the file is not used by any queries
+      if (resource.getMergeQueryLock().writeLock().tryLock()) {
+        try {
+          // physical removal
+          resource.remove();
+          logger.info("Removed a file {} by ttl ({}ms)", resource.getFile().getPath(), dataTTL);
+          if (isSeq) {
+            sequenceFileList.remove(resource);
+          } else {
+            unSequenceFileList.remove(resource);
+          }
+        } finally {
+          resource.getMergeQueryLock().writeLock().unlock();
+        }
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
   /**
    * This method will be blocked until all tsfile processors are closed.
    */
@@ -835,7 +882,7 @@ public class StorageGroupProcessor {
    * put the memtable back to the MemTablePool and make the metadata in writer visible
    */
   // TODO please consider concurrency with query and insert method.
-  public void closeUnsealedTsFileProcessor(
+  private void closeUnsealedTsFileProcessor(
       TsFileProcessor tsFileProcessor) throws TsFileProcessorException {
     closeQueryLock.writeLock().lock();
     try {
@@ -871,7 +918,9 @@ public class StorageGroupProcessor {
       }
 
       long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
-      MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList);
+      long timeBound = System.currentTimeMillis() - dataTTL;
+      MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList, timeBound);
+
       IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
       try {
         List[] mergeFiles = fileSelector.select();
@@ -887,9 +936,16 @@ public class StorageGroupProcessor {
         // cached during selection
         mergeResource.setCacheDeviceMeta(true);
 
+        for (TsFileResource tsFileResource : mergeResource.getSeqFiles()) {
+          tsFileResource.setMerging(true);
+        }
+        for (TsFileResource tsFileResource : mergeResource.getUnseqFiles()) {
+          tsFileResource.setMerging(true);
+        }
+
         MergeTask mergeTask = new MergeTask(mergeResource, storageGroupSysDir.getPath(),
             this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(), storageGroupName);
-        mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
+        mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICATION_FILE_NAME);
         MergeManager.getINSTANCE().submitMainTask(mergeTask);
         if (logger.isInfoEnabled()) {
           logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
@@ -918,17 +974,7 @@ public class StorageGroupProcessor {
     }
   }
 
-  protected void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
-      File mergeLog) {
-    logger.info("{} a merge task is ending...", storageGroupName);
-
-    if (unseqFiles.isEmpty()) {
-      // merge runtime exception arose, just end this merge
-      isMerging = false;
-      logger.info("{} a merge task abnormally ends", storageGroupName);
-      return;
-    }
-
+  private void removeUnseqFiles(List<TsFileResource> unseqFiles) {
     mergeLock.writeLock().lock();
     try {
       unSequenceFileList.removeAll(unseqFiles);
@@ -944,39 +990,61 @@ public class StorageGroupProcessor {
         unseqFile.getMergeQueryLock().writeLock().unlock();
       }
     }
+  }
+
+  private void updateMergeModification(TsFileResource seqFile) {
+    seqFile.getMergeQueryLock().writeLock().lock();
+    try {
+      // remove old modifications and write modifications generated during merge
+      seqFile.removeModFile();
+      if (mergingModification != null) {
+        for (Modification modification : mergingModification.getModifications()) {
+          seqFile.getModFile().write(modification);
+        }
+      }
+    } catch (IOException e) {
+      logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
+          seqFile.getFile(), e);
+    } finally {
+      seqFile.getMergeQueryLock().writeLock().unlock();
+    }
+  }
+
+  private void removeMergingModification() {
+    try {
+      if (mergingModification != null) {
+        mergingModification.remove();
+        mergingModification = null;
+      }
+    } catch (IOException e) {
+      logger.error("{} cannot remove merging modification ", storageGroupName, e);
+    }
+  }
+
+  protected void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
+      File mergeLog) {
+    logger.info("{} a merge task is ending...", storageGroupName);
+
+    if (unseqFiles.isEmpty()) {
+      // merge runtime exception arose, just end this merge
+      isMerging = false;
+      logger.info("{} a merge task abnormally ends", storageGroupName);
+      return;
+    }
+
+    removeUnseqFiles(unseqFiles);
 
     for (int i = 0; i < seqFiles.size(); i++) {
       TsFileResource seqFile = seqFiles.get(i);
-      seqFile.getMergeQueryLock().writeLock().lock();
       mergeLock.writeLock().lock();
       try {
-        logger.debug("{} is updating the {} merged file's modification file", storageGroupName, i);
-        try {
-          // remove old modifications and write modifications generated during merge
-          seqFile.removeModFile();
-          if (mergingModification != null) {
-            for (Modification modification : mergingModification.getModifications()) {
-              seqFile.getModFile().write(modification);
-            }
-          }
-        } catch (IOException e) {
-          logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
-              seqFile.getFile(), e);
-        }
+        updateMergeModification(seqFile);
         if (i == seqFiles.size() - 1) {
-          try {
-            if (mergingModification != null) {
-              mergingModification.remove();
-              mergingModification = null;
-            }
-          } catch (IOException e) {
-            logger.error("{} cannot remove merging modification ", storageGroupName, e);
-          }
+          removeMergingModification();
           isMerging = false;
+          mergeLog.delete();
         }
       } finally {
-        mergeLog.delete();
-        seqFile.getMergeQueryLock().writeLock().unlock();
         mergeLock.writeLock().unlock();
       }
     }
@@ -984,6 +1052,7 @@ public class StorageGroupProcessor {
   }
 
 
+
   public TsFileProcessor getWorkSequenceTsFileProcessor() {
     return workSequenceTsFileProcessor;
   }
@@ -994,11 +1063,8 @@ public class StorageGroupProcessor {
     void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
   }
 
-  public long getDataTTL() {
-    return dataTTL;
-  }
-
   public void setDataTTL(long dataTTL) {
     this.dataTTL = dataTTL;
+    checkFilesTTL();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 6dfdf57..66b3a8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -38,7 +38,7 @@ public class TsFileResource {
   private File file;
 
   public static final String RESOURCE_SUFFIX = ".resource";
-  public static final String TEMP_SUFFIX = ".temp";
+  static final String TEMP_SUFFIX = ".temp";
 
   /**
    * device -> start time
@@ -55,12 +55,14 @@ public class TsFileResource {
   private ModificationFile modFile;
 
   private volatile boolean closed = false;
+  private volatile boolean deleted = false;
+  private volatile boolean isMerging = false;
 
   /**
    * Chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query
    * process.
    */
-  private List<ChunkMetaData> chunkMetaDatas;
+  private List<ChunkMetaData> chunkMetaDataList;
 
   /**
    * Mem chunk data. Only be set in a temporal TsFileResource in a query process.
@@ -96,11 +98,11 @@ public class TsFileResource {
       Map<String, Long> startTimeMap,
       Map<String, Long> endTimeMap,
       ReadOnlyMemChunk readOnlyMemChunk,
-      List<ChunkMetaData> chunkMetaDatas) {
+      List<ChunkMetaData> chunkMetaDataList) {
     this.file = file;
     this.startTimeMap = startTimeMap;
     this.endTimeMap = endTimeMap;
-    this.chunkMetaDatas = chunkMetaDatas;
+    this.chunkMetaDataList = chunkMetaDataList;
     this.readOnlyMemChunk = readOnlyMemChunk;
   }
 
@@ -168,8 +170,8 @@ public class TsFileResource {
       endTimeMap.put(device, time);
   }
 
-  public List<ChunkMetaData> getChunkMetaDatas() {
-    return chunkMetaDatas;
+  public List<ChunkMetaData> getChunkMetaDataList() {
+    return chunkMetaDataList;
   }
 
   public ReadOnlyMemChunk getReadOnlyMemChunk() {
@@ -199,10 +201,6 @@ public class TsFileResource {
     return startTimeMap;
   }
 
-  public void setEndTimeMap(Map<String, Long> endTimeMap) {
-    this.endTimeMap = endTimeMap;
-  }
-
   public Map<String, Long> getEndTimeMap() {
     return endTimeMap;
   }
@@ -218,7 +216,7 @@ public class TsFileResource {
       modFile = null;
     }
     processor = null;
-    chunkMetaDatas = null;
+    chunkMetaDataList = null;
   }
 
   public TsFileProcessor getUnsealedFileProcessor() {
@@ -265,4 +263,35 @@ public class TsFileResource {
   public void setClosed(boolean closed) {
     this.closed = closed;
   }
+
+  public boolean isDeleted() {
+    return deleted;
+  }
+
+  public void setDeleted(boolean deleted) {
+    this.deleted = deleted;
+  }
+
+  public boolean isMerging() {
+    return isMerging;
+  }
+
+  public void setMerging(boolean merging) {
+    isMerging = merging;
+  }
+
+  /**
+   * check if any of the device lives over the given time bound
+   * @param timeBound
+   * @return
+   */
+  public boolean stillLives(long timeBound) {
+    for (long endTime : endTimeMap.values()) {
+      // the file cannot be deleted if any device still lives
+      if (endTime >= timeBound) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
index d8901f7..10242f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/JobFileManager.java
@@ -59,13 +59,26 @@ public class JobFileManager {
   public void addUsedFilesForGivenJob(long jobId, QueryDataSource dataSource) {
 
     //sequence data
-    for(TsFileResource tsFileResource : dataSource.getSeqResources()){
-      addFilePathToMap(jobId, tsFileResource, tsFileResource.isClosed());
-    }
+    addUsedFilesForGivenJob(jobId, dataSource.getSeqResources());
 
     //unsequence data
-    for(TsFileResource tsFileResource : dataSource.getUnseqResources()){
-      addFilePathToMap(jobId, tsFileResource, tsFileResource.isClosed());
+    addUsedFilesForGivenJob(jobId, dataSource.getUnseqResources());
+  }
+
+  private void addUsedFilesForGivenJob(long jobId, List<TsFileResource> resources) {
+    for (TsFileResource tsFileResource : resources) {
+      // the file may change from open to closed within the few statements, so the initial status
+      // should be recorded to ensure consistency
+      boolean isClosed = tsFileResource.isClosed();
+      addFilePathToMap(jobId, tsFileResource, isClosed);
+      // this file may be deleted just before we lock it
+      if (tsFileResource.isDeleted()) {
+        ConcurrentHashMap<Long, Set<TsFileResource>> pathMap = !isClosed ?
+            unsealedFilePathsMap : sealedFilePathsMap;
+        pathMap.get(jobId).remove(tsFileResource);
+        FileReaderManager.getInstance().decreaseFileReaderReference(tsFileResource, isClosed);
+        resources.remove(tsFileResource);
+      }
     }
   }
 
@@ -97,7 +110,7 @@ public class JobFileManager {
    * must not return null.
    */
   void addFilePathToMap(long jobId, TsFileResource tsFile, boolean isClosed) {
-    ConcurrentHashMap<Long, Set<TsFileResource>> pathMap = !isClosed ? unsealedFilePathsMap :
+    ConcurrentHashMap<Long, Set<TsFileResource>> pathMap = !tsFile.isClosed() ? unsealedFilePathsMap :
         sealedFilePathsMap;
     //TODO this is not an atomic operation, is there concurrent problem?
     if (!pathMap.get(jobId).contains(tsFile)) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java
index c2faa12..34c31e4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileIterateReader.java
@@ -106,7 +106,7 @@ public class UnSealedTsFileIterateReader extends IterateReader {
       Filter filter)
       throws IOException {
     FileSeriesReader fileSeriesReader;
-    List<ChunkMetaData> metaDataList = unSealedTsFile.getChunkMetaDatas();
+    List<ChunkMetaData> metaDataList = unSealedTsFile.getChunkMetaDataList();
 
     if (metaDataList == null || metaDataList.isEmpty()) {
       // init fileSeriesReader
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileReaderByTimestamp.java
index 86d6d76..6c03b2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/fileRelated/UnSealedTsFileReaderByTimestamp.java
@@ -67,7 +67,7 @@ public class UnSealedTsFileReaderByTimestamp implements IReaderByTimestamp {
         .get(unsealedTsFile, false);
     IChunkLoader chunkLoader = new ChunkLoaderImpl(unClosedTsFileReader);
     unSealedTsFileDiskReaderByTs = new FileSeriesReaderByTimestamp(chunkLoader,
-        unsealedTsFile.getChunkMetaDatas());
+        unsealedTsFile.getChunkMetaDataList());
 
     unSealedTsFileDiskReaderEnded = false;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
index 9d8cf96..767aa79 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceMergeReader.java
@@ -84,7 +84,7 @@ public class UnseqResourceMergeReader extends PriorityMergeReader {
             continue;
           }
         }
-        metaDataList = tsFileResource.getChunkMetaDatas();
+        metaDataList = tsFileResource.getChunkMetaDataList();
       }
 
       ChunkLoaderImpl chunkLoader = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java
index 36a5e64..675471f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/resourceRelated/UnseqResourceReaderByTimestamp.java
@@ -64,7 +64,7 @@ public class UnseqResourceReaderByTimestamp extends PriorityMergeReaderByTimesta
           QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
         }
       } else {
-        metaDataList = tsFileResource.getChunkMetaDatas();
+        metaDataList = tsFileResource.getChunkMetaDataList();
       }
 
       ChunkLoaderImpl chunkLoader = null;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index f742585..5529c9c 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -79,7 +79,7 @@ public class PrimitiveMemTableTest {
       memTable.write(deviceId, measurementId[0], TSDataType.INT32, i, String.valueOf(i));
     }
     Iterator<TimeValuePair> tvPair = memTable
-        .query(deviceId, measurementId[0], TSDataType.INT32, Collections.emptyMap())
+        .query(deviceId, measurementId[0], TSDataType.INT32, Collections.emptyMap(), Long.MIN_VALUE)
         .getSortedTimeValuePairList().iterator();
     for (int i = 0; i < dataSize; i++) {
       TimeValuePair timeValuePair = tvPair.next();
@@ -97,7 +97,7 @@ public class PrimitiveMemTableTest {
           ret[i].getValue().getStringValue());
     }
     Iterator<TimeValuePair> tvPair = memTable
-        .query(deviceId, sensorId, dataType, Collections.emptyMap())
+        .query(deviceId, sensorId, dataType, Collections.emptyMap(), Long.MIN_VALUE)
         .getSortedTimeValuePairList()
         .iterator();
     Arrays.sort(ret);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 407dd94..a5f82e2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -82,7 +82,7 @@ public class TsFileProcessorTest {
         () -> true, true);
 
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
-        .query(deviceId, measurementId, dataType, props, context);
+        .query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
     ReadOnlyMemChunk left = pair.left;
     List<ChunkMetaData> right = pair.right;
     assertTrue(left.isEmpty());
@@ -95,7 +95,7 @@ public class TsFileProcessorTest {
     }
 
     // query data in memory
-    pair = processor.query(deviceId, measurementId, dataType, props, context);
+    pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
     left = pair.left;
     assertFalse(left.isEmpty());
     int num = 1;
@@ -110,7 +110,7 @@ public class TsFileProcessorTest {
     // flush synchronously
     processor.syncFlush();
 
-    pair = processor.query(deviceId, measurementId, dataType, props, context);
+    pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
     left = pair.left;
     right = pair.right;
     assertTrue(left.isEmpty());
@@ -129,7 +129,7 @@ public class TsFileProcessorTest {
         () -> true, true);
 
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
-        .query(deviceId, measurementId, dataType, props, context);
+        .query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
     ReadOnlyMemChunk left = pair.left;
     List<ChunkMetaData> right = pair.right;
     assertTrue(left.isEmpty());
@@ -142,7 +142,7 @@ public class TsFileProcessorTest {
     }
 
     // query data in memory
-    pair = processor.query(deviceId, measurementId, dataType, props, context);
+    pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
     left = pair.left;
     assertFalse(left.isEmpty());
     int num = 1;
@@ -157,7 +157,7 @@ public class TsFileProcessorTest {
     // flush synchronously
     processor.syncFlush();
 
-    pair = processor.query(deviceId, measurementId, dataType, props, context);
+    pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
     left = pair.left;
     right = pair.right;
     assertTrue(left.isEmpty());
@@ -196,7 +196,7 @@ public class TsFileProcessorTest {
         () -> true, true);
 
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
-        .query(deviceId, measurementId, dataType, props, context);
+        .query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
     ReadOnlyMemChunk left = pair.left;
     List<ChunkMetaData> right = pair.right;
     assertTrue(left.isEmpty());
@@ -212,7 +212,7 @@ public class TsFileProcessorTest {
     }
     processor.syncFlush();
 
-    pair = processor.query(deviceId, measurementId, dataType, props, context);
+    pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
     left = pair.left;
     right = pair.right;
     assertTrue(left.isEmpty());
@@ -244,7 +244,7 @@ public class TsFileProcessorTest {
         }, () -> true, true);
 
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
-        .query(deviceId, measurementId, dataType, props, context);
+        .query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
     ReadOnlyMemChunk left = pair.left;
     List<ChunkMetaData> right = pair.right;
     assertTrue(left.isEmpty());
@@ -257,7 +257,7 @@ public class TsFileProcessorTest {
     }
 
     // query data in memory
-    pair = processor.query(deviceId, measurementId, dataType, props, context);
+    pair = processor.query(deviceId, measurementId, dataType, props, context, Long.MIN_VALUE);
     left = pair.left;
     assertFalse(left.isEmpty());
     int num = 1;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
index de4e9ec..1ac44e9 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java
@@ -75,7 +75,7 @@ public class MManagerAdvancedTest {
     try {
       // test file name
       List<String> fileNames = mmanager.getAllStorageGroupNames();
-      assertEquals(2, fileNames.size());
+      assertEquals(3, fileNames.size());
       if (fileNames.get(0).equals("root.vehicle.d0")) {
         assertEquals("root.vehicle.d1", fileNames.get(1));
       } else {
diff --git a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
index 0545df1..6c091c0 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
@@ -72,8 +72,7 @@ public class FileReaderManagerTest {
 
         for (int i = 1; i <= 6; i++) {
           TsFileResource tsFile = tsFileResources[i];
-          testManager.addFilePathToMap(1L, tsFile,
-              false);
+          testManager.addFilePathToMap(1L, tsFile, false);
           manager.get(tsFile, false);
           Assert.assertTrue(manager.contains(tsFile, false));
         }
@@ -95,8 +94,7 @@ public class FileReaderManagerTest {
 
         for (int i = 4; i <= MAX_FILE_SIZE; i++) {
           TsFileResource tsFile = tsFileResources[i];
-          testManager.addFilePathToMap(2L, tsFile,
-              false);
+          testManager.addFilePathToMap(2L, tsFile, false);
           manager.get(tsFile, false);
           Assert.assertTrue(manager.contains(tsFile, false));
         }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 12bfbbc..6ac5619 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -96,7 +96,7 @@ public class LogReplayerTest {
 
       for (int i = 0; i < 5; i++) {
         ReadOnlyMemChunk chunk = memTable.query("device" + i, "sensor" + i, TSDataType.INT64,
-            Collections.emptyMap());
+            Collections.emptyMap(), Long.MIN_VALUE);
         Iterator<TimeValuePair> iterator = chunk.getIterator();
         if (i == 0) {
           assertFalse(iterator.hasNext());