You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/02/21 03:35:41 UTC

[incubator-iotdb] branch master updated: [IOTDB-5]Enable deletion in master (#51)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 69df377  [IOTDB-5]Enable deletion in master (#51)
69df377 is described below

commit 69df377f25c549b1d4a00fac37708c38bc5c45aa
Author: Jiang Tian <jt...@163.com>
AuthorDate: Thu Feb 21 11:35:37 2019 +0800

    [IOTDB-5]Enable deletion in master (#51)
    
    * [IOTDB-5]Deal with merge and other optimizations (#17)
    
    add deletion function.
    
    * fix IntervalFileNode
    
    * fix a wrong branch in FileNodeManager
    
    * some format refinements
    
    * fix indent and equals() in Modification
---
 .../engine/bufferwrite/BufferWriteProcessor.java   |  38 ++-
 .../iotdb/db/engine/filenode/FileNodeManager.java  | 279 ++++++++++--------
 .../db/engine/filenode/FileNodeProcessor.java      | 325 ++++++++++++++++-----
 .../iotdb/db/engine/filenode/IntervalFileNode.java |  29 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  63 ++++
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  16 +
 .../db/engine/memtable/IWritableMemChunk.java      |   3 +
 .../db/engine/memtable/MemTableFlushUtil.java      |   6 +-
 .../db/engine/memtable/PrimitiveMemTable.java      |  18 ++
 .../iotdb/db/engine/memtable/WritableMemChunk.java |   5 +
 .../iotdb/db/engine/modification/Deletion.java     |  63 ++++
 .../iotdb/db/engine/modification/Modification.java |  84 ++++++
 .../db/engine/modification/ModificationFile.java   | 120 ++++++++
 .../io/LocalTextModificationAccessor.java          | 155 ++++++++++
 .../engine/modification/io/ModificationReader.java |  37 ++-
 .../engine/modification/io/ModificationWriter.java |  39 +--
 .../modification/package-info.java}                |  12 +-
 .../db/engine/overflow/io/OverflowProcessor.java   | 278 +++++++++---------
 .../db/engine/overflow/io/OverflowResource.java    |  68 ++++-
 .../db/engine/overflow/io/OverflowSupport.java     |  20 +-
 .../version/SimpleFileVersionController.java       | 118 ++++++++
 .../engine/version/SysTimeVersionController.java   |  26 +-
 .../VersionController.java}                        |  22 +-
 .../org/apache/iotdb/db/monitor/StatMonitor.java   |   7 +-
 .../iotdb/db/qp/executor/OverflowQPExecutor.java   |   2 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  17 +-
 .../iotdb/db/query/context/QueryContext.java       |  78 +++++
 .../db/query/control/QueryDataSourceManager.java   |   9 +-
 .../executor/EngineExecutorWithTimeGenerator.java  |  16 +-
 .../EngineExecutorWithoutTimeGenerator.java        |  23 +-
 .../iotdb/db/query/executor/EngineQueryRouter.java |  10 +-
 .../db/query/factory/SeriesReaderFactory.java      |  24 +-
 .../query/reader/sequence/SealedTsFilesReader.java |  30 +-
 .../query/reader/sequence/SequenceDataReader.java  |   7 +-
 .../query/timegenerator/EngineNodeConstructor.java |  22 +-
 .../query/timegenerator/EngineTimeGenerator.java   |   9 +-
 .../java/org/apache/iotdb/db/utils/QueryUtils.java |  73 +++++
 .../recover/ExclusiveLogRecoverPerformer.java      |   6 +-
 .../db/writelog/replay/ConcreteLogReplayer.java    |  17 +-
 .../iotdb/db/writelog/replay/LogReplayer.java      |   2 +-
 .../engine/bufferwrite/BufferWriteBenchmark.java   |  10 +-
 .../bufferwrite/BufferWriteProcessorNewTest.java   |   6 +-
 .../bufferwrite/BufferWriteProcessorTest.java      |  14 +-
 .../bufferwrite/RestorableTsFileIOWriterTest.java  |   4 +-
 .../memcontrol/BufferwriteFileSizeControlTest.java |   3 +-
 .../memcontrol/BufferwriteMetaSizeControlTest.java |   3 +-
 .../memcontrol/OverflowFileSizeControlTest.java    |   3 +-
 .../memcontrol/OverflowMetaSizeControlTest.java    |   3 +-
 .../engine/modification/DeletionFileNodeTest.java  | 245 ++++++++++++++++
 .../db/engine/modification/DeletionQueryTest.java  | 293 +++++++++++++++++++
 .../engine/modification/ModificationFileTest.java  | 101 +++++++
 .../io/LocalTextModificationAccessorTest.java      |  80 +++++
 .../overflow/io/OverflowProcessorBenchmark.java    |   3 +-
 .../engine/overflow/io/OverflowProcessorTest.java  |  45 +--
 .../engine/overflow/io/OverflowResourceTest.java   |  14 +-
 .../db/engine/overflow/io/OverflowSupportTest.java |   4 +-
 .../version/SimpleFileVersionControllerTest.java   |  53 ++++
 .../version/SysTimeVersionControllerTest.java      |  32 +-
 .../iotdb/db/integration/IoTDBAuthorizationIT.java |   8 +-
 .../iotdb/db/integration/IoTDBCompleteIT.java      |   9 +-
 .../apache/iotdb/db/integration/IoTDBDaemonIT.java |   8 +-
 .../iotdb/db/integration/IoTDBDeletionIT.java      | 250 ++++++++++++++++
 .../db/integration/IoTDBEngineTimeGeneratorIT.java |  16 +-
 .../iotdb/db/integration/IoTDBLargeDataIT.java     |   8 +-
 .../iotdb/db/integration/IoTDBLimitSlimitIT.java   |  10 +-
 .../iotdb/db/integration/IoTDBMetadataFetchIT.java |   8 +-
 .../iotdb/db/integration/IoTDBMultiSeriesIT.java   |   8 +-
 .../iotdb/db/integration/IoTDBSeriesReaderIT.java  |   8 +-
 .../iotdb/db/integration/IoTDBTimeZoneIT.java      |   4 -
 .../org/apache/iotdb/db/writelog/RecoverTest.java  |   2 +-
 .../tsfile/file/metadata/ChunkGroupMetaData.java   |  25 +-
 .../iotdb/tsfile/file/metadata/ChunkMetaData.java  |  27 +-
 .../org/apache/iotdb/tsfile/read/common/Chunk.java |   9 +
 .../tsfile/read/controller/ChunkLoaderImpl.java    |   4 +-
 .../read/controller/MetadataQuerierByFileImpl.java |   1 +
 .../tsfile/read/reader/chunk/ChunkReader.java      |  18 +-
 .../read/reader/chunk/ChunkReaderByTimestamp.java  |   2 +-
 .../read/reader/chunk/ChunkReaderWithFilter.java   |   2 +-
 .../reader/chunk/ChunkReaderWithoutFilter.java     |   2 +-
 .../iotdb/tsfile/read/reader/page/PageReader.java  |  54 +++-
 .../read/reader/series/FileSeriesReader.java       |   5 +-
 .../reader/series/FileSeriesReaderWithFilter.java  |   1 -
 .../series/FileSeriesReaderWithoutFilter.java      |   1 -
 .../reader/series/SeriesReaderByTimestamp.java     |   1 -
 .../apache/iotdb/tsfile/write/TsFileWriter.java    |   8 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |   9 +-
 .../iotdb/tsfile/write/TsFileIOWriterTest.java     |   2 +-
 87 files changed, 2925 insertions(+), 677 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
index 86d3972..13dcf57 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.utils.FlushStatus;
+import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
@@ -79,6 +80,7 @@ public class BufferWriteProcessor extends Processor {
   private String bufferWriteRelativePath;
 
   private WriteLogNode logNode;
+  private VersionController versionController;
 
   /**
    * constructor of BufferWriteProcessor.
@@ -91,7 +93,7 @@ public class BufferWriteProcessor extends Processor {
    * @throws BufferWriteProcessorException BufferWriteProcessorException
    */
   public BufferWriteProcessor(String baseDir, String processorName, String fileName,
-      Map<String, Action> parameters,
+      Map<String, Action> parameters, VersionController versionController,
       FileSchema fileSchema) throws BufferWriteProcessorException {
     super(processorName);
     this.fileSchema = fileSchema;
@@ -131,6 +133,7 @@ public class BufferWriteProcessor extends Processor {
         throw new BufferWriteProcessorException(e);
       }
     }
+    this.versionController = versionController;
   }
 
   /**
@@ -178,13 +181,13 @@ public class BufferWriteProcessor extends Processor {
         return true;
       case WARNING:
         memory = MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage());
-        LOGGER.warn("Memory usage will exceed warning threshold, current : {}.",memory);
+        LOGGER.warn("Memory usage will exceed warning threshold, current : {}.", memory);
         checkMemThreshold4Flush(memUsage);
         return true;
       case DANGEROUS:
       default:
         memory = MemUtils.bytesCntToStr(BasicMemController.getInstance().getTotalUsage());
-        LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.",memory);
+        LOGGER.warn("Memory usage will exceed dangerous threshold, current : {}.", memory);
         return false;
     }
   }
@@ -200,7 +203,7 @@ public class BufferWriteProcessor extends Processor {
       try {
         flush();
       } catch (IOException e) {
-        LOGGER.error("Flush bufferwrite error.",e);
+        LOGGER.error("Flush bufferwrite error.", e);
         throw new BufferWriteProcessorException(e);
       }
     }
@@ -258,14 +261,15 @@ public class BufferWriteProcessor extends Processor {
     }
   }
 
-  private void flushOperation(String flushFunction) {
+  private void flushOperation(String flushFunction, long version) {
     long flushStartTime = System.currentTimeMillis();
     LOGGER.info("The bufferwrite processor {} starts flushing {}.", getProcessorName(),
         flushFunction);
     try {
       if (flushMemTable != null && !flushMemTable.isEmpty()) {
         // flush data
-        MemTableFlushUtil.flushMemTable(fileSchema, writer, flushMemTable);
+        MemTableFlushUtil.flushMemTable(fileSchema, writer, flushMemTable,
+            version);
         // write restore information
         writer.flush();
       }
@@ -346,13 +350,14 @@ public class BufferWriteProcessor extends Processor {
       valueCount = 0;
       flushStatus.setFlushing();
       switchWorkToFlush();
+      long version = versionController.nextVersion();
       BasicMemController.getInstance().reportFree(this, memSize.get());
       memSize.set(0);
       // switch
       if (synchronization) {
-        flushOperation("synchronously");
+        flushOperation("synchronously", version);
       } else {
-        FlushManager.getInstance().submit(() -> flushOperation("asynchronously"));
+        FlushManager.getInstance().submit(() -> flushOperation("asynchronously", version));
       }
     }
     // TODO return a meaningful Future
@@ -500,4 +505,21 @@ public class BufferWriteProcessor extends Processor {
   public WriteLogNode getLogNode() {
     return logNode;
   }
+
+  /**
+   * Delete data of timeseries deviceId.measurementId whose timestamp {@code <=} 'timestamp'
+   * from the working MemTable and, if isFlush is set, from a copy of the flushing MemTable.
+   *
+   * @param deviceId the deviceId of the timeseries to be deleted.
+   * @param measurementId the measurementId of the timeseries to be deleted.
+   * @param timestamp the upper-bound of deletion time.
+   */
+  public void delete(String deviceId, String measurementId, long timestamp) {
+    workMemTable.delete(deviceId, measurementId, timestamp);
+    if (isFlush) {
+      // flushing MemTable cannot be directly modified since another thread is reading it
+      flushMemTable = flushMemTable.copy();
+      flushMemTable.delete(deviceId, measurementId, timestamp);
+    }
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 482c827..7f6de86 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
@@ -92,7 +93,7 @@ public class FileNodeManager implements IStatistic, IService {
     processorMap = new ConcurrentHashMap<>();
     statParamsHashMap = new HashMap<>();
     for (MonitorConstants.FileNodeManagerStatConstants fileNodeManagerStatConstant :
-            MonitorConstants.FileNodeManagerStatConstants.values()) {
+        MonitorConstants.FileNodeManagerStatConstants.values()) {
       statParamsHashMap.put(fileNodeManagerStatConstant.name(), new AtomicLong(0));
     }
 
@@ -119,9 +120,9 @@ public class FileNodeManager implements IStatistic, IService {
 
   private void updateStatHashMapWhenFail(TSRecord tsRecord) {
     statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
-            .incrementAndGet();
+        .incrementAndGet();
     statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
-            .addAndGet(tsRecord.dataPointList.size());
+        .addAndGet(tsRecord.dataPointList.size());
   }
 
   /**
@@ -138,9 +139,9 @@ public class FileNodeManager implements IStatistic, IService {
   public List<String> getAllPathForStatistic() {
     List<String> list = new ArrayList<>();
     for (MonitorConstants.FileNodeManagerStatConstants statConstant :
-            MonitorConstants.FileNodeManagerStatConstants.values()) {
-      list.add(
-              MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name());
+        MonitorConstants.FileNodeManagerStatConstants.values()) {
+      list.add(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPERATOR
+          + statConstant.name());
     }
     return list;
   }
@@ -149,7 +150,8 @@ public class FileNodeManager implements IStatistic, IService {
   public Map<String, TSRecord> getAllStatisticsValue() {
     long curTime = System.currentTimeMillis();
     TSRecord tsRecord = StatMonitor
-            .convertToTSRecord(getStatParamsHashMap(), MonitorConstants.STAT_STORAGE_DELTA_NAME, curTime);
+        .convertToTSRecord(getStatParamsHashMap(), MonitorConstants.STAT_STORAGE_DELTA_NAME,
+            curTime);
     HashMap<String, TSRecord> ret = new HashMap<>();
     ret.put(MonitorConstants.STAT_STORAGE_DELTA_NAME, tsRecord);
     return ret;
@@ -162,8 +164,9 @@ public class FileNodeManager implements IStatistic, IService {
   public void registStatMetadata() {
     Map<String, String> hashMap = new HashMap<>();
     for (MonitorConstants.FileNodeManagerStatConstants statConstant :
-            MonitorConstants.FileNodeManagerStatConstants.values()) {
-      hashMap.put(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPERATOR +
+        MonitorConstants.FileNodeManagerStatConstants.values()) {
+      hashMap
+          .put(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPERATOR +
               statConstant.name(), MonitorConstants.DATA_TYPE);
     }
     StatMonitor.getInstance().registStatStorageGroup(hashMap);
@@ -180,7 +183,7 @@ public class FileNodeManager implements IStatistic, IService {
   }
 
   private FileNodeProcessor constructNewProcessor(String filenodeName)
-          throws FileNodeManagerException {
+      throws FileNodeManagerException {
     try {
       return new FileNodeProcessor(baseDir, filenodeName);
     } catch (FileNodeProcessorException e) {
@@ -190,7 +193,7 @@ public class FileNodeManager implements IStatistic, IService {
   }
 
   private FileNodeProcessor getProcessor(String path, boolean isWriteLock)
-          throws FileNodeManagerException {
+      throws FileNodeManagerException {
     String filenodeName;
     try {
       filenodeName = MManager.getInstance().getFileNameByPath(path);
@@ -212,7 +215,7 @@ public class FileNodeManager implements IStatistic, IService {
         } else {
           // calculate the value with the key monitor
           LOGGER.debug("Calcuate the processor, the filenode is {}, Thread is {}", filenodeName,
-                  Thread.currentThread().getId());
+              Thread.currentThread().getId());
           processor = constructNewProcessor(filenodeName);
           processor.lock(isWriteLock);
           processorMap.put(filenodeName, processor);
@@ -233,8 +236,7 @@ public class FileNodeManager implements IStatistic, IService {
         FileNodeProcessor fileNodeProcessor = getProcessor(filenodeName, true);
         if (fileNodeProcessor.shouldRecovery()) {
           LOGGER.info("Recovery the filenode processor, the filenode is {}, the status is {}",
-                  filenodeName,
-                  fileNodeProcessor.getFileNodeProcessorStatus());
+              filenodeName, fileNodeProcessor.getFileNodeProcessorStatus());
           fileNodeProcessor.fileNodeRecovery();
         } else {
           fileNodeProcessor.writeUnlock();
@@ -249,9 +251,9 @@ public class FileNodeManager implements IStatistic, IService {
   /**
    * insert TsRecord into storage group.
    *
-   * @param tsRecord  input Data
+   * @param tsRecord input Data
    * @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not
-   *                  be recorded. if false, the statParamsHashMap will be updated.
+   * be recorded. if false, the statParamsHashMap will be updated.
    * @return an int value represents the insert type
    */
   public int insert(TSRecord tsRecord, boolean isMonitor) throws FileNodeManagerException {
@@ -275,7 +277,7 @@ public class FileNodeManager implements IStatistic, IService {
       }
     } catch (FileNodeProcessorException e) {
       LOGGER.error(String.format("Encounter an error when closing the buffer write processor %s.",
-              fileNodeProcessor.getProcessorName()), e);
+          fileNodeProcessor.getProcessorName()), e);
       throw new FileNodeManagerException(e);
     } finally {
       fileNodeProcessor.writeUnlock();
@@ -283,22 +285,22 @@ public class FileNodeManager implements IStatistic, IService {
     // Modify the insert
     if (!isMonitor) {
       fileNodeProcessor.getStatParamsHashMap()
-              .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_POINTS_SUCCESS.name())
-              .addAndGet(tsRecord.dataPointList.size());
+          .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_POINTS_SUCCESS.name())
+          .addAndGet(tsRecord.dataPointList.size());
       fileNodeProcessor.getStatParamsHashMap()
-              .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_REQ_SUCCESS.name())
-              .incrementAndGet();
+          .get(MonitorConstants.FileNodeProcessorStatConstants.TOTAL_REQ_SUCCESS.name())
+          .incrementAndGet();
       statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_SUCCESS.name())
-              .incrementAndGet();
+          .incrementAndGet();
       statParamsHashMap
-              .get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_SUCCESS.name())
-              .addAndGet(tsRecord.dataPointList.size());
+          .get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_SUCCESS.name())
+          .addAndGet(tsRecord.dataPointList.size());
     }
     return insertType;
   }
 
   private void writeLog(TSRecord tsRecord, boolean isMonitor, WriteLogNode logNode)
-          throws FileNodeManagerException {
+      throws FileNodeManagerException {
     try {
       if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
         List<String> measurementList = new ArrayList<>();
@@ -307,9 +309,8 @@ public class FileNodeManager implements IStatistic, IService {
           measurementList.add(dp.getMeasurementId());
           insertValues.add(dp.getValue().toString());
         }
-        logNode.write(
-                new InsertPlan(2, tsRecord.deviceId, tsRecord.time, measurementList,
-                        insertValues));
+        logNode.write(new InsertPlan(2, tsRecord.deviceId, tsRecord.time, measurementList,
+            insertValues));
       }
     } catch (IOException e) {
       if (!isMonitor) {
@@ -329,13 +330,13 @@ public class FileNodeManager implements IStatistic, IService {
   private void updateStat(boolean isMonitor, TSRecord tsRecord) {
     if (!isMonitor) {
       statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS.name())
-              .addAndGet(tsRecord.dataPointList.size());
+          .addAndGet(tsRecord.dataPointList.size());
     }
   }
 
   private void insertOverflow(FileNodeProcessor fileNodeProcessor, long timestamp,
-                              TSRecord tsRecord, boolean isMonitor, String deviceId)
-          throws FileNodeManagerException {
+      TSRecord tsRecord, boolean isMonitor, String deviceId)
+      throws FileNodeManagerException {
     // get overflow processor
     OverflowProcessor overflowProcessor;
     String filenodeName = fileNodeProcessor.getProcessorName();
@@ -343,7 +344,7 @@ public class FileNodeManager implements IStatistic, IService {
       overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
     } catch (IOException e) {
       LOGGER.error("Get the overflow processor failed, the filenode is {}, insert time is {}",
-              filenodeName, timestamp);
+          filenodeName, timestamp);
       if (!isMonitor) {
         updateStatHashMapWhenFail(tsRecord);
       }
@@ -366,17 +367,16 @@ public class FileNodeManager implements IStatistic, IService {
   }
 
   private void insertBufferWrite(FileNodeProcessor fileNodeProcessor, long timestamp,
-                                 boolean isMonitor, TSRecord tsRecord, String deviceId)
-          throws FileNodeManagerException, FileNodeProcessorException {
+      boolean isMonitor, TSRecord tsRecord, String deviceId)
+      throws FileNodeManagerException, FileNodeProcessorException {
     // get bufferwrite processor
     BufferWriteProcessor bufferWriteProcessor;
     String filenodeName = fileNodeProcessor.getProcessorName();
     try {
       bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor(filenodeName, timestamp);
     } catch (FileNodeProcessorException e) {
-      LOGGER
-              .error("Get the bufferwrite processor failed, the filenode is {}, insert time is {}",
-                      filenodeName, timestamp);
+      LOGGER.error("Get the bufferwrite processor failed, the filenode is {}, insert time is {}",
+          filenodeName, timestamp);
       if (!isMonitor) {
         updateStatHashMapWhenFail(tsRecord);
       }
@@ -388,8 +388,7 @@ public class FileNodeManager implements IStatistic, IService {
       String bufferwriteBaseDir = bufferWriteProcessor.getBaseDir();
       String bufferwriteRelativePath = bufferWriteProcessor.getFileRelativePath();
       try {
-        fileNodeProcessor
-                .addIntervalFileNode(bufferwriteBaseDir, bufferwriteRelativePath);
+        fileNodeProcessor.addIntervalFileNode(bufferwriteBaseDir, bufferwriteRelativePath);
       } catch (Exception e) {
         if (!isMonitor) {
           updateStatHashMapWhenFail(tsRecord);
@@ -412,15 +411,15 @@ public class FileNodeManager implements IStatistic, IService {
     }
 
     if (bufferWriteProcessor
-            .getFileSize() > IoTDBDescriptor.getInstance()
-            .getConfig().bufferwriteFileSizeThreshold) {
+        .getFileSize() > IoTDBDescriptor.getInstance()
+        .getConfig().bufferwriteFileSizeThreshold) {
       if (LOGGER.isInfoEnabled()) {
         LOGGER.info(
-                "The filenode processor {} will close the bufferwrite processor, "
-                        + "because the size[{}] of tsfile {} reaches the threshold {}",
-                filenodeName, MemUtils.bytesCntToStr(bufferWriteProcessor.getFileSize()),
-                bufferWriteProcessor.getFileName(), MemUtils.bytesCntToStr(
-                        IoTDBDescriptor.getInstance().getConfig().bufferwriteFileSizeThreshold));
+            "The filenode processor {} will close the bufferwrite processor, "
+                + "because the size[{}] of tsfile {} reaches the threshold {}",
+            filenodeName, MemUtils.bytesCntToStr(bufferWriteProcessor.getFileSize()),
+            bufferWriteProcessor.getFileName(), MemUtils.bytesCntToStr(
+                IoTDBDescriptor.getInstance().getConfig().bufferwriteFileSizeThreshold));
       }
 
       fileNodeProcessor.closeBufferWrite();
@@ -431,8 +430,8 @@ public class FileNodeManager implements IStatistic, IService {
    * update data.
    */
   public void update(String deviceId, String measurementId, long startTime, long endTime,
-                     TSDataType type, String v)
-          throws FileNodeManagerException {
+      TSDataType type, String v)
+      throws FileNodeManagerException {
 
     FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
     try {
@@ -440,8 +439,8 @@ public class FileNodeManager implements IStatistic, IService {
       long lastUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId);
       if (startTime > lastUpdateTime) {
         LOGGER.warn("The update range is error, startTime {} is great than lastUpdateTime {}",
-                startTime,
-                lastUpdateTime);
+            startTime,
+            lastUpdateTime);
         return;
       }
       long finalEndTime = endTime > lastUpdateTime ? lastUpdateTime : endTime;
@@ -453,9 +452,9 @@ public class FileNodeManager implements IStatistic, IService {
         overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
       } catch (IOException e) {
         LOGGER.error(
-                "Get the overflow processor failed, the filenode is {}, "
-                        + "insert time range is from {} to {}",
-                filenodeName, startTime, finalEndTime);
+            "Get the overflow processor failed, the filenode is {}, "
+                + "insert time range is from {} to {}",
+            filenodeName, startTime, finalEndTime);
         throw new FileNodeManagerException(e);
       }
       overflowProcessor.update(deviceId, measurementId, startTime, finalEndTime, type, v);
@@ -466,10 +465,9 @@ public class FileNodeManager implements IStatistic, IService {
       // write wal
       try {
         if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
-          overflowProcessor.getLogNode()
-                  .write(
-                          new UpdatePlan(startTime, finalEndTime, v, new Path(deviceId
-                                  + "." + measurementId)));
+          overflowProcessor.getLogNode().write(
+              new UpdatePlan(startTime, finalEndTime, v, new Path(deviceId
+                  + "." + measurementId)));
         }
       } catch (IOException e) {
         throw new FileNodeManagerException(e);
@@ -482,64 +480,97 @@ public class FileNodeManager implements IStatistic, IService {
   /**
    * delete data.
    */
-  public void delete(String deviceId, String measurementId, long timestamp, TSDataType type)
-          throws FileNodeManagerException {
+  public void delete(String deviceId, String measurementId, long timestamp)
+      throws FileNodeManagerException {
 
     FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
     try {
       long lastUpdateTime = fileNodeProcessor.getLastUpdateTime(deviceId);
       // no tsfile data, the delete operation is invalid
       if (lastUpdateTime == -1) {
-        LOGGER.warn(
-                "The last update time is -1, delete overflow is invalid"
-                        + ", the filenode processor is {}",
-                fileNodeProcessor.getProcessorName());
+        LOGGER.warn("The last update time is -1, delete overflow is invalid, "
+                + "the filenode processor is {}",
+            fileNodeProcessor.getProcessorName());
       } else {
-        long t = timestamp > lastUpdateTime ? lastUpdateTime : timestamp;
+        // write wal
+        if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
+          // get processors for wal
+          String filenodeName = fileNodeProcessor.getProcessorName();
+          OverflowProcessor overflowProcessor;
+          BufferWriteProcessor bufferWriteProcessor;
+          try {
+            overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
+            bufferWriteProcessor = fileNodeProcessor.getBufferWriteProcessor();
+          } catch (IOException | FileNodeProcessorException e) {
+            LOGGER.error("Getting the processor failed, the filenode is {}, delete time is {}.",
+                filenodeName, timestamp);
+            throw new FileNodeManagerException(e);
+          }
+          try {
+            overflowProcessor.getLogNode().write(new DeletePlan(timestamp,
+                    new Path(deviceId + "." + measurementId)));
+            bufferWriteProcessor.getLogNode().write(new DeletePlan(timestamp,
+                    new Path(deviceId + "." + measurementId)));
+          } catch (IOException e) {
+            throw new FileNodeManagerException(e);
+          }
+        }
 
-        String filenodeName = fileNodeProcessor.getProcessorName();
-        // get overflow processor
-        OverflowProcessor overflowProcessor;
         try {
-          overflowProcessor = fileNodeProcessor.getOverflowProcessor(filenodeName);
+          fileNodeProcessor.delete(deviceId, measurementId, timestamp);
         } catch (IOException e) {
-          LOGGER.error("Get the overflow processor failed, the filenode is {}, delete time is {}.",
-                  filenodeName, timestamp);
           throw new FileNodeManagerException(e);
         }
-        overflowProcessor.delete(deviceId, measurementId, t, type);
         // change the type of tsfile to overflowed
-        fileNodeProcessor.changeTypeToChangedForDelete(deviceId, t);
-        fileNodeProcessor.setOverflowed(true);
-        fileNodeProcessor.changeTypeToChangedForDelete(deviceId, t);
+        fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp);
         fileNodeProcessor.setOverflowed(true);
 
-        // write wal
-        writeDeleteWAL(overflowProcessor, deviceId, measurementId, t);
       }
     } finally {
       fileNodeProcessor.writeUnlock();
     }
   }
 
-  private void writeDeleteWAL(OverflowProcessor overflowProcessor, String deviceId,
-                              String measurementId, long t) throws FileNodeManagerException {
+  /**
+   * Similar to delete(), but only deletes data in BufferWrite. Only used by WAL recovery.
+   */
+  public void deleteBufferWrite(String deviceId, String measurementId, long timestamp)
+      throws FileNodeManagerException {
+    FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
     try {
-      if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
-        overflowProcessor.getLogNode()
-                .write(new DeletePlan(t, new Path(deviceId + "." + measurementId)));
-      }
+      fileNodeProcessor.deleteBufferWrite(deviceId, measurementId, timestamp);
+      // mark the tsfile as changed while the write lock is still held, as delete() does
+      fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp);
+      fileNodeProcessor.setOverflowed(true);
     } catch (IOException e) {
       throw new FileNodeManagerException(e);
+    } finally {
+      fileNodeProcessor.writeUnlock();
     }
   }
 
   /**
-   * try to delete the filenode processor.
+   * Similar to delete(), but only deletes data in Overflow. Only used by WAL recovery.
    */
+  public void deleteOverflow(String deviceId, String measurementId, long timestamp)
+      throws FileNodeManagerException {
+    FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
+    try {
+      fileNodeProcessor.deleteOverflow(deviceId, measurementId, timestamp);
+      // mark the tsfile as changed while the write lock is still held, as delete() does
+      fileNodeProcessor.changeTypeToChangedForDelete(deviceId, timestamp);
+      fileNodeProcessor.setOverflowed(true);
+    } catch (IOException e) {
+      throw new FileNodeManagerException(e);
+    } finally {
+      fileNodeProcessor.writeUnlock();
+    }
+  }
+
+
   private void delete(String processorName,
-                      Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator)
-          throws FileNodeManagerException {
+      Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator)
+      throws FileNodeManagerException {
     if (!processorMap.containsKey(processorName)) {
       LOGGER.warn("The processorMap doesn't contain the filenode processor {}.", processorName);
       return;
@@ -577,7 +608,7 @@ public class FileNodeManager implements IStatistic, IService {
     FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
     try {
       LOGGER.debug("Get the FileNodeProcessor: filenode is {}, begin query.",
-              fileNodeProcessor.getProcessorName());
+          fileNodeProcessor.getProcessorName());
       return fileNodeProcessor.addMultiPassLock();
     } finally {
       fileNodeProcessor.writeUnlock();
@@ -587,13 +618,13 @@ public class FileNodeManager implements IStatistic, IService {
   /**
    * query data.
    */
-  public QueryDataSource query(SingleSeriesExpression seriesExpression)
-          throws FileNodeManagerException {
+  public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context)
+      throws FileNodeManagerException {
     String deviceId = seriesExpression.getSeriesPath().getDevice();
     String measurementId = seriesExpression.getSeriesPath().getMeasurement();
     FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, false);
     LOGGER.debug("Get the FileNodeProcessor: filenode is {}, query.",
-            fileNodeProcessor.getProcessorName());
+        fileNodeProcessor.getProcessorName());
     try {
       QueryDataSource queryDataSource;
       // query operation must have overflow processor
@@ -602,16 +633,15 @@ public class FileNodeManager implements IStatistic, IService {
           fileNodeProcessor.getOverflowProcessor(fileNodeProcessor.getProcessorName());
         } catch (IOException e) {
           LOGGER.error("Get the overflow processor failed, the filenode is {}, query is {},{}",
-                  fileNodeProcessor.getProcessorName(), deviceId, measurementId);
+              fileNodeProcessor.getProcessorName(), deviceId, measurementId);
           throw new FileNodeManagerException(e);
         }
       }
       try {
-        queryDataSource = fileNodeProcessor
-                .query(deviceId, measurementId);
+        queryDataSource = fileNodeProcessor.query(deviceId, measurementId, context);
       } catch (FileNodeProcessorException e) {
         LOGGER.error("Query error: the deviceId {}, the measurementId {}", deviceId, measurementId,
-                e);
+            e);
         throw new FileNodeManagerException(e);
       }
       // return query structure
@@ -629,7 +659,7 @@ public class FileNodeManager implements IStatistic, IService {
     FileNodeProcessor fileNodeProcessor = getProcessor(deviceId, true);
     try {
       LOGGER.debug("Get the FileNodeProcessor: {} end query.",
-              fileNodeProcessor.getProcessorName());
+          fileNodeProcessor.getProcessorName());
       fileNodeProcessor.removeMultiPassLock(token);
     } finally {
       fileNodeProcessor.writeUnlock();
@@ -641,11 +671,10 @@ public class FileNodeManager implements IStatistic, IService {
    * transmission module</b>
    *
    * @param fileNodeName the seriesPath of storage group
-   * @param appendFile   the appended tsfile information
+   * @param appendFile the appended tsfile information
    */
   public boolean appendFileToFileNode(String fileNodeName, IntervalFileNode appendFile,
-                                      String appendFilePath)
-          throws FileNodeManagerException {
+      String appendFilePath) throws FileNodeManagerException {
     FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true);
     try {
       // check append file
@@ -671,11 +700,10 @@ public class FileNodeManager implements IStatistic, IService {
    * get all overlap tsfiles which are conflict with the appendFile.
    *
    * @param fileNodeName the seriesPath of storage group
-   * @param appendFile   the appended tsfile information
+   * @param appendFile the appended tsfile information
    */
   public List<String> getOverlapFilesFromFileNode(String fileNodeName, IntervalFileNode appendFile,
-                                                  String uuid)
-          throws FileNodeManagerException {
+      String uuid) throws FileNodeManagerException {
     FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true);
     List<String> overlapFiles;
     try {
@@ -696,7 +724,7 @@ public class FileNodeManager implements IStatistic, IService {
   public void mergeAll() throws FileNodeManagerException {
     if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
       LOGGER.warn("Failed to merge all overflowed filenode, because filenode manager status is {}",
-              fileNodeManagerStatus);
+          fileNodeManagerStatus);
       return;
     }
 
@@ -730,9 +758,9 @@ public class FileNodeManager implements IStatistic, IService {
       while (!task.isDone()) {
         try {
           LOGGER.info(
-                  "Waiting for the end of merge, already waiting for {}s, "
-                          + "continue to wait anothor {}s",
-                  totalTime, time);
+              "Waiting for the end of merge, already waiting for {}s, "
+                  + "continue to wait anothor {}s",
+              totalTime, time);
           TimeUnit.SECONDS.sleep(time);
           totalTime += time;
           time = updateWaitTime(time);
@@ -799,9 +827,9 @@ public class FileNodeManager implements IStatistic, IService {
       cleanBufferWrite(processorName);
 
       MultiFileLogNodeManager.getInstance()
-              .deleteNode(processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX);
+          .deleteNode(processorName + IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX);
       MultiFileLogNodeManager.getInstance()
-              .deleteNode(processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
+          .deleteNode(processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
     } catch (IOException e) {
       LOGGER.error("Delete the filenode processor {} error.", processorName, e);
       throw new FileNodeManagerException(e);
@@ -816,8 +844,9 @@ public class FileNodeManager implements IStatistic, IService {
       bufferwritePath = standardizeDir(bufferwritePath) + processorName;
       File bufferDir = new File(bufferwritePath);
       // free and close the streams under this bufferwrite directory
-      if (!bufferDir.exists())
+      if (!bufferDir.exists()) {
         continue;
+      }
       File[] bufferFiles = bufferDir.listFiles();
       if (bufferFiles != null) {
         for (File bufferFile : bufferFiles) {
@@ -841,9 +870,9 @@ public class FileNodeManager implements IStatistic, IService {
             break;
           } else {
             LOGGER.info(
-                    "Can't delete the filenode processor {}, "
-                            + "because the filenode processor can't be closed."
-                            + " Wait 100ms to retry");
+                "Can't delete the filenode processor {}, "
+                    + "because the filenode processor can't be closed."
+                    + " Wait 100ms to retry");
           }
         } catch (ProcessorException e) {
           LOGGER.error("Delete the filenode processor {} error.", processorName, e);
@@ -853,8 +882,8 @@ public class FileNodeManager implements IStatistic, IService {
         }
       } else {
         LOGGER.info(
-                "Can't delete the filenode processor {}, because it can't get the write lock."
-                        + " Wait 100ms to retry", processorName);
+            "Can't delete the filenode processor {}, because it can't get the write lock."
+                + " Wait 100ms to retry", processorName);
       }
       try {
         TimeUnit.MILLISECONDS.sleep(100);
@@ -868,8 +897,8 @@ public class FileNodeManager implements IStatistic, IService {
   private String standardizeDir(String originalPath) {
     String res = originalPath;
     if ((originalPath.length() > 0
-            && originalPath.charAt(originalPath.length() - 1) != File.separatorChar)
-            || originalPath.length() == 0) {
+        && originalPath.charAt(originalPath.length() - 1) != File.separatorChar)
+        || originalPath.length() == 0) {
       res = originalPath + File.separatorChar;
     }
     return res;
@@ -879,7 +908,7 @@ public class FileNodeManager implements IStatistic, IService {
    * add time series.
    */
   public void addTimeSeries(Path path, String dataType, String encoding)
-          throws FileNodeManagerException {
+      throws FileNodeManagerException {
     FileNodeProcessor fileNodeProcessor = getProcessor(path.getFullPath(), true);
     try {
       fileNodeProcessor.addTimeSeries(path.getMeasurement(), dataType, encoding);
@@ -902,7 +931,7 @@ public class FileNodeManager implements IStatistic, IService {
       while (!closeOneProcessor(processorName)) {
         try {
           LOGGER.info("Can't force to close the filenode processor {}, wait 100ms to retry",
-                  processorName);
+              processorName);
           TimeUnit.MILLISECONDS.sleep(100);
         } catch (InterruptedException e) {
           // ignore the interrupted exception
@@ -919,7 +948,7 @@ public class FileNodeManager implements IStatistic, IService {
    * try to close the filenode processor.
    */
   private void close(String processorName) throws FileNodeManagerException {
-    if (processorMap.containsKey(processorName)) {
+    if (!processorMap.containsKey(processorName)) {
       LOGGER.warn("The processorMap doesn't contain the filenode processor {}.", processorName);
       return;
     }
@@ -959,7 +988,7 @@ public class FileNodeManager implements IStatistic, IService {
     fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
     try {
       Iterator<Map.Entry<String, FileNodeProcessor>> processorIterator = processorMap.entrySet()
-              .iterator();
+          .iterator();
       while (processorIterator.hasNext()) {
         Map.Entry<String, FileNodeProcessor> processorEntry = processorIterator.next();
         delete(processorEntry.getKey(), processorIterator);
@@ -1020,7 +1049,7 @@ public class FileNodeManager implements IStatistic, IService {
       // flush task
       case SAFE:
         if (FlushManager.getInstance().getActiveCnt() < 0.5 * FlushManager.getInstance()
-                .getThreadCnt()) {
+            .getThreadCnt()) {
           try {
             flushTop(0.01f);
           } catch (IOException e) {
@@ -1053,9 +1082,9 @@ public class FileNodeManager implements IStatistic, IService {
     // sort the tempProcessors as descending order
     tempProcessors.sort((o1, o2) -> (int) (o2.memoryUsage() - o1.memoryUsage()));
     int flushNum =
-            (int) (tempProcessors.size() * percentage) > 1
-                    ? (int) (tempProcessors.size() * percentage)
-                    : 1;
+        (int) (tempProcessors.size() * percentage) > 1
+            ? (int) (tempProcessors.size() * percentage)
+            : 1;
     for (int i = 0; i < flushNum && i < tempProcessors.size(); i++) {
       FileNodeProcessor processor = tempProcessors.get(i);
       // 64M
@@ -1109,10 +1138,10 @@ public class FileNodeManager implements IStatistic, IService {
    * recover filenode.
    */
   public void recoverFileNode(String filenodeName)
-          throws FileNodeManagerException {
+      throws FileNodeManagerException {
     FileNodeProcessor fileNodeProcessor = getProcessor(filenodeName, true);
     LOGGER.info("Recover the filenode processor, the filenode is {}, the status is {}",
-            filenodeName, fileNodeProcessor.getFileNodeProcessorStatus());
+        filenodeName, fileNodeProcessor.getFileNodeProcessorStatus());
     try {
       fileNodeProcessor.fileNodeRecovery();
     } catch (FileNodeProcessorException e) {
@@ -1131,5 +1160,5 @@ public class FileNodeManager implements IStatistic, IService {
 
     private static final FileNodeManager INSTANCE = new FileNodeManager(TsFileDBConf.fileNodeDir);
   }
-  
+
 }
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index bba4250..a4fe3ee 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.engine.filenode;
 
 import java.io.File;
@@ -38,9 +39,11 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.Processor;
@@ -48,6 +51,9 @@ import org.apache.iotdb.db.engine.bufferwrite.Action;
 import org.apache.iotdb.db.engine.bufferwrite.ActionException;
 import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
 import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
 import org.apache.iotdb.db.engine.pool.MergeManager;
 import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
@@ -55,6 +61,8 @@ import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.querycontext.UnsealedTsFile;
+import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
+import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.exception.ErrorDebugException;
 import org.apache.iotdb.db.exception.FileNodeProcessorException;
@@ -66,10 +74,12 @@ import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.monitor.IStatistic;
 import org.apache.iotdb.db.monitor.MonitorConstants;
 import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.IReader;
 import org.apache.iotdb.db.utils.FileSchemaUtils;
 import org.apache.iotdb.db.utils.MemUtils;
+import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -198,6 +208,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   };
   // Token for query which used to
   private int multiPassLockToken = 0;
+  private VersionController versionController;
+  private ReentrantLock mergeDeleteLock = new ReentrantLock();
+
+  /**
+   * This is the modification file of the result of the current merge.
+   */
+  private ModificationFile mergingModification;
 
   private TsFileIOWriter mergeFileWriter = null;
   private String mergeOutputPath = null;
@@ -278,6 +295,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       registStatMetadata();
       statMonitor.registStatistics(statStorageDeltaName, this);
     }
+    try {
+      versionController = new SimpleFileVersionController(fileNodeDirPath);
+    } catch (IOException e) {
+      throw new FileNodeProcessorException(e);
+    }
   }
 
   @Override
@@ -290,8 +312,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     Map<String, String> hashMap = new HashMap<>();
     for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
         MonitorConstants.FileNodeProcessorStatConstants.values()) {
-      hashMap.put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name(),
-          MonitorConstants.DATA_TYPE);
+      hashMap
+          .put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name(),
+              MonitorConstants.DATA_TYPE);
     }
     StatMonitor.getInstance().registStatStorageGroup(hashMap);
   }
@@ -312,6 +335,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     Long curTime = System.currentTimeMillis();
     HashMap<String, TSRecord> tsRecordHashMap = new HashMap<>();
     TSRecord tsRecord = new TSRecord(curTime, statStorageDeltaName);
+
     Map<String, AtomicLong> hashMap = getStatParamsHashMap();
     tsRecord.dataPointList = new ArrayList<>();
     for (Map.Entry<String, AtomicLong> entry : hashMap.entrySet()) {
@@ -325,8 +349,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   /**
    * add interval FileNode.
    */
-  void addIntervalFileNode(String baseDir, String fileName)
-      throws ActionException {
+  void addIntervalFileNode(String baseDir, String fileName) throws ActionException {
 
     IntervalFileNode intervalFileNode = new IntervalFileNode(OverflowChangeType.NO_CHANGE, baseDir,
         fileName);
@@ -437,7 +460,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 
       try {
         bufferWriteProcessor = new BufferWriteProcessor(baseDir, getProcessorName(),
-            fileNames[fileNames.length - 1], parameters, fileSchema);
+            fileNames[fileNames.length - 1], parameters, versionController, fileSchema);
       } catch (BufferWriteProcessorException e) {
         // unlock
         writeUnlock();
@@ -454,7 +477,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
     parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
     try {
-      overflowProcessor = new OverflowProcessor(getProcessorName(), parameters, fileSchema);
+      overflowProcessor = new OverflowProcessor(getProcessorName(), parameters, fileSchema,
+          versionController);
     } catch (IOException e) {
       writeUnlock();
       LOGGER.error("The filenode processor {} failed to recovery the overflow processor.",
@@ -468,14 +492,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       // re-merge all file
       // if bufferwrite processor is not null, and close
       LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
-          getProcessorName(),
-          isMerging);
+          getProcessorName(), isMerging);
       merge();
     } else if (isMerging == FileNodeProcessorStatus.WAITING) {
       // unlock
       LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
-          getProcessorName(),
-          isMerging);
+          getProcessorName(), isMerging);
       writeUnlock();
       switchWaitingToWorking();
     } else {
@@ -501,8 +523,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       // construct processor or restore
       try {
         bufferWriteProcessor = new BufferWriteProcessor(baseDir, processorName,
-            insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis(),
-            params, fileSchema);
+            insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR
+                + System.currentTimeMillis(),
+            params, versionController, fileSchema);
       } catch (BufferWriteProcessorException e) {
         LOGGER.error("The filenode processor {} failed to get the bufferwrite processor.",
             processorName, e);
@@ -528,12 +551,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    */
   public OverflowProcessor getOverflowProcessor(String processorName) throws IOException {
     if (overflowProcessor == null) {
-      Map<String, Action> paramparams = new HashMap<>();
+      Map<String, Action> params = new HashMap<>(); // actions passed to the new processor
       // construct processor or restore
-      paramparams.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
-      paramparams
+      params.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
+      params
           .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
-      overflowProcessor = new OverflowProcessor(processorName, paramparams, fileSchema);
+      overflowProcessor = new OverflowProcessor(processorName, params, fileSchema,
+          versionController); // created once, then reused by later calls
     }
     return overflowProcessor;
   }
@@ -675,7 +699,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * @return index of interval
    */
   private int searchIndexNodeByTimestamp(String deviceId, long timestamp,
-                                         List<IntervalFileNode> fileList) {
+      List<IntervalFileNode> fileList) {
     int index = 1;
     while (index < fileList.size()) {
       if (timestamp < fileList.get(index).getStartTime(deviceId)) {
@@ -702,15 +726,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   }
 
   /**
-   * remove multiple pass lock.
-   * TODO: use the return value or remove it.
+   * remove multiple pass lock. TODO: use the return value or remove it.
    */
   public boolean removeMultiPassLock(int token) {
     if (newMultiPassTokenSet.contains(token)) {
       newMultiPassLock.readLock().unlock();
       newMultiPassTokenSet.remove(token);
-      LOGGER
-          .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token,
+      LOGGER.debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token,
               getProcessorName(),
               newMultiPassTokenSet, newMultiPassLock);
       return true;
@@ -732,7 +754,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   /**
    * query data.
    */
-  public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId)
+  public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId,
+      QueryContext context)
       throws FileNodeProcessorException {
     // query overflow data
     TSDataType dataType;
@@ -743,7 +766,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     }
     OverflowSeriesDataSource overflowSeriesDataSource;
     try {
-      overflowSeriesDataSource = overflowProcessor.query(deviceId, measurementId, dataType);
+      overflowSeriesDataSource = overflowProcessor.query(deviceId, measurementId, dataType,
+          context);
     } catch (IOException e) {
       throw new FileNodeProcessorException(e);
     }
@@ -755,8 +779,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         bufferwriteDataInFiles.add(intervalFileNode.backUp());
       }
     }
-    Pair<ReadOnlyMemChunk, List<ChunkMetaData>> bufferwritedata
-        = new Pair<>(null, null);
+    Pair<ReadOnlyMemChunk, List<ChunkMetaData>> bufferwritedata = new Pair<>(null, null);
     // bufferwrite data
     UnsealedTsFile unsealedTsFile = null;
 
@@ -776,6 +799,19 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       }
       bufferwritedata = bufferWriteProcessor
           .queryBufferWriteData(deviceId, measurementId, dataType);
+
+      try {
+        List<Modification> pathModifications = context.getPathModifications(
+            currentIntervalFileNode.getModFile(), deviceId
+                + IoTDBConstant.PATH_SEPARATOR + measurementId
+        );
+        if (!pathModifications.isEmpty()) {
+          QueryUtils.modifyChunkMetaData(bufferwritedata.right, pathModifications);
+        }
+      } catch (IOException e) {
+        throw new FileNodeProcessorException(e);
+      }
+
       unsealedTsFile.setTimeSeriesChunkMetaDatas(bufferwritedata.right);
     }
     GlobalSortedSeriesDataSource globalSortedSeriesDataSource = new GlobalSortedSeriesDataSource(
@@ -788,7 +824,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   /**
    * append one specified tsfile to this filenode processor.
    *
-   * @param appendFile     the appended tsfile information
+   * @param appendFile the appended tsfile information
    * @param appendFilePath the seriesPath of appended file
    */
   public void appendFile(IntervalFileNode appendFile, String appendFilePath)
@@ -852,7 +888,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   }
 
   private void getOverlapFiles(IntervalFileNode appendFile, IntervalFileNode intervalFileNode,
-                               String uuid, List<String> overlapFiles) throws IOException {
+      String uuid, List<String> overlapFiles) throws IOException {
     for (Entry<String, Long> entry : appendFile.getStartTimeMap().entrySet()) {
       if (intervalFileNode.getStartTimeMap().containsKey(entry.getKey()) &&
           intervalFileNode.getEndTime(entry.getKey()) >= entry.getValue()
@@ -1352,7 +1388,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   }
 
   private void collectBufferWriteDirs(List<String> bufferwriteDirPathList,
-                                      List<File> bufferwriteDirList) {
+      List<File> bufferwriteDirList) {
     for (String bufferwriteDirPath : bufferwriteDirPathList) {
       if (bufferwriteDirPath.length() > 0
           && bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1)
@@ -1408,48 +1444,59 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     mergeOutputPath = null;
     mergeBaseDir = null;
     mergeFileName = null;
-    for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
-      // query one deviceId
-      List<Path> pathList = new ArrayList<>();
-      mergeIsChunkGroupHasData = false;
-      mergeStartPos = -1;
-      ChunkGroupFooter footer;
-      int numOfChunk = 0;
-      try {
-        List<String> pathStrings = mManager.getLeafNodePathInNextLevel(deviceId);
-        for (String string : pathStrings) {
-          pathList.add(new Path(string));
+    // modifications are blocked until mergingModification is created, to avoid
+    // losing any modification.
+    mergeDeleteLock.lock();
+    QueryContext context = new QueryContext();
+    try {
+      for (String deviceId : backupIntervalFile.getStartTimeMap().keySet()) {
+        // query one deviceId
+        List<Path> pathList = new ArrayList<>();
+        mergeIsChunkGroupHasData = false;
+        mergeStartPos = -1;
+        ChunkGroupFooter footer;
+        int numOfChunk = 0;
+        try {
+          List<String> pathStrings = mManager.getLeafNodePathInNextLevel(deviceId);
+          for (String string : pathStrings) {
+            pathList.add(new Path(string));
+          }
+        } catch (PathErrorException e) {
+          LOGGER.error("Can't get all the paths from MManager, the deviceId is {}", deviceId);
+          throw new FileNodeProcessorException(e);
+        }
+        if (pathList.isEmpty()) {
+          continue;
+        }
+        for (Path path : pathList) {
+          // query one measurement in the special deviceId
+          String measurementId = path.getMeasurement();
+          TSDataType dataType = mManager.getSeriesType(path.getFullPath());
+          OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor.queryMerge(deviceId,
+              measurementId, dataType, true, context);
+          Filter timeFilter = FilterFactory
+              .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
+                  TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
+          SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
+          IReader seriesReader = SeriesReaderFactory.getInstance()
+              .createSeriesReaderForMerge(backupIntervalFile,
+                  overflowSeriesDataSource, seriesFilter, context);
+          numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter, dataType,
+              startTimeMap, endTimeMap);
+        }
+        if (mergeIsChunkGroupHasData) {
+          // end the new rowGroupMetadata
+          long size = mergeFileWriter.getPos() - mergeStartPos;
+          footer = new ChunkGroupFooter(deviceId, size, numOfChunk);
+          mergeFileWriter.endChunkGroup(footer, 0);
         }
-      } catch (PathErrorException e) {
-        LOGGER.error("Can't get all the paths from MManager, the deviceId is {}", deviceId);
-        throw new FileNodeProcessorException(e);
-      }
-      if (pathList.isEmpty()) {
-        continue;
       }
-      for (Path path : pathList) {
-        // query one measurement in the special deviceId
-        String measurementId = path.getMeasurement();
-        TSDataType dataType = mManager.getSeriesType(path.getFullPath());
-        OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor.queryMerge(deviceId,
-            measurementId, dataType, true);
-        Filter timeFilter = FilterFactory
-            .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
-                TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
-        SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
-        IReader seriesReader = SeriesReaderFactory.getInstance()
-            .createSeriesReaderForMerge(backupIntervalFile,
-                overflowSeriesDataSource, seriesFilter);
-        numOfChunk += queryAndWriteSeries(seriesReader, path, seriesFilter, dataType,
-            startTimeMap, endTimeMap);
-      }
-      if (mergeIsChunkGroupHasData) {
-        // end the new rowGroupMetadata
-        long size = mergeFileWriter.getPos() - mergeStartPos;
-        footer = new ChunkGroupFooter(deviceId, size, numOfChunk);
-        mergeFileWriter.endChunkGroup(footer);
+    } finally {
+      if (mergeDeleteLock.isLocked()) {
+        mergeDeleteLock.unlock();
       }
     }
+
     if (mergeFileWriter != null) {
       mergeFileWriter.endFile(fileSchema);
     }
@@ -1458,12 +1505,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     backupIntervalFile.setOverflowChangeType(OverflowChangeType.NO_CHANGE);
     backupIntervalFile.setStartTimeMap(startTimeMap);
     backupIntervalFile.setEndTimeMap(endTimeMap);
+    backupIntervalFile.setModFile(mergingModification);
+    mergingModification = null;
     return mergeFileName;
   }
 
   private int queryAndWriteSeries(IReader seriesReader, Path path,
-                                  SingleSeriesExpression seriesFilter, TSDataType dataType,
-                                  Map<String, Long> startTimeMap, Map<String, Long> endTimeMap)
+      SingleSeriesExpression seriesFilter, TSDataType dataType,
+      Map<String, Long> startTimeMap, Map<String, Long> endTimeMap)
       throws IOException {
     int numOfChunk = 0;
     try {
@@ -1482,6 +1531,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
               mergeFileName);
           mergeFileName = getProcessorName() + File.separatorChar + mergeFileName;
           mergeFileWriter = new TsFileIOWriter(new File(mergeOutputPath));
+          mergingModification = new ModificationFile(mergeOutputPath
+              + ModificationFile.FILE_SUFFIX);
+          mergeDeleteLock.unlock();
         }
         if (!mergeIsChunkGroupHasData) {
           // start a new rowGroupMetadata
@@ -1513,10 +1565,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
 
 
   private void writeOneSeries(String deviceId, ChunkWriterImpl seriesWriterImpl,
-                              TSDataType dataType, IReader seriesReader,
-                              Map<String, Long> startTimeMap,
-                              Map<String, Long> endTimeMap,
-                              TimeValuePair firstTVPair) throws IOException {
+      TSDataType dataType, IReader seriesReader, Map<String, Long> startTimeMap,
+      Map<String, Long> endTimeMap, TimeValuePair firstTVPair) throws IOException {
     long startTime;
     long endTime;
     TimeValuePair localTV = firstTVPair;
@@ -1539,7 +1589,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   }
 
   private void writeTVPair(ChunkWriterImpl seriesWriterImpl, TSDataType dataType,
-                           TimeValuePair timeValuePair) throws IOException {
+      TimeValuePair timeValuePair) throws IOException {
     switch (dataType) {
       case BOOLEAN:
         seriesWriterImpl.write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
@@ -1718,6 +1768,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   public void close() throws FileNodeProcessorException {
     closeBufferWrite();
     closeOverflow();
+    for (IntervalFileNode fileNode : newFileNodes) {
+      if (fileNode.getModFile() != null) {
+        try {
+          fileNode.getModFile().close();
+        } catch (IOException e) {
+          throw new FileNodeProcessorException(e);
+        }
+      }
+    }
   }
 
   /**
@@ -1731,6 +1790,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     }
     closeBufferWrite();
     closeOverflow();
+    for (IntervalFileNode fileNode : newFileNodes) {
+      if (fileNode.getModFile() != null) {
+        try {
+          fileNode.getModFile().close();
+        } catch (IOException e) {
+          throw new FileNodeProcessorException(e);
+        }
+      }
+    }
   }
 
   @Override
@@ -1781,11 +1849,116 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     return fileNodeRestoreFilePath;
   }
 
+  /**
+   * Delete data of timeseries deviceId.measurementId whose timestamp <= 'timestamp'.
+   *
+   * @param deviceId the deviceId of the timeseries to be deleted.
+   * @param measurementId the measurementId of the timeseries to be deleted.
+   * @param timestamp the delete range is (0, timestamp].
+   */
+  public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
+    // TODO: how to avoid partial deletion?
+    mergeDeleteLock.lock();
+    long version = versionController.nextVersion();
+
+    // record which files are updated so we can roll them back in case of an exception
+    List<ModificationFile> updatedModFiles = new ArrayList<>();
+
+    try {
+      String fullPath = deviceId +
+          IoTDBConstant.PATH_SEPARATOR + measurementId;
+      Deletion deletion = new Deletion(fullPath, version, timestamp);
+      if (mergingModification != null) {
+        mergingModification.write(deletion);
+        updatedModFiles.add(mergingModification);
+      }
+      deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
+      // delete data in memory
+      OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName());
+      overflowProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
+      if (bufferWriteProcessor != null) {
+        bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+      }
+    } catch (Exception e) {
+      // roll back
+      for (ModificationFile modFile : updatedModFiles) {
+        modFile.abort();
+      }
+      throw new IOException(e);
+    } finally {
+      mergeDeleteLock.unlock();
+    }
+  }
+
+  private void deleteBufferWriteFiles(String deviceId, Deletion deletion,
+      List<ModificationFile> updatedModFiles) throws IOException {
+    if (currentIntervalFileNode != null && currentIntervalFileNode.containsDevice(deviceId)) {
+      currentIntervalFileNode.getModFile().write(deletion);
+      updatedModFiles.add(currentIntervalFileNode.getModFile());
+    }
+    for (IntervalFileNode fileNode : newFileNodes) {
+      if (fileNode != currentIntervalFileNode && fileNode.containsDevice(deviceId)
+          && fileNode.getStartTime(deviceId) <= deletion.getTimestamp()) {
+        fileNode.getModFile().write(deletion);
+        updatedModFiles.add(fileNode.getModFile());
+      }
+    }
+  }
+
+  /**
+   * Similar to delete(), but only deletes data in BufferWrite. Only used by WAL recovery.
+   */
+  public void deleteBufferWrite(String deviceId, String measurementId, long timestamp)
+      throws IOException {
+    String fullPath = deviceId +
+        IoTDBConstant.PATH_SEPARATOR + measurementId;
+    long version = versionController.nextVersion();
+    Deletion deletion = new Deletion(fullPath, version, timestamp);
+
+    List<ModificationFile> updatedModFiles = new ArrayList<>();
+    try {
+      deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
+    } catch (IOException e) {
+      for (ModificationFile modificationFile : updatedModFiles) {
+        modificationFile.abort();
+      }
+      throw e;
+    }
+    if (bufferWriteProcessor != null) {
+      bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+    }
+  }
+
+  /**
+   * Similar to delete(), but only deletes data in Overflow. Only used by WAL recovery.
+   */
+  public void deleteOverflow(String deviceId, String measurementId, long timestamp)
+      throws IOException {
+    long version = versionController.nextVersion();
+
+    OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName());
+    List<ModificationFile> updatedModFiles = new ArrayList<>();
+    try {
+      overflowProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
+    } catch (IOException e) {
+      for (ModificationFile modificationFile : updatedModFiles) {
+        modificationFile.abort();
+      }
+      throw e;
+    }
+  }
+
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     FileNodeProcessor that = (FileNodeProcessor) o;
     return isOverflowed == that.isOverflowed &&
         numOfMergeFile == that.numOfMergeFile &&
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java
index 4096ba8..6a63b61 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/IntervalFileNode.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.Set;
 
 import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 
 /**
  * This class is used to store one bufferwrite file status.<br>
@@ -42,6 +43,8 @@ public class IntervalFileNode implements Serializable {
   private Map<String, Long> endTimeMap;
   private Set<String> mergeChanged = new HashSet<>();
 
+  private transient ModificationFile modFile;
+
   public IntervalFileNode(Map<String, Long> startTimeMap, Map<String, Long> endTimeMap,
                           OverflowChangeType type, int baseDirIndex, String relativePath) {
 
@@ -51,7 +54,9 @@ public class IntervalFileNode implements Serializable {
 
     this.startTimeMap = startTimeMap;
     this.endTimeMap = endTimeMap;
-
+    this.modFile = new ModificationFile(
+        Directories.getInstance().getTsFileFolder(baseDirIndex) + File.separator
+            + relativePath + ModificationFile.FILE_SUFFIX);
   }
 
   /**
@@ -68,6 +73,9 @@ public class IntervalFileNode implements Serializable {
 
     startTimeMap = new HashMap<>();
     endTimeMap = new HashMap<>();
+    this.modFile = new ModificationFile(
+        Directories.getInstance().getTsFileFolder(baseDirIndex) + File.separator
+            + relativePath + ModificationFile.FILE_SUFFIX);
   }
 
   public IntervalFileNode(OverflowChangeType type, String baseDir, String relativePath) {
@@ -78,6 +86,9 @@ public class IntervalFileNode implements Serializable {
 
     startTimeMap = new HashMap<>();
     endTimeMap = new HashMap<>();
+    this.modFile = new ModificationFile(
+        Directories.getInstance().getTsFileFolder(baseDirIndex) + File.separator
+            + relativePath + ModificationFile.FILE_SUFFIX);
   }
 
   public IntervalFileNode(OverflowChangeType type, String relativePath) {
@@ -259,4 +270,20 @@ public class IntervalFileNode implements Serializable {
     this.overflowChangeType = overflowChangeType;
   }
 
+  public synchronized ModificationFile getModFile() {
+    if (modFile == null) {
+      modFile = new ModificationFile(
+          Directories.getInstance().getTsFileFolder(baseDirIndex) + File.separator
+              + relativePath + ModificationFile.FILE_SUFFIX);
+    }
+    return modFile;
+  }
+
+  public boolean containsDevice(String deviceId) {
+    return startTimeMap.containsKey(deviceId);
+  }
+
+  public void setModFile(ModificationFile modFile) {
+    this.modFile = modFile;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 0596e61..ddbbfcf 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -19,7 +19,9 @@
 package org.apache.iotdb.db.engine.memtable;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public abstract class AbstractMemTable implements IMemTable {
@@ -30,6 +32,10 @@ public abstract class AbstractMemTable implements IMemTable {
     this.memTableMap = new HashMap<>();
   }
 
+  public AbstractMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
+    this.memTableMap = memTableMap;
+  }
+
   @Override
   public Map<String, Map<String, IWritableMemChunk>> getMemTableMap() {
     return memTableMap;
@@ -94,4 +100,61 @@ public abstract class AbstractMemTable implements IMemTable {
     return memTableMap.get(deviceId).get(measurement);
   }
 
+  @Override
+  public void delete(String deviceId, String measurementId, long timestamp) {
+    Map<String, IWritableMemChunk> deviceMap = memTableMap.get(deviceId);
+    if (deviceMap != null) {
+      IWritableMemChunk chunk = deviceMap.get(measurementId);
+      IWritableMemChunk newChunk = filterChunk(chunk, timestamp);
+      if (newChunk != null) {
+        deviceMap.put(measurementId, newChunk);
+      }
+    }
+  }
+
+  /**
+   * If chunk contains data with timestamp less than or equal to 'timestamp', create a copy without
+   * those data. Otherwise return null.
+   *
+   * @param chunk the source chunk.
+   * @param timestamp the upper-bound of deletion time.
+   * @return a reduced copy of chunk if chunk contains data with timestamp less than or equal to
+   * 'timestamp', or null otherwise.
+   */
+  private IWritableMemChunk filterChunk(IWritableMemChunk chunk, long timestamp) {
+    List<TimeValuePair> timeValuePairs = chunk.getSortedTimeValuePairList();
+    if (!timeValuePairs.isEmpty() && timeValuePairs.get(0).getTimestamp() <= timestamp) {
+      TSDataType dataType = chunk.getType();
+      IWritableMemChunk newChunk = genMemSeries(dataType);
+      for (TimeValuePair pair : timeValuePairs) {
+        if (pair.getTimestamp() > timestamp) {
+          switch (dataType) {
+            case BOOLEAN:
+              newChunk.putBoolean(pair.getTimestamp(), pair.getValue().getBoolean());
+              break;
+            case DOUBLE:
+              newChunk.putDouble(pair.getTimestamp(), pair.getValue().getDouble());
+              break;
+            case INT64:
+              newChunk.putLong(pair.getTimestamp(), pair.getValue().getLong());
+              break;
+            case INT32:
+              newChunk.putInt(pair.getTimestamp(), pair.getValue().getInt());
+              break;
+            case FLOAT:
+              newChunk.putFloat(pair.getTimestamp(), pair.getValue().getFloat());
+              break;
+            case TEXT:
+              newChunk.putBinary(pair.getTimestamp(), pair.getValue().getBinary());
+              break;
+            default:
+                throw new UnsupportedOperationException("Unknown datatype: " + dataType);
+          }
+        }
+      }
+      return newChunk;
+    }
+    return null;
+  }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index f1e75e3..d51715d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -46,4 +46,20 @@ public interface IMemTable {
 
   boolean isEmpty();
 
+  /**
+   * Delete data in it whose timestamp <= 'timestamp' and belonging to timeseries
+   * deviceId.measurementId.
+   *
+   * @param deviceId the deviceId of the timeseries to be deleted.
+   * @param measurementId the measurementId of the timeseries to be deleted.
+   * @param timestamp the upper-bound of deletion time.
+   */
+  void delete(String deviceId, String measurementId, long timestamp);
+
+  /**
+   * Make a copy of this MemTable.
+   *
+   * @return a MemTable with the same data as this one.
+   */
+  IMemTable copy();
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index aba946b..05b0522 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.engine.memtable;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 public interface IWritableMemChunk extends TimeValuePairSorter {
@@ -39,4 +40,6 @@ public interface IWritableMemChunk extends TimeValuePairSorter {
   void reset();
 
   int count();
+
+  TSDataType getType();
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
index 1f39461..7692602 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushUtil.java
@@ -39,7 +39,7 @@ public class MemTableFlushUtil {
   private static final Logger LOGGER = LoggerFactory.getLogger(MemTableFlushUtil.class);
   private static final int PAGE_SIZE_THRESHOLD = TSFileConfig.pageSizeInByte;
 
-  private MemTableFlushUtil(){
+  private MemTableFlushUtil() {
 
   }
 
@@ -84,7 +84,7 @@ public class MemTableFlushUtil {
    * the function for flushing memtable.
    */
   public static void flushMemTable(FileSchema fileSchema, TsFileIOWriter tsFileIoWriter,
-      IMemTable imemTable)
+      IMemTable imemTable, long version)
       throws IOException {
     for (String deviceId : imemTable.getMemTableMap().keySet()) {
       long startPos = tsFileIoWriter.getPos();
@@ -102,7 +102,7 @@ public class MemTableFlushUtil {
       }
       long memSize = tsFileIoWriter.getPos() - startPos;
       ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, memSize, seriesNumber);
-      tsFileIoWriter.endChunkGroup(footer);
+      tsFileIoWriter.endChunkGroup(footer, version);
     }
   }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 123e4d2..455196a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -16,14 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.engine.memtable;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class PrimitiveMemTable extends AbstractMemTable {
 
+  public PrimitiveMemTable() {
+  }
+
+  public PrimitiveMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
+    super(memTableMap);
+  }
+
   @Override
   protected IWritableMemChunk genMemSeries(TSDataType dataType) {
     return new WritableMemChunk(dataType);
   }
+
+  @Override
+  public IMemTable copy() {
+    Map<String, Map<String, IWritableMemChunk>> newMap = new HashMap<>(getMemTableMap());
+
+    return new PrimitiveMemTable(newMap);
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index b51e899..e60f3bb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -119,4 +119,9 @@ public class WritableMemChunk implements IWritableMemChunk {
     return list.size();
   }
 
+  @Override
+  public TSDataType getType() {
+    return dataType;
+  }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
new file mode 100644
index 0000000..340fc2f
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Deletion.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import java.util.Objects;
+
+/**
+ * Deletion is a delete operation on a timeseries.
+ */
+public class Deletion extends Modification {
+
+  /**
+   * data whose timestamp <= this field are to be deleted.
+   */
+  private long timestamp;
+
+  public Deletion(String path, long versionNum, long timestamp) {
+    super(Type.DELETION, path, versionNum);
+    this.timestamp = timestamp;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof Deletion)) {
+      return false;
+    }
+    Deletion del = (Deletion) obj;
+    return super.equals(obj) && del.timestamp == this.timestamp;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), timestamp);
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java
new file mode 100644
index 0000000..fbdf790
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/Modification.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import java.util.Objects;
+
+/**
+ * Modification represents an UPDATE or DELETE operation on a certain timeseries.
+ */
+public abstract class Modification {
+
+  protected Type type;
+  protected String path;
+  protected long versionNum;
+
+  Modification(Type type, String path, long versionNum) {
+    this.type = type;
+    this.path = path;
+    this.versionNum = versionNum;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  public long getVersionNum() {
+    return versionNum;
+  }
+
+  public void setVersionNum(long versionNum) {
+    this.versionNum = versionNum;
+  }
+
+  public Type getType() {
+    return type;
+  }
+
+  public void setType(Type type) {
+    this.type = type;
+  }
+
+  public enum Type {
+    DELETION
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+       return true;
+    }
+    if (!(obj instanceof Modification)) {
+      return false;
+    }
+    Modification mod = (Modification) obj;
+    return mod.type.equals(this.type) && mod.path.equals(this.path)
+            && mod.versionNum == this.versionNum;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(type, path, versionNum);
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
new file mode 100644
index 0000000..9727b2b
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
+import org.apache.iotdb.db.engine.modification.io.ModificationReader;
+import org.apache.iotdb.db.engine.modification.io.ModificationWriter;
+
+/**
+ * ModificationFile stores the Modifications of a TsFile or unseq file in another file in the same
+ * directory. Methods in this class are highly synchronized for concurrency safety.
+ */
+public class ModificationFile {
+
+  public static final String FILE_SUFFIX = ".mods";
+
+  private List<Modification> modifications;
+  private ModificationWriter writer;
+  private ModificationReader reader;
+  private String filePath;
+
+  /**
+   * Construct a ModificationFile using a file as its storage.
+   *
+   * @param filePath the path of the storage file.
+   */
+  public ModificationFile(String filePath) {
+    LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(filePath);
+    this.writer = accessor;
+    this.reader = accessor;
+    this.filePath = filePath;
+  }
+
+  private void init() throws IOException {
+    synchronized (this) {
+      modifications = (List<Modification>) reader.read();
+    }
+  }
+
+  private void checkInit() throws IOException {
+    if (modifications == null) {
+      init();
+    }
+  }
+
+  /**
+   * Release resources such as streams and caches.
+   */
+  public void close() throws IOException {
+    synchronized (this) {
+      writer.close();
+      modifications = null;
+    }
+  }
+
+  public void abort() throws IOException {
+    synchronized (this) {
+      if (modifications.size() > 0) {
+        writer.abort();
+        modifications.remove(modifications.size() - 1);
+      }
+    }
+  }
+
+  /**
+   * Write a modification in this file. The modification will first be written to the persistent
+   * store then the memory cache.
+   *
+   * @param mod the modification to be written.
+   * @throws IOException if IOException is thrown when writing the modification to the store.
+   */
+  public void write(Modification mod) throws IOException {
+    synchronized (this) {
+      checkInit();
+      writer.write(mod);
+      modifications.add(mod);
+    }
+  }
+
+  /**
+   * Get all modifications stored in this file.
+   *
+   * @return an ArrayList of modifications.
+   */
+  public Collection<Modification> getModifications() throws IOException {
+    synchronized (this) {
+      checkInit();
+      return new ArrayList<>(modifications);
+    }
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
new file mode 100644
index 0000000..249b18a
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification.io;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LocalTextModificationAccessor uses a file on local file system to store the modifications
+ * in text format, and writes modifications by appending to the tail of the file.
+ */
+public class LocalTextModificationAccessor implements ModificationReader, ModificationWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(LocalTextModificationAccessor.class);
+  private static final String SEPARATOR = ",";
+  private static final String ABORT_MARK = "aborted";
+
+  private String filePath;
+  private BufferedWriter writer;
+
+  /**
+   * Construct a LocalTextModificationAccessor using a file specified by filePath.
+   *
+   * @param filePath the path of the file that is used for storing modifications.
+   */
+  public LocalTextModificationAccessor(String filePath) {
+    this.filePath = filePath;
+  }
+  @Override
+  public Collection<Modification> read() throws IOException {
+    BufferedReader reader;
+    try {
+      reader = new BufferedReader(new FileReader(filePath));
+    } catch (FileNotFoundException e) {
+      return new ArrayList<>();
+    }
+    String line;
+
+    List<Modification> modificationList = new ArrayList<>();
+    try {
+      while ((line = reader.readLine()) != null) {
+        if (line.equals(ABORT_MARK) && modificationList.size() > 0) {
+          modificationList.remove(modificationList.size() - 1);
+        } else {
+          modificationList.add(decodeModification(line));
+        }
+      }
+    } catch (IOException e) {
+      logger.error("An error occurred when reading modifications, and the remaining modifications "
+              + "were ignored.", e);
+    } finally {
+      reader.close();
+    }
+    return modificationList;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (writer != null) {
+      writer.close();
+    }
+  }
+
+  @Override
+  public void abort() throws IOException {
+    if (writer == null) {
+      writer = new BufferedWriter(new FileWriter(filePath, true));
+    }
+    writer.write(ABORT_MARK);
+    writer.newLine();
+    writer.flush();
+  }
+
+  @Override
+  public void write(Modification mod) throws IOException {
+    if (writer == null) {
+      writer = new BufferedWriter(new FileWriter(filePath, true));
+    }
+    writer.write(encodeModification(mod));
+    writer.newLine();
+    writer.flush();
+  }
+
+  private static String encodeModification(Modification mod) {
+    if (mod instanceof Deletion)
+      return encodeDeletion((Deletion) mod);
+    return null;
+  }
+
+  private static Modification decodeModification(String src) throws IOException {
+    String[] fields = src.split(SEPARATOR);
+    if (Modification.Type.DELETION.name().equals(fields[0])) {
+      return decodeDeletion(fields);
+    }
+    throw new IOException("Unknown modification type: " + fields[0]);
+  }
+
+  private static String encodeDeletion(Deletion del) {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(del.getType().toString()).append(SEPARATOR).append(del.getPath())
+            .append(SEPARATOR).append(del.getVersionNum()).append(SEPARATOR)
+            .append(del.getTimestamp());
+    return stringBuilder.toString();
+  }
+
+  private static Deletion decodeDeletion(String[] fields) throws IOException {
+    if (fields.length != 4) {
+      throw new IOException("Incorrect deletion fields number: " + fields.length);
+    }
+
+    String path = fields[1];
+    long versionNum, timestamp;
+    try {
+      versionNum = Long.parseLong(fields[2]);
+    } catch (NumberFormatException e) {
+      throw new IOException("Invalide version number: " + fields[2]);
+    }
+    try {
+      timestamp = Long.parseLong(fields[3]);
+    } catch (NumberFormatException e) {
+      throw new IOException("Invalide timestamp: " + fields[3]);
+    }
+
+    return new Deletion(path, versionNum, timestamp);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
similarity index 59%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
index 7c63be6..1abfadd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationReader.java
@@ -16,29 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.tsfile.read.common;
 
-import java.nio.ByteBuffer;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+package org.apache.iotdb.db.engine.modification.io;
 
-/**
- * used in query.
- */
-public class Chunk {
+import java.io.IOException;
+import java.util.Collection;
 
-  private ChunkHeader chunkHeader;
-  private ByteBuffer chunkData;
+import org.apache.iotdb.db.engine.modification.Modification;
 
-  public Chunk(ChunkHeader header, ByteBuffer buffer) {
-    this.chunkHeader = header;
-    this.chunkData = buffer;
-  }
+/**
+ * ModificationReader reads all modifications from a persistent medium like file system.
+ */
+public interface ModificationReader {
 
-  public ChunkHeader getHeader() {
-    return chunkHeader;
-  }
+  /**
+   * Read all modifications from a persistent medium.
+   *
+   * @return a collection of the modifications contained in the medium.
+   */
+  Collection<Modification> read() throws IOException;
 
-  public ByteBuffer getData() {
-    return chunkData;
-  }
+  /**
+   * Release resources like streams.
+   */
+  void close() throws IOException;
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
similarity index 57%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
index 7c63be6..26a9208 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
@@ -16,29 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.tsfile.read.common;
 
-import java.nio.ByteBuffer;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+package org.apache.iotdb.db.engine.modification.io;
+
+import java.io.IOException;
+
+import org.apache.iotdb.db.engine.modification.Modification;
 
 /**
- * used in query.
+ * ModificationWriter provides methods for writing a modification to a persistent medium like file
+ * system.
  */
-public class Chunk {
-
-  private ChunkHeader chunkHeader;
-  private ByteBuffer chunkData;
+public interface ModificationWriter {
 
-  public Chunk(ChunkHeader header, ByteBuffer buffer) {
-    this.chunkHeader = header;
-    this.chunkData = buffer;
-  }
+  /**
+   * Write a new modification to the persistent medium.
+   * @param mod the modification to be written.
+   */
+  void write(Modification mod) throws IOException;
 
-  public ChunkHeader getHeader() {
-    return chunkHeader;
-  }
+  /**
+   * Release resources like streams.
+   */
+  void close() throws IOException;
 
-  public ByteBuffer getData() {
-    return chunkData;
-  }
+  /**
+   * Abort last modification.
+   */
+  void abort() throws IOException;
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java
similarity index 76%
copy from iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java
index 1510f00..e2fdfa8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/package-info.java
@@ -16,12 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.writelog.replay;
 
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-
-public interface LogReplayer {
-
-  void replay(PhysicalPlan plan) throws ProcessorException;
-}
+/**
+ * modification is the functional module responsible for processing UPDATE and DELETE.
+ */
+package org.apache.iotdb.db.engine.modification;
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index f1e16c2..01801d9 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -41,13 +41,16 @@ import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.utils.FlushStatus;
+import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -79,26 +82,28 @@ public class OverflowProcessor extends Processor {
   private int valueCount;
   private String parentPath;
   private long lastFlushTime = -1;
-  private AtomicLong dataPahtCount = new AtomicLong();
+  private AtomicLong dataPathCount = new AtomicLong();
   private ReentrantLock queryFlushLock = new ReentrantLock();
 
-  private Action overflowFlushAction = null;
-  private Action filenodeFlushAction = null;
+  private Action overflowFlushAction;
+  private Action filenodeFlushAction;
   private FileSchema fileSchema;
 
   private long memThreshold = TSFileConfig.groupSizeInByte;
   private AtomicLong memSize = new AtomicLong();
 
   private WriteLogNode logNode;
+  private VersionController versionController;
 
   public OverflowProcessor(String processorName, Map<String, Action> parameters,
-                           FileSchema fileSchema)
-          throws IOException {
+      FileSchema fileSchema, VersionController versionController)
+      throws IOException {
     super(processorName);
     this.fileSchema = fileSchema;
+    this.versionController = versionController;
     String overflowDirPath = TsFileDBConf.overflowDataDir;
     if (overflowDirPath.length() > 0
-            && overflowDirPath.charAt(overflowDirPath.length() - 1) != File.separatorChar) {
+        && overflowDirPath.charAt(overflowDirPath.length() - 1) != File.separatorChar) {
       overflowDirPath = overflowDirPath + File.separatorChar;
     }
     this.parentPath = overflowDirPath + processorName;
@@ -112,13 +117,13 @@ public class OverflowProcessor extends Processor {
     workSupport = new OverflowSupport();
     overflowFlushAction = parameters.get(FileNodeConstants.OVERFLOW_FLUSH_ACTION);
     filenodeFlushAction = parameters
-            .get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
+        .get(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION);
 
     if (IoTDBDescriptor.getInstance().getConfig().enableWal) {
       logNode = MultiFileLogNodeManager.getInstance().getNode(
-              processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX,
-              getOverflowRestoreFile(),
-              FileNodeManager.getInstance().getRestoreFilePath(processorName));
+          processorName + IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX,
+          getOverflowRestoreFile(),
+          FileNodeManager.getInstance().getRestoreFilePath(processorName));
     }
   }
 
@@ -126,12 +131,11 @@ public class OverflowProcessor extends Processor {
     String[] subFilePaths = clearFile(parentFile.list());
     if (subFilePaths.length == 0) {
       workResource = new OverflowResource(parentPath,
-              String.valueOf(dataPahtCount.getAndIncrement()));
-      return;
+          String.valueOf(dataPathCount.getAndIncrement()), versionController);
     } else if (subFilePaths.length == 1) {
       long count = Long.parseLong(subFilePaths[0]);
-      dataPahtCount.addAndGet(count + 1);
-      workResource = new OverflowResource(parentPath, String.valueOf(count));
+      dataPathCount.addAndGet(count + 1);
+      workResource = new OverflowResource(parentPath, String.valueOf(count), versionController);
       LOGGER.info("The overflow processor {} recover from work status.", getProcessorName());
     } else {
       long count1 = Long.parseLong(subFilePaths[0]);
@@ -141,10 +145,10 @@ public class OverflowProcessor extends Processor {
         count1 = count2;
         count2 = temp;
       }
-      dataPahtCount.addAndGet(count2 + 1);
+      dataPathCount.addAndGet(count2 + 1);
       // work dir > merge dir
-      workResource = new OverflowResource(parentPath, String.valueOf(count2));
-      mergeResource = new OverflowResource(parentPath, String.valueOf(count1));
+      workResource = new OverflowResource(parentPath, String.valueOf(count2), versionController);
+      mergeResource = new OverflowResource(parentPath, String.valueOf(count1), versionController);
       LOGGER.info("The overflow processor {} recover from merge status.", getProcessorName());
     }
   }
@@ -166,9 +170,6 @@ public class OverflowProcessor extends Processor {
 
   /**
    * insert one time-series record
-   *
-   * @param tsRecord
-   * @throws IOException
    */
   public void insert(TSRecord tsRecord) throws IOException {
     // memory control
@@ -192,8 +193,7 @@ public class OverflowProcessor extends Processor {
    */
   @Deprecated
   public void update(String deviceId, String measurementId, long startTime, long endTime,
-                     TSDataType type,
-                     byte[] value) {
+      TSDataType type, byte[] value) {
     workSupport.update(deviceId, measurementId, startTime, endTime, type, value);
     valueCount++;
   }
@@ -203,10 +203,9 @@ public class OverflowProcessor extends Processor {
    */
   @Deprecated
   public void update(String deviceId, String measurementId, long startTime, long endTime,
-                     TSDataType type,
-                     String value) {
+      TSDataType type, String value) {
     workSupport.update(deviceId, measurementId, startTime, endTime, type,
-            convertStringToBytes(type, value));
+        convertStringToBytes(type, value));
     valueCount++;
   }
 
@@ -231,149 +230,144 @@ public class OverflowProcessor extends Processor {
   }
 
   /**
-   * @deprecated this method need re-implemented.
+   * Delete data of a timeseries whose time ranges from 0 to timestamp.
+   *
+   * @param deviceId the deviceId of the timeseries.
+   * @param measurementId the measurementId of the timeseries.
+   * @param timestamp the upper-bound of deletion time.
+   * @param version the version number of this deletion.
+   * @param updatedModFiles list that collects the successfully updated Modification files, so
+   * that they can be aborted when an exception is raised.
    */
-  @Deprecated
-  public void delete(String deviceId, String measurementId, long timestamp, TSDataType type) {
-    workSupport.delete(deviceId, measurementId, timestamp, type);
-    valueCount++;
+  public void delete(String deviceId, String measurementId, long timestamp, long version,
+      List<ModificationFile> updatedModFiles) throws IOException {
+    workResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
+    workSupport.delete(deviceId, measurementId, timestamp, false);
+    if (flushStatus.isFlushing()) {
+      mergeResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
+      flushSupport.delete(deviceId, measurementId, timestamp, true);
+    }
   }
 
   /**
-   * query all overflow data which contain insert data in memory, insert data in file,
-   * update/delete data in memory, update/delete data in file.
+   * query all overflow data which contain insert data in memory, insert data in file, update/delete
+   * data in memory, update/delete data in file.
    *
-   * @param deviceId
-   * @param measurementId
-   * @param dataType
    * @return OverflowSeriesDataSource
-   * @throws IOException
    */
   public OverflowSeriesDataSource query(String deviceId, String measurementId,
-                                        TSDataType dataType)
-          throws IOException {
+      TSDataType dataType, QueryContext context)
+      throws IOException {
     queryFlushLock.lock();
     try {
       // query insert data in memory and unseqTsFiles
       // memory
       TimeValuePairSorter insertInMem = queryOverflowInsertInMemory(deviceId, measurementId,
-              dataType);
+          dataType);
       List<OverflowInsertFile> overflowInsertFileList = new ArrayList<>();
       // work file
       Pair<String, List<ChunkMetaData>> insertInDiskWork = queryWorkDataInOverflowInsert(deviceId,
-              measurementId,
-              dataType);
+          measurementId,
+          dataType, context);
       if (insertInDiskWork.left != null) {
         overflowInsertFileList
-                .add(0, new OverflowInsertFile(insertInDiskWork.left,
-                        insertInDiskWork.right));
+            .add(0, new OverflowInsertFile(insertInDiskWork.left,
+                insertInDiskWork.right));
       }
       // merge file
       Pair<String, List<ChunkMetaData>> insertInDiskMerge = queryMergeDataInOverflowInsert(deviceId,
-              measurementId, dataType);
+          measurementId, dataType, context);
       if (insertInDiskMerge.left != null) {
         overflowInsertFileList
-                .add(0, new OverflowInsertFile(insertInDiskMerge.left
-                        , insertInDiskMerge.right));
+            .add(0, new OverflowInsertFile(insertInDiskMerge.left
+                , insertInDiskMerge.right));
       }
       // work file
       return new OverflowSeriesDataSource(new Path(deviceId + "." + measurementId), dataType,
-              overflowInsertFileList, insertInMem);
+          overflowInsertFileList, insertInMem);
     } finally {
       queryFlushLock.unlock();
     }
   }
 
   /**
-   * query insert data in memory table. while flushing, merge the work memory table
-   * with flush memory table.
+   * query insert data in memory table. while flushing, merge the work memory table with flush
+   * memory table.
    *
-   * @param deviceId
-   * @param measurementId
-   * @param dataType
    * @return insert data in SeriesChunkInMemTable
    */
   private TimeValuePairSorter queryOverflowInsertInMemory(String deviceId, String measurementId,
-                                                          TSDataType dataType) {
+      TSDataType dataType) {
 
     MemSeriesLazyMerger memSeriesLazyMerger = new MemSeriesLazyMerger();
     if (flushStatus.isFlushing()) {
       memSeriesLazyMerger
-              .addMemSeries(
-                      flushSupport.queryOverflowInsertInMemory(deviceId, measurementId, dataType));
+          .addMemSeries(
+              flushSupport.queryOverflowInsertInMemory(deviceId, measurementId, dataType));
     }
     memSeriesLazyMerger
-            .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, measurementId,
-                    dataType));
+        .addMemSeries(workSupport.queryOverflowInsertInMemory(deviceId, measurementId,
+            dataType));
     return new ReadOnlyMemChunk(dataType, memSeriesLazyMerger);
   }
 
   /**
    * Get the insert data which is WORK in unseqTsFile.
    *
-   * @param deviceId
-   * @param measurementId
-   * @param dataType
+   * @param deviceId deviceId of the target time-series
+   * @param measurementId measurementId of the target time-series
+   * @param dataType data type of the target time-series
    * @return the seriesPath of unseqTsFile, List of TimeSeriesChunkMetaData for the special
    * time-series.
    */
   private Pair<String, List<ChunkMetaData>> queryWorkDataInOverflowInsert(String deviceId,
-                                                                          String measurementId,
-                                                                          TSDataType dataType) {
+      String measurementId, TSDataType dataType, QueryContext context) {
     return new Pair<>(
-            workResource.getInsertFilePath(),
-            workResource.getInsertMetadatas(deviceId, measurementId, dataType));
+        workResource.getInsertFilePath(),
+        workResource.getInsertMetadatas(deviceId, measurementId, dataType, context));
   }
 
   /**
    * Get the all merge data in unseqTsFile and overflowFile.
    *
-   * @param deviceId
-   * @param measurementId
-   * @param dataType
    * @return MergeSeriesDataSource
    */
   public MergeSeriesDataSource queryMerge(String deviceId, String measurementId,
-                                          TSDataType dataType) {
+      TSDataType dataType, QueryContext context) {
     Pair<String, List<ChunkMetaData>> mergeInsert = queryMergeDataInOverflowInsert(deviceId,
-            measurementId,
-            dataType);
+        measurementId,
+        dataType, context);
     return new MergeSeriesDataSource(new OverflowInsertFile(mergeInsert.left, mergeInsert.right));
   }
 
   public OverflowSeriesDataSource queryMerge(String deviceId, String measurementId,
-                                             TSDataType dataType,
-                                             boolean isMerge) {
+      TSDataType dataType, boolean isMerge, QueryContext context) {
     Pair<String, List<ChunkMetaData>> mergeInsert = queryMergeDataInOverflowInsert(deviceId,
-            measurementId,
-            dataType);
+        measurementId,
+        dataType, context);
     OverflowSeriesDataSource overflowSeriesDataSource = new OverflowSeriesDataSource(
-            new Path(deviceId + "." + measurementId));
+        new Path(deviceId + "." + measurementId));
     overflowSeriesDataSource.setReadableMemChunk(null);
     overflowSeriesDataSource
-            .setOverflowInsertFileList(
-                    Arrays.asList(new OverflowInsertFile(mergeInsert.left, mergeInsert.right)));
+        .setOverflowInsertFileList(
+            Arrays.asList(new OverflowInsertFile(mergeInsert.left, mergeInsert.right)));
     return overflowSeriesDataSource;
   }
 
   /**
    * Get the insert data which is MERGE in unseqTsFile
    *
-   * @param deviceId
-   * @param measurementId
-   * @param dataType
-   * @return the seriesPath of unseqTsFile, List of TimeSeriesChunkMetaData for
-   * the special time-series.
-   */
+   * @return the seriesPath of unseqTsFile, List of TimeSeriesChunkMetaData for the special
+   * time-series.
+   **/
   private Pair<String, List<ChunkMetaData>> queryMergeDataInOverflowInsert(String deviceId,
-                                                                           String measurementId,
-                                                                           TSDataType dataType) {
+      String measurementId, TSDataType dataType, QueryContext context) {
     if (!isMerge) {
       return new Pair<>(null, null);
     }
     return new Pair<>(
         mergeResource.getInsertFilePath(),
-        mergeResource.getInsertMetadatas(deviceId, measurementId, dataType));
+        mergeResource.getInsertMetadatas(deviceId, measurementId, dataType, context));
   }
 
   private void switchWorkToFlush() {
@@ -401,7 +395,7 @@ public class OverflowProcessor extends Processor {
     if (mergeResource == null) {
       mergeResource = workResource;
       workResource = new OverflowResource(parentPath,
-              String.valueOf(dataPahtCount.getAndIncrement()));
+          String.valueOf(dataPathCount.getAndIncrement()), versionController);
     }
     isMerge = true;
     LOGGER.info("The overflow processor {} switch from WORK to MERGE", getProcessorName());
@@ -431,8 +425,8 @@ public class OverflowProcessor extends Processor {
     long flushStartTime = System.currentTimeMillis();
     try {
       LOGGER
-              .info("The overflow processor {} starts flushing {}.", getProcessorName(),
-                      flushFunction);
+          .info("The overflow processor {} starts flushing {}.", getProcessorName(),
+              flushFunction);
       // flush data
       workResource
           .flush(fileSchema, flushSupport.getMemTabale(),
@@ -444,10 +438,10 @@ public class OverflowProcessor extends Processor {
       }
     } catch (IOException e) {
       LOGGER.error("Flush overflow processor {} rowgroup to file error in {}. Thread {} exits.",
-              getProcessorName(), flushFunction, Thread.currentThread().getName(), e);
+          getProcessorName(), flushFunction, Thread.currentThread().getName(), e);
     } catch (Exception e) {
       LOGGER.error("FilenodeFlushAction action failed. Thread {} exits.",
-              Thread.currentThread().getName(), e);
+          Thread.currentThread().getName(), e);
     } finally {
       synchronized (flushStatus) {
         flushStatus.setUnFlushing();
@@ -461,13 +455,13 @@ public class OverflowProcessor extends Processor {
     long flushEndTime = System.currentTimeMillis();
     long timeInterval = flushEndTime - flushStartTime;
     ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushStartTime),
-            IoTDBDescriptor.getInstance().getConfig().getZoneID());
+        IoTDBDescriptor.getInstance().getConfig().getZoneID());
     ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(flushEndTime),
-            IoTDBDescriptor.getInstance().getConfig().getZoneID());
+        IoTDBDescriptor.getInstance().getConfig().getZoneID());
     LOGGER.info(
-            "The overflow processor {} flush {}, start time is {}, flush end time is {}," +
-                    " time consumption is {}ms",
-            getProcessorName(), flushFunction, startDateTime, endDateTime, timeInterval);
+        "The overflow processor {} flush {}, start time is {}, flush end time is {}," +
+            " time consumption is {}ms",
+        getProcessorName(), flushFunction, startDateTime, endDateTime, timeInterval);
   }
 
   private Future<?> flush(boolean synchronization) throws OverflowProcessorException {
@@ -475,14 +469,14 @@ public class OverflowProcessor extends Processor {
     if (lastFlushTime > 0) {
       long thisFLushTime = System.currentTimeMillis();
       ZonedDateTime lastDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(lastFlushTime),
-              IoTDBDescriptor.getInstance().getConfig().getZoneID());
+          IoTDBDescriptor.getInstance().getConfig().getZoneID());
       ZonedDateTime thisDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(thisFLushTime),
-              IoTDBDescriptor.getInstance().getConfig().getZoneID());
+          IoTDBDescriptor.getInstance().getConfig().getZoneID());
       LOGGER.info(
-              "The overflow processor {} last flush time is {}, this flush time is {},"
-                      + " flush time interval is {}s",
-              getProcessorName(), lastDateTime, thisDateTime,
-              (thisFLushTime - lastFlushTime) / 1000);
+          "The overflow processor {} last flush time is {}, this flush time is {},"
+              + " flush time interval is {}s",
+          getProcessorName(), lastDateTime, thisDateTime,
+          (thisFLushTime - lastFlushTime) / 1000);
     }
     lastFlushTime = System.currentTimeMillis();
     // value count
@@ -551,15 +545,13 @@ public class OverflowProcessor extends Processor {
     LOGGER.info("The overflow processor {} ends close operation.", getProcessorName());
     // log close time
     long closeEndTime = System.currentTimeMillis();
-    long timeInterval = closeEndTime - closeStartTime;
-    ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
-            IoTDBDescriptor.getInstance().getConfig().getZoneID());
-    ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
-            IoTDBDescriptor.getInstance().getConfig().getZoneID());
     LOGGER.info(
-            "The close operation of overflow processor {} starts at {} and ends at {}."
-                    + " It comsumes {}ms.",
-            getProcessorName(), startDateTime, endDateTime, timeInterval);
+        "The close operation of overflow processor {} starts at {} and ends at {}."
+            + " It comsumes {}ms.",
+        getProcessorName(), ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
+            IoTDBDescriptor.getInstance().getConfig().getZoneID()),
+        ZonedDateTime.ofInstant(Instant.ofEpochMilli(closeStartTime),
+            IoTDBDescriptor.getInstance().getConfig().getZoneID()), closeEndTime - closeStartTime);
   }
 
   public void clear() throws IOException {
@@ -598,21 +590,12 @@ public class OverflowProcessor extends Processor {
    * @return The size of overflow file corresponding to this processor.
    */
   public long getFileSize() {
-    return workResource.getInsertFile().length() + workResource.getUpdateDeleteFile().length()
-            + memoryUsage();
-  }
-
-  /**
-   * Close current OverflowFile and open a new one for future writes. Block new writes and
-   * wait until current writes finish.
-   */
-  private void rollToNewFile() {
-    // TODO : [MemControl] implement this
+    return workResource.getInsertFile().length() + memoryUsage();
   }
 
   /**
-   * Check whether current overflow file contains too many metadata or size of current overflow
-   * file is too large If true, close current file and open a new one.
+   * Check whether current overflow file contains too many metadata or size of current overflow file
+   * is too large. If true, close current file and open a new one.
    */
   private boolean checkSize() {
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -631,7 +614,6 @@ public class OverflowProcessor extends Processor {
           MemUtils.bytesCntToStr(config.overflowMetaSizeThreshold),
           MemUtils.bytesCntToStr(metaSize),
           MemUtils.bytesCntToStr(config.overflowMetaSizeThreshold));
-      rollToNewFile();
       return true;
     } else {
       return false;
@@ -648,34 +630,40 @@ public class OverflowProcessor extends Processor {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     OverflowProcessor that = (OverflowProcessor) o;
     return isMerge == that.isMerge &&
-            valueCount == that.valueCount &&
-            lastFlushTime == that.lastFlushTime &&
-            memThreshold == that.memThreshold &&
-            Objects.equals(workResource, that.workResource) &&
-            Objects.equals(mergeResource, that.mergeResource) &&
-            Objects.equals(workSupport, that.workSupport) &&
-            Objects.equals(flushSupport, that.flushSupport) &&
-            Objects.equals(flushStatus, that.flushStatus) &&
-            Objects.equals(parentPath, that.parentPath) &&
-            Objects.equals(dataPahtCount, that.dataPahtCount) &&
-            Objects.equals(queryFlushLock, that.queryFlushLock) &&
-            Objects.equals(overflowFlushAction, that.overflowFlushAction) &&
-            Objects.equals(filenodeFlushAction, that.filenodeFlushAction) &&
-            Objects.equals(fileSchema, that.fileSchema) &&
-            Objects.equals(memSize, that.memSize) &&
-            Objects.equals(logNode, that.logNode);
+        valueCount == that.valueCount &&
+        lastFlushTime == that.lastFlushTime &&
+        memThreshold == that.memThreshold &&
+        Objects.equals(workResource, that.workResource) &&
+        Objects.equals(mergeResource, that.mergeResource) &&
+        Objects.equals(workSupport, that.workSupport) &&
+        Objects.equals(flushSupport, that.flushSupport) &&
+        Objects.equals(flushStatus, that.flushStatus) &&
+        Objects.equals(parentPath, that.parentPath) &&
+        Objects.equals(dataPathCount, that.dataPathCount) &&
+        Objects.equals(queryFlushLock, that.queryFlushLock) &&
+        Objects.equals(overflowFlushAction, that.overflowFlushAction) &&
+        Objects.equals(filenodeFlushAction, that.filenodeFlushAction) &&
+        Objects.equals(fileSchema, that.fileSchema) &&
+        Objects.equals(memSize, that.memSize) &&
+        Objects.equals(logNode, that.logNode);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(super.hashCode(), workResource, mergeResource, workSupport,
-            flushSupport, flushStatus, isMerge, valueCount, parentPath, lastFlushTime,
-            dataPahtCount, queryFlushLock, overflowFlushAction, filenodeFlushAction, fileSchema,
-            memThreshold, memSize, logNode);
+        flushSupport, flushStatus, isMerge, valueCount, parentPath, lastFlushTime,
+        dataPathCount, queryFlushLock, overflowFlushAction, filenodeFlushAction, fileSchema,
+        memThreshold, memSize, logNode);
   }
 }
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
index fce5ef1..adf384d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowResource.java
@@ -29,9 +29,16 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.MemTableFlushUtil;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.utils.MemUtils;
+import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
@@ -39,15 +46,17 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class OverflowResource {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(OverflowResource.class);
+
   private static final String INSERT_FILE_NAME = "unseqTsFile";
-  private static final String UPDATE_DELETE_FILE_NAME = "overflowFile";
   private static final String POSITION_FILE_NAME = "positionFile";
+
   private static final int FOOTER_LENGTH = 4;
   private static final int POS_LENGTH = 8;
   private String parentPath;
@@ -55,12 +64,14 @@ public class OverflowResource {
   private String insertFilePath;
   private String positionFilePath;
   private File insertFile;
-  private File updateFile;
   private OverflowIO insertIO;
   private Map<String, Map<String, List<ChunkMetaData>>> insertMetadatas;
   private List<ChunkGroupMetaData> appendInsertMetadatas;
+  private VersionController versionController;
+  private ModificationFile modificationFile;
 
-  public OverflowResource(String parentPath, String dataPath) throws IOException {
+  public OverflowResource(String parentPath, String dataPath, VersionController versionController)
+      throws IOException {
     this.insertMetadatas = new HashMap<>();
     this.appendInsertMetadatas = new ArrayList<>();
     this.parentPath = parentPath;
@@ -71,8 +82,8 @@ public class OverflowResource {
     }
     insertFile = new File(dataFile, INSERT_FILE_NAME);
     insertFilePath = insertFile.getPath();
-    updateFile = new File(dataFile, UPDATE_DELETE_FILE_NAME);
     positionFilePath = new File(dataFile, POSITION_FILE_NAME).getPath();
+
     Pair<Long, Long> position = readPositionInfo();
     try {
       // insert stream
@@ -90,6 +101,8 @@ public class OverflowResource {
       LOGGER.error("Failed to construct the OverflowIO.", e);
       throw e;
     }
+    this.versionController = versionController;
+    modificationFile = new ModificationFile(insertFilePath + ModificationFile.FILE_SUFFIX);
   }
 
   private Pair<Long, Long> readPositionInfo() {
@@ -108,9 +121,6 @@ public class OverflowResource {
       if (insertTempFile.exists()) {
         left = insertTempFile.length();
       }
-      if (updateFile.exists()) {
-        right = updateFile.length();
-      }
       return new Pair<>(left, right);
     }
   }
@@ -129,7 +139,7 @@ public class OverflowResource {
     // read insert meta-data
     insertIO.toTail();
     long position = insertIO.getPos();
-    while (position != insertIO.magicStringBytes.length) {
+    while (position != TsFileIOWriter.magicStringBytes.length) {
       insertIO.getReader().position(position - FOOTER_LENGTH);
       int metadataLength = insertIO.getReader().readInt();
       byte[] buf = new byte[metadataLength];
@@ -147,6 +157,7 @@ public class OverflowResource {
           insertMetadatas.put(deviceId, new HashMap<>());
         }
         for (ChunkMetaData chunkMetaData : rowGroupMetaData.getChunkMetaDataList()) {
+          chunkMetaData.setVersion(rowGroupMetaData.getVersion());
           String measurementId = chunkMetaData.getMeasurementUid();
           if (!insertMetadatas.get(deviceId).containsKey(measurementId)) {
             insertMetadatas.get(deviceId).put(measurementId, new ArrayList<>());
@@ -158,7 +169,7 @@ public class OverflowResource {
   }
 
   public List<ChunkMetaData> getInsertMetadatas(String deviceId, String measurementId,
-                                                TSDataType dataType) {
+      TSDataType dataType, QueryContext context) {
     List<ChunkMetaData> chunkMetaDatas = new ArrayList<>();
     if (insertMetadatas.containsKey(deviceId) && insertMetadatas.get(deviceId)
             .containsKey(measurementId)) {
@@ -169,6 +180,14 @@ public class OverflowResource {
         }
       }
     }
+    try {
+      List<Modification> modifications = context.getPathModifications(modificationFile,
+          deviceId + IoTDBConstant.PATH_SEPARATOR + measurementId);
+      QueryUtils.modifyChunkMetaData(chunkMetaDatas, modifications);
+    } catch (IOException e) {
+      LOGGER.error("Cannot access the modification file of Overflow {}, because:", parentPath,
+          e);
+    }
     return chunkMetaDatas;
   }
 
@@ -196,7 +215,8 @@ public class OverflowResource {
     if (memTable != null && !memTable.isEmpty()) {
       insertIO.toTail();
       long lastPosition = insertIO.getPos();
-      MemTableFlushUtil.flushMemTable(fileSchema, insertIO, memTable);
+      MemTableFlushUtil.flushMemTable(fileSchema, insertIO, memTable,
+          versionController.nextVersion());
       List<ChunkGroupMetaData> rowGroupMetaDatas = insertIO.getChunkGroupMetaDatas();
       appendInsertMetadatas.addAll(rowGroupMetaDatas);
       if (!rowGroupMetaDatas.isEmpty()) {
@@ -237,13 +257,10 @@ public class OverflowResource {
     return positionFilePath;
   }
 
-  public File getUpdateDeleteFile() {
-    return updateFile;
-  }
-
   public void close() throws IOException {
     insertMetadatas.clear();
     insertIO.close();
+    modificationFile.close();
   }
 
   public void deleteResource() throws IOException {
@@ -254,7 +271,11 @@ public class OverflowResource {
     File file = new File(dir);
     if (file.exists()) {
       if (file.isDirectory()) {
-        for (File subFile : file.listFiles()) {
+        File[] files = file.listFiles();
+        if (files == null) {
+          return;
+        }
+        for (File subFile : files) {
           cleanDir(subFile.getAbsolutePath());
         }
       }
@@ -274,4 +295,21 @@ public class OverflowResource {
     }
     insertMetadatas.get(deviceId).get(measurementId).add(chunkMetaData);
   }
+
+  /**
+   * Delete data of a timeseries whose time ranges from 0 to timestamp.
+   *
+   * @param deviceId the deviceId of the timeseries.
+   * @param measurementId the measurementId of the timeseries.
+   * @param timestamp the upper-bound of deletion time.
+   * @param updatedModFiles add successfully updated modificationFile to this list, so that the
+   * deletion can be aborted when an exception is thrown.
+   */
+  public void delete(String deviceId, String measurementId, long timestamp, long version,
+      List<ModificationFile> updatedModFiles)
+      throws IOException {
+    modificationFile.write(new Deletion(deviceId + IoTDBConstant.PATH_SEPARATOR
+        + measurementId, version, timestamp));
+    updatedModFiles.add(modificationFile);
+  }
 }
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupport.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupport.java
index d847311..b04af60 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupport.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupport.java
@@ -74,18 +74,13 @@ public class OverflowSupport {
     indexTrees.get(deviceId).get(measurementId).update(startTime, endTime);
   }
 
-  /**
-   * @deprecated delete time series data
-   */
-  @Deprecated
-  public void delete(String deviceId, String measurementId, long timestamp, TSDataType dataType) {
-    if (!indexTrees.containsKey(deviceId)) {
-      indexTrees.put(deviceId, new HashMap<>());
-    }
-    if (!indexTrees.get(deviceId).containsKey(measurementId)) {
-      indexTrees.get(deviceId).put(measurementId, new OverflowSeriesImpl(measurementId, dataType));
+  public void delete(String deviceId, String measurementId, long timestamp, boolean isFlushing) {
+    if (isFlushing) {
+      memTable = memTable.copy();
+      memTable.delete(deviceId, measurementId, timestamp);
+    } else {
+      memTable.delete(deviceId, measurementId, timestamp);
     }
-    indexTrees.get(deviceId).get(measurementId).delete(timestamp);
   }
 
   public TimeValuePairSorter queryOverflowInsertInMemory(String deviceId, String measurementId,
@@ -94,8 +89,7 @@ public class OverflowSupport {
   }
 
   public BatchData queryOverflowUpdateInMemory(String deviceId, String measurementId,
-                                               TSDataType dataType,
-                                               BatchData data) {
+                                               TSDataType dataType) {
     if (indexTrees.containsKey(deviceId) && indexTrees.get(deviceId).containsKey(measurementId)
         && indexTrees.get(deviceId).get(measurementId).getDataType().equals(dataType)) {
       return indexTrees.get(deviceId).get(measurementId).query();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
new file mode 100644
index 0000000..8cf64d7
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SimpleFileVersionController.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.version;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SimpleFileVersionController uses a local file and its file name to store the version.
+ */
+public class SimpleFileVersionController implements VersionController {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFileVersionController.class);
+  /**
+   * Every time currVersion - prevVersion >= SAVE_INTERVAL, currVersion is persisted and prevVersion
+   * is set to currVersion. When recovering from file, the version number is automatically increased
+   * by SAVE_INTERVAL to avoid conflicts.
+   */
+  public static final long SAVE_INTERVAL = 100;
+  private static final String FILE_PREFIX = "Version-";
+  private long prevVersion;
+  private long currVersion;
+  private String directoryPath;
+
+  public SimpleFileVersionController(String directoryPath) throws IOException {
+    this.directoryPath = directoryPath;
+    restore();
+  }
+
+  @Override
+  public synchronized long nextVersion() {
+    currVersion ++;
+    try {
+      checkPersist();
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage());
+    }
+    return currVersion;
+  }
+
+  /**
+   * Test only method, no need for concurrency.
+   * @return the current version.
+   */
+  @Override
+  public long currVersion() {
+    return currVersion;
+  }
+
+  private void checkPersist() throws IOException {
+    if ((currVersion - prevVersion) >= SAVE_INTERVAL) {
+      persist();
+    }
+  }
+
+  private void persist() throws IOException {
+    File oldFile = new File(directoryPath, FILE_PREFIX + prevVersion);
+    File newFile = new File(directoryPath, FILE_PREFIX + currVersion);
+    if (!oldFile.renameTo(newFile)) {
+      throw new IOException(String
+          .format("can not rename file %s to file %s", oldFile.getAbsolutePath(),
+              newFile.getAbsolutePath()));
+    }
+    prevVersion = currVersion;
+  }
+
+  private void restore() throws IOException {
+    File directory = new File(directoryPath);
+    File[] versionFiles = directory.listFiles((dir, name) -> name.startsWith(FILE_PREFIX));
+    File versionFile = null;
+    if (versionFiles != null && versionFiles.length > 0) {
+      long maxVersion = 0;
+      int maxVersionIndex = 0;
+      for (int i = 0; i < versionFiles.length; i ++) {
+        // extract version from "Version-123456"
+        long fileVersion = Long.parseLong(versionFiles[i].getName().split("-")[1]);
+        if (fileVersion > maxVersion) {
+          maxVersion = fileVersion;
+          maxVersionIndex = i;
+        }
+      }
+      prevVersion = maxVersion;
+      for(int i = 0; i < versionFiles.length; i ++) {
+        if (i != maxVersionIndex) {
+          versionFiles[i].delete();
+        }
+      }
+    } else {
+      versionFile = new File(directory, FILE_PREFIX + "0");
+      prevVersion = 0;
+      new FileOutputStream(versionFile).close();
+    }
+    // prevent overlapping in case of failure
+    currVersion = prevVersion + SAVE_INTERVAL;
+    persist();
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
similarity index 63%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
index 7c63be6..8e38ea2 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/SysTimeVersionController.java
@@ -16,29 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.tsfile.read.common;
 
-import java.nio.ByteBuffer;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+package org.apache.iotdb.db.engine.version;
 
 /**
- * used in query.
+ * SysTimeVersionController uses system timestamp as the version number.
  */
-public class Chunk {
+public class SysTimeVersionController implements VersionController {
 
-  private ChunkHeader chunkHeader;
-  private ByteBuffer chunkData;
+  public static final SysTimeVersionController INSTANCE = new SysTimeVersionController();
+
+  private SysTimeVersionController() {
 
-  public Chunk(ChunkHeader header, ByteBuffer buffer) {
-    this.chunkHeader = header;
-    this.chunkData = buffer;
   }
 
-  public ChunkHeader getHeader() {
-    return chunkHeader;
+  @Override
+  public long nextVersion() {
+    return System.currentTimeMillis();
   }
 
-  public ByteBuffer getData() {
-    return chunkData;
+  @Override
+  public long currVersion() {
+    return System.currentTimeMillis();
   }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
similarity index 67%
copy from iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
copy to iotdb/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
index 123e4d2..e55ac34 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/version/VersionController.java
@@ -16,14 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.engine.memtable;
 
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+package org.apache.iotdb.db.engine.version;
 
-public class PrimitiveMemTable extends AbstractMemTable {
+/**
+ * VersionController controls the version (a monotonically increasing long) of a FileNode.
+ */
+public interface VersionController {
+  /**
+   * Get the next version number.
+   * @return the next version number.
+   */
+  long nextVersion();
 
-  @Override
-  protected IWritableMemChunk genMemSeries(TSDataType dataType) {
-    return new WritableMemChunk(dataType);
-  }
+  /**
+   * Get the current version number.
+   * @return the current version number.
+   */
+  long currVersion();
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index 9b3e207..56d9891 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
@@ -327,14 +326,12 @@ public class StatMonitor implements IService {
             for (Map.Entry<String, IStatistic> entry : statisticMap.entrySet()) {
               for (String statParamName : entry.getValue().getStatParamsHashMap().keySet()) {
                 fManager.delete(entry.getKey(), statParamName,
-                        currentTimeMillis - statMonitorRetainIntervalSec * 1000, TSDataType.INT64);
+                    currentTimeMillis - statMonitorRetainIntervalSec * 1000);
               }
             }
           } catch (FileNodeManagerException e) {
-            LOGGER
-                    .error("Error occurred when deleting statistics information periodically, because",
+            LOGGER.error("Error occurred when deleting statistics information periodically, because",
                             e);
-            e.printStackTrace();
           }
         }
         HashMap<String, TSRecord> tsRecordHashMap = gatherStatistics();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
index 6396e1b..02f25f2 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/OverflowQPExecutor.java
@@ -226,7 +226,7 @@ public class OverflowQPExecutor extends QueryProcessExecutor {
       }
       mManager.getFileNameByPath(path.getFullPath());
       TSDataType type = mManager.getSeriesType(path.getFullPath());
-      fileNodeManager.delete(deviceId, measurementId, timestamp, type);
+      fileNodeManager.delete(deviceId, measurementId, timestamp);
       return true;
     } catch (PathErrorException e) {
       throw new ProcessorException(e.getMessage());
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index e5a7ade..65f06f0 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.qp.strategy;
 
 import java.util.List;
@@ -26,6 +27,7 @@ import org.apache.iotdb.db.exception.qp.QueryProcessorException;
 import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
+import org.apache.iotdb.db.qp.logical.crud.DeleteOperator;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 import org.apache.iotdb.db.qp.logical.crud.InsertOperator;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
@@ -35,6 +37,7 @@ import org.apache.iotdb.db.qp.logical.sys.MetadataOperator;
 import org.apache.iotdb.db.qp.logical.sys.PropertyOperator;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
 import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -87,13 +90,13 @@ public class PhysicalGenerator {
         PropertyOperator property = (PropertyOperator) operator;
         return new PropertyPlan(property.getPropertyType(), property.getPropertyPath(),
             property.getMetadataPath());
-      // case DELETE:
-      // DeleteOperator delete = (DeleteOperator) operator;
-      // paths = delete.getSelectedPaths();
-      // if (delete.getTime() <= 0) {
-      // throw new LogicalOperatorException("For Delete command, time must greater than 0.");
-      // }
-      // return new DeletePlan(delete.getTime(), paths);
+      case DELETE:
+        DeleteOperator delete = (DeleteOperator) operator;
+        paths = delete.getSelectedPaths();
+        if (delete.getTime() <= 0) {
+          throw new LogicalOperatorException("For Delete command, time must greater than 0.");
+        }
+        return new DeletePlan(delete.getTime(), paths);
       case INSERT:
         InsertOperator Insert = (InsertOperator) operator;
         paths = Insert.getSelectedPaths();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
new file mode 100644
index 0000000..2a7769a
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.context;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+
+/**
+ * QueryContext contains the shared information within a query.
+ */
+public class QueryContext {
+
+  /**
+   * The outer key is the path of a ModificationFile, the inner key is the name of a timeseries and
+   * the value is the Modifications of a timeseries in this file.
+   */
+  private Map<String, Map<String, List<Modification>>> filePathModCache = new HashMap<>();
+  /**
+   * The key is the path of a ModificationFile and the value is all Modifications in this file. We
+   * use this field because each call of ModificationFile.getModifications() returns a copy of
+   * the Modifications, and we do not want it to create multiple copies within a query.
+   */
+  private Map<String, List<Modification>> fileModCache = new HashMap<>();
+
+  /**
+   * Find the modifications of timeseries 'path' in 'modFile'. If they are not in the cache, read
+   * them from 'modFile' and put them into the cache.
+   */
+  public List<Modification> getPathModifications(ModificationFile modFile, String path)
+      throws IOException {
+
+    Map<String, List<Modification>> fileModifications =
+        filePathModCache.computeIfAbsent(modFile.getFilePath(), k -> new HashMap<>());
+    List<Modification> pathModifications = fileModifications.get(path);
+
+    if (pathModifications == null) {
+      List<Modification> allModifications = fileModCache.get(modFile.getFilePath());
+      if (allModifications == null) {
+        allModifications = (List<Modification>) modFile.getModifications();
+        fileModCache.put(modFile.getFilePath(), allModifications);
+      }
+      pathModifications = new ArrayList<>();
+      if (!allModifications.isEmpty()) {
+        List<Modification> finalPathModifications = pathModifications;
+        allModifications.forEach(modification -> {
+          if (modification.getPath().equals(path)) {
+            finalPathModifications.add(modification);
+          }
+        });
+      }
+      fileModifications.put(path, pathModifications);
+    }
+
+    return pathModifications;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
index 2824a9f..f3fd1f3 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/control/QueryDataSourceManager.java
@@ -21,12 +21,14 @@ package org.apache.iotdb.db.query.control;
 import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 
 /**
  * <p>
- * This class is used to get query data source of a given path. See the component of <code>QueryDataSource</code>
+ * This class is used to get query data source of a given path. See the component of
+ * <code>QueryDataSource</code>
  */
 public class QueryDataSourceManager {
 
@@ -35,11 +37,12 @@ public class QueryDataSourceManager {
   private QueryDataSourceManager() {
   }
 
-  public static QueryDataSource getQueryDataSource(long jobId, Path selectedPath)
+  public static QueryDataSource getQueryDataSource(long jobId, Path selectedPath,
+      QueryContext context)
       throws FileNodeManagerException {
 
     SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(selectedPath, null);
-    QueryDataSource queryDataSource = fileNodeManager.query(singleSeriesExpression);
+    QueryDataSource queryDataSource = fileNodeManager.query(singleSeriesExpression, context);
 
     // add used files to current thread request cached map
     OpenedFilePathsManager.getInstance()
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
index 86d36dc..13f0053 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithTimeGenerator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryDataSourceManager;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithTimeGenerator;
@@ -59,7 +60,7 @@ public class EngineExecutorWithTimeGenerator {
    * @throws IOException IOException
    * @throws FileNodeManagerException FileNodeManagerException
    */
-  public QueryDataSet execute() throws FileNodeManagerException {
+  public QueryDataSet execute(QueryContext context) throws FileNodeManagerException {
 
     QueryTokenManager.getInstance()
         .beginQueryOfGivenQueryPaths(jobId, queryExpression.getSelectedSeries());
@@ -69,8 +70,9 @@ public class EngineExecutorWithTimeGenerator {
     EngineTimeGenerator timestampGenerator;
     List<EngineReaderByTimeStamp> readersOfSelectedSeries;
     try {
-      timestampGenerator = new EngineTimeGenerator(jobId, queryExpression.getExpression());
-      readersOfSelectedSeries = getReadersOfSelectedPaths(queryExpression.getSelectedSeries());
+      timestampGenerator = new EngineTimeGenerator(jobId, queryExpression.getExpression(), context);
+      readersOfSelectedSeries = getReadersOfSelectedPaths(queryExpression.getSelectedSeries(),
+          context);
     } catch (IOException ex) {
       throw new FileNodeManagerException(ex);
     }
@@ -90,20 +92,22 @@ public class EngineExecutorWithTimeGenerator {
         readersOfSelectedSeries);
   }
 
-  private List<EngineReaderByTimeStamp> getReadersOfSelectedPaths(List<Path> paths)
+  private List<EngineReaderByTimeStamp> getReadersOfSelectedPaths(List<Path> paths,
+      QueryContext context)
       throws IOException, FileNodeManagerException {
 
     List<EngineReaderByTimeStamp> readersOfSelectedSeries = new ArrayList<>();
 
     for (Path path : paths) {
 
-      QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path);
+      QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path,
+          context);
 
       PriorityMergeReaderByTimestamp mergeReaderByTimestamp = new PriorityMergeReaderByTimestamp();
 
       // reader for sequence data
       SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
-          null);
+          null, context);
       mergeReaderByTimestamp.addReaderWithPriority(tsFilesReader, 1);
 
       // reader for unSequence data
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
index 5742fcc..11c94bc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineExecutorWithoutTimeGenerator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryDataSourceManager;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
 import org.apache.iotdb.db.query.dataset.EngineDataSetWithoutTimeGenerator;
@@ -55,7 +56,7 @@ public class EngineExecutorWithoutTimeGenerator {
   /**
    * with global time filter.
    */
-  public QueryDataSet executeWithGlobalTimeFilter()
+  public QueryDataSet executeWithGlobalTimeFilter(QueryContext context)
       throws FileNodeManagerException {
 
     Filter timeFilter = ((GlobalTimeExpression) queryExpression.getExpression()).getFilter();
@@ -68,7 +69,8 @@ public class EngineExecutorWithoutTimeGenerator {
 
     for (Path path : queryExpression.getSelectedSeries()) {
 
-      QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path);
+      QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path,
+          context);
 
       // add data type
       try {
@@ -80,17 +82,17 @@ public class EngineExecutorWithoutTimeGenerator {
       PriorityMergeReader priorityReader = new PriorityMergeReader();
 
       // sequence reader for one sealed tsfile
-      SequenceDataReader tsFilesReader = null;
+      SequenceDataReader tsFilesReader;
       try {
         tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
-            timeFilter);
+            timeFilter, context);
         priorityReader.addReaderWithPriority(tsFilesReader, PriorityMergeReader.LOW_PRIORITY);
       } catch (IOException e) {
         throw new FileNodeManagerException(e);
       }
 
       // unseq reader for all chunk groups in unSeqFile
-      PriorityMergeReader unSeqMergeReader = null;
+      PriorityMergeReader unSeqMergeReader;
       try {
         unSeqMergeReader = SeriesReaderFactory.getInstance()
             .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), timeFilter);
@@ -113,7 +115,7 @@ public class EngineExecutorWithoutTimeGenerator {
   /**
    * without filter.
    */
-  public QueryDataSet executeWithoutFilter()
+  public QueryDataSet executeWithoutFilter(QueryContext context)
       throws FileNodeManagerException {
 
     List<IReader> readersOfSelectedSeries = new ArrayList<>();
@@ -124,7 +126,8 @@ public class EngineExecutorWithoutTimeGenerator {
 
     for (Path path : queryExpression.getSelectedSeries()) {
 
-      QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path);
+      QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId, path,
+          context);
 
       // add data type
       try {
@@ -136,17 +139,17 @@ public class EngineExecutorWithoutTimeGenerator {
       PriorityMergeReader priorityReader = new PriorityMergeReader();
 
       // sequence insert data
-      SequenceDataReader tsFilesReader = null;
+      SequenceDataReader tsFilesReader;
       try {
         tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
-            null);
+            null, context);
         priorityReader.addReaderWithPriority(tsFilesReader, 1);
       } catch (IOException e) {
         throw new FileNodeManagerException(e);
       }
 
       // unseq insert data
-      PriorityMergeReader unSeqMergeReader = null;
+      PriorityMergeReader unSeqMergeReader;
       try {
         unSeqMergeReader = SeriesReaderFactory.getInstance()
             .createUnSeqMergeReader(queryDataSource.getOverflowSeriesDataSource(), null);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index db9dfa4..c9327f4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
 import org.apache.iotdb.db.query.control.QueryTokenManager;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -55,6 +56,8 @@ public class EngineQueryRouter {
     QueryTokenManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
     OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(nextJobId);
 
+    QueryContext context = new QueryContext();
+
     if (queryExpression.hasQueryFilter()) {
       try {
         IExpression optimizedExpression = ExpressionOptimizer.getInstance()
@@ -64,13 +67,14 @@ public class EngineQueryRouter {
         if (optimizedExpression.getType() == GLOBAL_TIME) {
           EngineExecutorWithoutTimeGenerator engineExecutor =
               new EngineExecutorWithoutTimeGenerator(
+
                   nextJobId, queryExpression);
-          return engineExecutor.executeWithGlobalTimeFilter();
+          return engineExecutor.executeWithGlobalTimeFilter(context);
         } else {
           EngineExecutorWithTimeGenerator engineExecutor = new EngineExecutorWithTimeGenerator(
               nextJobId,
               queryExpression);
-          return engineExecutor.execute();
+          return engineExecutor.execute(context);
         }
 
       } catch (QueryFilterOptimizationException e) {
@@ -80,7 +84,7 @@ public class EngineQueryRouter {
       EngineExecutorWithoutTimeGenerator engineExecutor = new EngineExecutorWithoutTimeGenerator(
           nextJobId,
           queryExpression);
-      return engineExecutor.executeWithoutFilter();
+      return engineExecutor.executeWithoutFilter(context);
     }
   }
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
index 8062172..4854479 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/factory/SeriesReaderFactory.java
@@ -16,13 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.query.factory;
 
 import java.io.IOException;
 import java.util.List;
 import org.apache.iotdb.db.engine.filenode.IntervalFileNode;
+import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.reader.IReader;
 import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithFilter;
@@ -30,6 +33,7 @@ import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithoutFilter;
 import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
 import org.apache.iotdb.db.query.reader.sequence.SealedTsFilesReader;
 import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
+import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -129,7 +133,8 @@ public class SeriesReaderFactory {
    */
   public IReader createSeriesReaderForMerge(IntervalFileNode intervalFileNode,
       OverflowSeriesDataSource overflowSeriesDataSource,
-      SingleSeriesExpression singleSeriesExpression)
+      SingleSeriesExpression singleSeriesExpression,
+      QueryContext context)
       throws IOException {
 
     logger.debug("Create seriesReaders for merge. SeriesFilter = {}. TsFilePath = {}",
@@ -139,8 +144,8 @@ public class SeriesReaderFactory {
     PriorityMergeReader priorityMergeReader = new PriorityMergeReader();
 
     // Sequence reader
-    IReader seriesInTsFileReader = createSealedTsFileReaderForMerge(intervalFileNode.getFilePath(),
-        singleSeriesExpression);
+    IReader seriesInTsFileReader = createSealedTsFileReaderForMerge(intervalFileNode,
+        singleSeriesExpression, context);
     priorityMergeReader.addReaderWithPriority(seriesInTsFileReader, 1);
 
     // UnSequence merge reader
@@ -151,20 +156,25 @@ public class SeriesReaderFactory {
     return priorityMergeReader;
   }
 
-  private IReader createSealedTsFileReaderForMerge(String filePath,
-      SingleSeriesExpression singleSeriesExpression)
+  private IReader createSealedTsFileReaderForMerge(IntervalFileNode fileNode,
+      SingleSeriesExpression singleSeriesExpression,
+      QueryContext context)
       throws IOException {
     TsFileSequenceReader tsFileSequenceReader = FileReaderManager.getInstance()
-        .get(filePath, false);
+        .get(fileNode.getFilePath(), false);
     ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(tsFileSequenceReader);
     MetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(tsFileSequenceReader);
     List<ChunkMetaData> metaDataList = metadataQuerier
         .getChunkMetaDataList(singleSeriesExpression.getSeriesPath());
 
+    List<Modification> modifications = context.getPathModifications(fileNode.getModFile(),
+        singleSeriesExpression.getSeriesPath().getFullPath());
+    QueryUtils.modifyChunkMetaData(metaDataList, modifications);
+
     FileSeriesReader seriesInTsFileReader = new FileSeriesReaderWithFilter(chunkLoader,
         metaDataList,
         singleSeriesExpression.getFilter());
-    return new SealedTsFilesReader(seriesInTsFileReader);
+    return new SealedTsFilesReader(seriesInTsFileReader, context);
   }
 
   private static class SeriesReaderFactoryHelper {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
index 7e21a01..4db8c9d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SealedTsFilesReader.java
@@ -16,14 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.query.reader.sequence;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.db.engine.filenode.IntervalFileNode;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.query.reader.IReader;
+import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -47,26 +51,32 @@ public class SealedTsFilesReader implements IReader {
   private Filter filter;
   private BatchData data;
   private boolean hasCachedData;
+  private QueryContext context;
 
-  public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles, Filter filter) {
-    this(seriesPath, sealedTsFiles);
+  public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles, Filter filter,
+      QueryContext context) {
+    this(seriesPath, sealedTsFiles, context);
     this.filter = filter;
+
   }
 
   /**
    * init with seriesPath and sealedTsFiles.
    */
-  public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles) {
+  public SealedTsFilesReader(Path seriesPath, List<IntervalFileNode> sealedTsFiles,
+      QueryContext context) {
     this.seriesPath = seriesPath;
     this.sealedTsFiles = sealedTsFiles;
     this.usedIntervalFileIndex = 0;
     this.seriesReader = null;
     this.hasCachedData = false;
+    this.context = context;
   }
 
-  public SealedTsFilesReader(FileSeriesReader seriesReader) {
+  public SealedTsFilesReader(FileSeriesReader seriesReader, QueryContext context) {
     this.seriesReader = seriesReader;
     sealedTsFiles = new ArrayList<>();
+    this.context = context;
   }
 
   @Override
@@ -101,7 +111,7 @@ public class SealedTsFilesReader implements IReader {
         if (seriesReader == null || !seriesReader.hasNextBatch()) {
           IntervalFileNode fileNode = sealedTsFiles.get(usedIntervalFileIndex++);
           if (singleTsFileSatisfied(fileNode)) {
-            initSingleTsFileReader(fileNode);
+            initSingleTsFileReader(fileNode, context);
           } else {
             flag = true;
           }
@@ -156,7 +166,8 @@ public class SealedTsFilesReader implements IReader {
     return filter.satisfyStartEndTime(startTime, endTime);
   }
 
-  private void initSingleTsFileReader(IntervalFileNode fileNode) throws IOException {
+  private void initSingleTsFileReader(IntervalFileNode fileNode, QueryContext context)
+      throws IOException {
 
     // to avoid too many opened files
     TsFileSequenceReader tsFileReader = FileReaderManager.getInstance()
@@ -164,6 +175,13 @@ public class SealedTsFilesReader implements IReader {
 
     MetadataQuerierByFileImpl metadataQuerier = new MetadataQuerierByFileImpl(tsFileReader);
     List<ChunkMetaData> metaDataList = metadataQuerier.getChunkMetaDataList(seriesPath);
+
+    List<Modification> pathModifications = context.getPathModifications(fileNode.getModFile(),
+        seriesPath.getFullPath());
+    if (pathModifications.size() > 0) {
+      QueryUtils.modifyChunkMetaData(metaDataList, pathModifications);
+    }
+
     ChunkLoader chunkLoader = new ChunkLoaderImpl(tsFileReader);
 
     if (filter == null) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
index d4cbbe4..e4ca98f 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/reader/sequence/SequenceDataReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSource;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.reader.IReader;
 import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithFilter;
 import org.apache.iotdb.db.query.reader.mem.MemChunkReaderWithoutFilter;
@@ -45,7 +46,8 @@ public class SequenceDataReader implements IReader {
   /**
    * init with globalSortedSeriesDataSource and filter.
    */
-  public SequenceDataReader(GlobalSortedSeriesDataSource sources, Filter filter)
+  public SequenceDataReader(GlobalSortedSeriesDataSource sources, Filter filter,
+      QueryContext context)
       throws IOException {
     seriesReaders = new ArrayList<>();
 
@@ -55,7 +57,8 @@ public class SequenceDataReader implements IReader {
     // add reader for sealed TsFiles
     if (sources.hasSealedTsFiles()) {
       seriesReaders.add(
-          new SealedTsFilesReader(sources.getSeriesPath(), sources.getSealedTsFiles(), filter));
+          new SealedTsFilesReader(sources.getSeriesPath(), sources.getSealedTsFiles(), filter,
+              context));
     }
 
     // add reader for unSealed TsFile
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
index 8b42519..0cb02c8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineNodeConstructor.java
@@ -25,6 +25,7 @@ import static org.apache.iotdb.tsfile.read.expression.ExpressionType.SERIES;
 import java.io.IOException;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryDataSourceManager;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.IReader;
@@ -55,10 +56,12 @@ public class EngineNodeConstructor {
    * @throws IOException IOException
    * @throws FileNodeManagerException FileNodeManagerException
    */
-  public Node construct(IExpression expression) throws FileNodeManagerException {
+  public Node construct(IExpression expression, QueryContext context)
+      throws FileNodeManagerException {
     if (expression.getType() == SERIES) {
       try {
-        return new EngineLeafNode(generateSeriesReader((SingleSeriesExpression) expression));
+        return new EngineLeafNode(generateSeriesReader((SingleSeriesExpression) expression,
+            context));
       } catch (IOException e) {
         throw new FileNodeManagerException(e);
       }
@@ -66,12 +69,12 @@ public class EngineNodeConstructor {
       Node leftChild;
       Node rightChild;
       if (expression.getType() == OR) {
-        leftChild = this.construct(((IBinaryExpression) expression).getLeft());
-        rightChild = this.construct(((IBinaryExpression) expression).getRight());
+        leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
+        rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
         return new OrNode(leftChild, rightChild);
       } else if (expression.getType() == AND) {
-        leftChild = this.construct(((IBinaryExpression) expression).getLeft());
-        rightChild = this.construct(((IBinaryExpression) expression).getRight());
+        leftChild = this.construct(((IBinaryExpression) expression).getLeft(), context);
+        rightChild = this.construct(((IBinaryExpression) expression).getRight(), context);
         return new AndNode(leftChild, rightChild);
       } else {
         throw new UnSupportedDataTypeException(
@@ -80,11 +83,12 @@ public class EngineNodeConstructor {
     }
   }
 
-  private IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression)
+  private IReader generateSeriesReader(SingleSeriesExpression singleSeriesExpression,
+      QueryContext context)
       throws IOException, FileNodeManagerException {
 
     QueryDataSource queryDataSource = QueryDataSourceManager.getQueryDataSource(jobId,
-        singleSeriesExpression.getSeriesPath());
+        singleSeriesExpression.getSeriesPath(), context);
 
     Filter filter = singleSeriesExpression.getFilter();
 
@@ -92,7 +96,7 @@ public class EngineNodeConstructor {
 
     // reader for all sequence data
     SequenceDataReader tsFilesReader = new SequenceDataReader(queryDataSource.getSeqDataSource(),
-        filter);
+        filter, context);
     priorityReader.addReaderWithPriority(tsFilesReader, 1);
 
     // reader for all unSequence data
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
index fda9e10..a151677 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/query/timegenerator/EngineTimeGenerator.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.timegenerator;
 
 import java.io.IOException;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator;
@@ -38,16 +39,16 @@ public class EngineTimeGenerator implements TimeGenerator {
   /**
    * Constructor of EngineTimeGenerator.
    */
-  public EngineTimeGenerator(long jobId, IExpression expression)
+  public EngineTimeGenerator(long jobId, IExpression expression, QueryContext context)
       throws IOException, FileNodeManagerException {
     this.jobId = jobId;
     this.expression = expression;
-    initNode();
+    initNode(context);
   }
 
-  private void initNode() throws IOException, FileNodeManagerException {
+  private void initNode(QueryContext context) throws IOException, FileNodeManagerException {
     EngineNodeConstructor engineNodeConstructor = new EngineNodeConstructor(jobId);
-    this.operatorNode = engineNodeConstructor.construct(expression);
+    this.operatorNode = engineNodeConstructor.construct(expression, context);
   }
 
   @Override
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
new file mode 100644
index 0000000..784abc0
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import java.util.List;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+
+public class QueryUtils {
+
+  private QueryUtils() {
+    // util class
+  }
+
+  /**
+   * modifyChunkMetaData iterates the chunkMetaData, applies every newer modification to each
+   * chunk in place (updating its deletedAt) and removes the chunks that are completely deleted.
+   * @param chunkMetaData the chunk metadata list, which is modified in place.
+   * @param modifications the modifications that may apply to these chunks.
+   */
+  public static void modifyChunkMetaData(List<ChunkMetaData> chunkMetaData,
+                                         List<Modification> modifications) {
+    int modIndex = 0;
+
+    for (int metaIndex = 0; metaIndex < chunkMetaData.size(); metaIndex++) {
+      ChunkMetaData metaData = chunkMetaData.get(metaIndex);
+      for (int j = modIndex; j < modifications.size(); j++) {
+        // iterate each modification to find the max deletion time
+        Modification modification = modifications.get(j);
+        if (modification.getVersionNum() > metaData.getVersion()) {
+          // this modification is after the Chunk, try modifying the chunk
+          // if this modification succeeds, update modIndex so in the next loop the previous
+          // modifications will not be examined
+          modIndex = doModifyChunkMetaData(modification, metaData)? j : modIndex;
+        } else {
+          // skip old modifications for next metadata
+          modIndex++;
+        }
+      }
+    }
+    // remove chunks that are completely deleted
+    chunkMetaData.removeIf(metaData -> metaData.getDeletedAt() >= metaData.getEndTime());
+  }
+
+  private static boolean doModifyChunkMetaData(Modification modification, ChunkMetaData metaData) {
+    if (modification instanceof Deletion) {
+      Deletion deletion = (Deletion) modification;
+      if (metaData.getDeletedAt() < deletion.getTimestamp()) {
+        metaData.setDeletedAt(deletion.getTimestamp());
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
index 6dd8647..56db00a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/ExclusiveLogRecoverPerformer.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -58,6 +59,8 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
   private RecoverStage currStage;
   private LogReplayer replayer = new ConcreteLogReplayer();
   private RecoverPerformer fileNodeRecoverPerformer;
+  // recovery of Overflow may be different from BufferWrite
+  private boolean isOverflow;
 
   /**
    * constructor of ExclusiveLogRecoverPerformer.
@@ -68,6 +71,7 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
     this.processorStoreFilePath = processorStoreFilePath;
     this.writeLogNode = logNode;
     this.fileNodeRecoverPerformer = new FileNodeRecoverPerformer(writeLogNode.getIdentifier());
+    this.isOverflow = logNode.getFileNodeName().contains(IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
   }
 
   public void setFileNodeRecoverPerformer(RecoverPerformer fileNodeRecoverPerformer) {
@@ -273,7 +277,7 @@ public class ExclusiveLogRecoverPerformer implements RecoverPerformer {
             logger.error("Log node {} read a bad log", writeLogNode.getIdentifier());
             throw new RecoverException("Cannot read old log file, recovery aborted.");
           }
-          replayer.replay(physicalPlan);
+          replayer.replay(physicalPlan, isOverflow);
         } catch (ProcessorException e) {
           failedCnt++;
           logger.error("Log node {}", writeLogNode.getLogDirectory(), e);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
index 959fc5e..f64c862 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/ConcreteLogReplayer.java
@@ -43,7 +43,7 @@ public class ConcreteLogReplayer implements LogReplayer {
    * @throws ProcessorException ProcessorException
    */
   @Override
-  public void replay(PhysicalPlan plan) throws ProcessorException {
+  public void replay(PhysicalPlan plan, boolean isOverflow) throws ProcessorException {
     try {
       if (plan instanceof InsertPlan) {
         InsertPlan insertPlan = (InsertPlan) plan;
@@ -53,7 +53,7 @@ public class ConcreteLogReplayer implements LogReplayer {
         update(updatePlan);
       } else if (plan instanceof DeletePlan) {
         DeletePlan deletePlan = (DeletePlan) plan;
-        delete(deletePlan);
+        delete(deletePlan, isOverflow);
       }
     } catch (Exception e) {
       throw new ProcessorException(
@@ -88,12 +88,15 @@ public class ConcreteLogReplayer implements LogReplayer {
     }
   }
 
-  private void delete(DeletePlan deletePlan) throws FileNodeManagerException, PathErrorException {
-    MManager memManager = MManager.getInstance();
+  private void delete(DeletePlan deletePlan, boolean isOverflow) throws FileNodeManagerException {
     for (Path path : deletePlan.getPaths()) {
-      FileNodeManager.getInstance()
-          .delete(path.getDevice(), path.getMeasurement(), deletePlan.getDeleteTime(),
-              memManager.getSeriesType(path.getFullPath()));
+      if (isOverflow) {
+        FileNodeManager.getInstance().deleteOverflow(path.getDevice(), path.getMeasurement(),
+            deletePlan.getDeleteTime());
+      } else {
+        FileNodeManager.getInstance().deleteBufferWrite(path.getDevice(), path.getMeasurement(),
+            deletePlan.getDeleteTime());
+      }
     }
   }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java
index 1510f00..f20381e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/replay/LogReplayer.java
@@ -23,5 +23,5 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
 public interface LogReplayer {
 
-  void replay(PhysicalPlan plan) throws ProcessorException;
+  void replay(PhysicalPlan plan, boolean isOverflow) throws ProcessorException;
 }
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
index 4634e18..7c62ff7 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteBenchmark.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -32,7 +33,8 @@ import org.apache.iotdb.tsfile.write.schema.FileSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 /**
- * BufferWrite insert Benchmark. This class is used to bench Bufferwrite module and gets its performance.
+ * BufferWrite insert benchmark. This class is used to benchmark the BufferWrite module and measure
+ * its performance.
  */
 public class BufferWriteBenchmark {
 
@@ -92,9 +94,9 @@ public class BufferWriteBenchmark {
       }
     });
 
-    BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor("BufferBenchmark", "bench",
-        "benchFile",
-        parameters, fileSchema);
+    BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor("BufferBenchmark",
+        "bench", "benchFile",
+        parameters, SysTimeVersionController.INSTANCE, fileSchema);
 
     long startTime = System.currentTimeMillis();
     for (int i = 0; i < numOfPoint; i++) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
index 0efe087..cdd9ec3 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorNewTest.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.engine.bufferwrite;
 
 import static org.junit.Assert.assertEquals;
@@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.FileSchemaUtils;
@@ -93,7 +95,8 @@ public class BufferWriteProcessorNewTest {
       throws BufferWriteProcessorException, WriteProcessException, IOException, InterruptedException {
     bufferwrite = new BufferWriteProcessor(Directories.getInstance().getFolderForTest(),
         processorName, filename,
-        parameters, FileSchemaUtils.constructFileSchema(processorName));
+        parameters, SysTimeVersionController.INSTANCE,
+        FileSchemaUtils.constructFileSchema(processorName));
     assertEquals(filename, bufferwrite.getFileName());
     assertEquals(processorName + File.separator + filename, bufferwrite.getFileRelativePath());
     assertEquals(true, bufferwrite.isNewProcessor());
@@ -142,6 +145,7 @@ public class BufferWriteProcessorNewTest {
     // test recovery
     BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor(
         Directories.getInstance().getFolderForTest(), processorName, filename, parameters,
+        SysTimeVersionController.INSTANCE,
         FileSchemaUtils.constructFileSchema(processorName));
     pair = bufferWriteProcessor.queryBufferWriteData(processorName, measurementId, dataType);
     left = pair.left;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
index 95e0d55..06d47f1 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessorTest.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.engine.bufferwrite;
 
 import static org.junit.Assert.assertEquals;
@@ -32,6 +33,7 @@ import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.PathUtils;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.FileSchemaUtils;
@@ -111,7 +113,7 @@ public class BufferWriteProcessorTest {
   public void testWriteAndAbnormalRecover()
       throws WriteProcessException, InterruptedException, IOException, ProcessorException {
     bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath,
-        parameters,
+        parameters, SysTimeVersionController.INSTANCE,
         FileSchemaUtils.constructFileSchema(deviceId));
     for (int i = 1; i < 100; i++) {
       bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i));
@@ -136,7 +138,8 @@ public class BufferWriteProcessorTest {
     file.renameTo(restoreFile);
     BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor(
         directories.getFolderForTest(), deviceId,
-        insertPath, parameters, FileSchemaUtils.constructFileSchema(deviceId));
+        insertPath, parameters, SysTimeVersionController.INSTANCE,
+        FileSchemaUtils.constructFileSchema(deviceId));
     assertEquals(true, insertFile.exists());
     assertEquals(insertFileLength, insertFile.length());
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferWriteProcessor
@@ -155,7 +158,7 @@ public class BufferWriteProcessorTest {
   public void testWriteAndNormalRecover()
       throws WriteProcessException, ProcessorException, InterruptedException {
     bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath,
-        parameters,
+        parameters, SysTimeVersionController.INSTANCE,
         FileSchemaUtils.constructFileSchema(deviceId));
     for (int i = 1; i < 100; i++) {
       bufferwrite.write(deviceId, measurementId, i, dataType, String.valueOf(i));
@@ -169,7 +172,8 @@ public class BufferWriteProcessorTest {
     assertEquals(true, restoreFile.exists());
     BufferWriteProcessor bufferWriteProcessor = new BufferWriteProcessor(
         directories.getFolderForTest(), deviceId,
-        insertPath, parameters, FileSchemaUtils.constructFileSchema(deviceId));
+        insertPath, parameters, SysTimeVersionController.INSTANCE,
+        FileSchemaUtils.constructFileSchema(deviceId));
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = bufferWriteProcessor
         .queryBufferWriteData(deviceId,
             measurementId, dataType);
@@ -187,7 +191,7 @@ public class BufferWriteProcessorTest {
   public void testWriteAndQuery()
       throws WriteProcessException, InterruptedException, ProcessorException {
     bufferwrite = new BufferWriteProcessor(directories.getFolderForTest(), deviceId, insertPath,
-        parameters,
+        parameters, SysTimeVersionController.INSTANCE,
         FileSchemaUtils.constructFileSchema(deviceId));
     assertEquals(false, bufferwrite.isFlush());
     assertEquals(true, bufferwrite.canBeClosed());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
index 9ba0624..d29ba29 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/bufferwrite/RestorableTsFileIOWriterTest.java
@@ -164,7 +164,7 @@ public class RestorableTsFileIOWriterTest {
     memTable.write("d1", "s2", TSDataType.INT32, 3, "1");
     memTable.write("d2", "s2", TSDataType.INT32, 2, "1");
     memTable.write("d2", "s2", TSDataType.INT32, 4, "1");
-    MemTableFlushUtil.flushMemTable(schema, writer, memTable);
+    MemTableFlushUtil.flushMemTable(schema, writer, memTable, 0);
     writer.flush();
     writer.appendMetadata();
     writer.getOutput().close();
@@ -217,7 +217,7 @@ public class RestorableTsFileIOWriterTest {
         MemTableTestUtils.measurementId0,
         MemTableTestUtils.dataType0);
 
-    MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable);
+    MemTableFlushUtil.flushMemTable(MemTableTestUtils.getFileSchema(), writer, memTable, 0);
     writer.flush();
 
     assertEquals(0,
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
index 0d4ac60..a947254 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteFileSizeControlTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.bufferwrite.Action;
 import org.apache.iotdb.db.engine.bufferwrite.ActionException;
 import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
 import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.metadata.ColumnSchema;
 import org.apache.iotdb.db.metadata.MManager;
@@ -143,7 +144,7 @@ public class BufferwriteFileSizeControlTest {
     try {
       processor = new BufferWriteProcessor(Directories.getInstance().getFolderForTest(), nsp,
           filename,
-          parameters, constructFileSchema(nsp));
+          parameters, SysTimeVersionController.INSTANCE, constructFileSchema(nsp));
     } catch (BufferWriteProcessorException e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
index a6991c0..0f5f259 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/BufferwriteMetaSizeControlTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.bufferwrite.Action;
 import org.apache.iotdb.db.engine.bufferwrite.ActionException;
 import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
 import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.exception.BufferWriteProcessorException;
 import org.apache.iotdb.db.metadata.ColumnSchema;
 import org.apache.iotdb.db.metadata.MManager;
@@ -143,7 +144,7 @@ public class BufferwriteMetaSizeControlTest {
     try {
       processor = new BufferWriteProcessor(Directories.getInstance().getFolderForTest(), nsp,
           filename,
-          parameters, constructFileSchema(nsp));
+          parameters, SysTimeVersionController.INSTANCE, constructFileSchema(nsp));
     } catch (BufferWriteProcessorException e) {
       e.printStackTrace();
       fail(e.getMessage());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java
index 2a9ef1d..7d27f26 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowFileSizeControlTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.bufferwrite.Action;
 import org.apache.iotdb.db.engine.bufferwrite.ActionException;
 import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -120,7 +121,7 @@ public class OverflowFileSizeControlTest {
     // insert one point: int
     try {
       ofprocessor = new OverflowProcessor(nameSpacePath, parameters,
-          FileSchemaUtils.constructFileSchema(deviceId));
+          FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE);
       for (int i = 1; i < 1000000; i++) {
         TSRecord record = new TSRecord(i, deviceId);
         record.addTuple(DataPoint.getDataPoint(dataTypes[0], measurementIds[0], String.valueOf(i)));
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java
index fcef385..de6a586 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/OverflowMetaSizeControlTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.bufferwrite.Action;
 import org.apache.iotdb.db.engine.bufferwrite.ActionException;
 import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.engine.overflow.io.OverflowProcessor;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -120,7 +121,7 @@ public class OverflowMetaSizeControlTest {
     // insert one point: int
     try {
       ofprocessor = new OverflowProcessor(nameSpacePath, parameters,
-          FileSchemaUtils.constructFileSchema(deviceId));
+          FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE);
       for (int i = 1; i < 1000000; i++) {
         TSRecord record = new TSRecord(i, deviceId);
         record.addTuple(DataPoint.getDataPoint(dataTypes[0], measurementIds[0], String.valueOf(i)));
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
new file mode 100644
index 0000000..2c3bd20
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionFileNodeTest.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.MetadataArgsErrorException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DeletionFileNodeTest {
+
+  // Path registered as the storage level in setup(); every series lives under it.
+  private String processorName = "root.test";
+  private String dataType = TSDataType.DOUBLE.toString();
+  private String encoding = TSEncoding.PLAIN.toString();
+  private String[] args = new String[0];
+  // Measurement names m0 .. m9, filled in by the static initializer below.
+  private static String[] measurements = new String[10];
+  static {
+    for (int idx = 0; idx < measurements.length; idx++) {
+      measurements[idx] = "m" + idx;
+    }
+  }
+
+  /** Registers the storage level and the ten series m0 .. m9 before each test. */
+  @Before
+  public void setup() throws MetadataArgsErrorException,
+      PathErrorException, IOException, FileNodeManagerException {
+    MManager.getInstance().setStorageLevelToMTree(processorName);
+    for (String measurement : measurements) {
+      String seriesPath = processorName + "." + measurement;
+      MManager.getInstance().addPathToMTree(seriesPath, dataType, encoding, args);
+      FileNodeManager.getInstance()
+          .addTimeSeries(new Path(processorName, measurement), dataType, encoding);
+    }
+  }
+
+  @After
+  public void teardown() throws IOException, FileNodeManagerException {
+    EnvironmentUtils.cleanEnv(); // presumably resets state created by the test; confirm
+  }
+
+  /** Deletes with no close in between, then counts the points left in m5's sequential chunk. */
+  @Test
+  public void testDeleteInBufferWriteCache() throws FileNodeManagerException {
+    // timestamps 1 .. 100, each record carrying a value for every measurement
+    for (int time = 1; time <= 100; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+    Path deletedSeries = new Path(processorName, measurements[5]);
+    QueryDataSource source = FileNodeManager.getInstance()
+        .query(new SingleSeriesExpression(deletedSeries, null), new QueryContext());
+    Iterator<TimeValuePair> remaining =
+        source.getSeqDataSource().getReadableChunk().getIterator();
+    int count = 0;
+    while (remaining.hasNext()) {
+      remaining.next();
+      count++;
+    }
+    // expected: 50 of the 100 inserted points remain for m5
+    assertEquals(50, count);
+  }
+
+  /** Deletions issued after closeAll() must land, in order, in a single modification file. */
+  @Test
+  public void testDeleteInBufferWriteFile() throws FileNodeManagerException, IOException {
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, processorName);
+      for (int j = 0; j < 10; j++) {
+        record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+      }
+      FileNodeManager.getInstance().insert(record, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+
+    Modification[] realModifications = new Modification[]{
+        new Deletion(processorName + "." + measurements[5], 102, 50),
+        new Deletion(processorName + "." + measurements[4], 103, 40),
+        new Deletion(processorName + "." + measurements[3], 104, 30),
+    };
+
+    File fileNodeDir = new File(Directories.getInstance().getTsFileFolder(0) + File.separator
+        + processorName);
+    File[] modFiles = fileNodeDir.listFiles((dir, name)
+        -> name.endsWith(ModificationFile.FILE_SUFFIX));
+    assertEquals(1, modFiles == null ? 0 : modFiles.length); // null: directory not listable
+
+    LocalTextModificationAccessor accessor =
+        new LocalTextModificationAccessor(modFiles[0].getPath());
+    try {
+      Collection<Modification> modifications = accessor.read();
+      assertEquals(realModifications.length, modifications.size());
+      int i = 0;
+      for (Modification modification : modifications) {
+        assertEquals(realModifications[i++], modification);
+      }
+    } finally {
+      accessor.close();
+    }
+  }
+
+  /** Deletes from unclosed out-of-order data and counts what m5's overflow mem chunk keeps. */
+  @Test
+  public void testDeleteInOverflowCache() throws FileNodeManagerException {
+    // insert into BufferWrite: timestamps 101 .. 200, then close
+    for (int time = 101; time <= 200; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+
+    // insert into Overflow: earlier timestamps 1 .. 100, not closed
+    for (int time = 1; time <= 100; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+    Path deletedSeries = new Path(processorName, measurements[5]);
+    QueryDataSource source = FileNodeManager.getInstance()
+        .query(new SingleSeriesExpression(deletedSeries, null), new QueryContext());
+    Iterator<TimeValuePair> remaining =
+        source.getOverflowSeriesDataSource().getReadableMemChunk().getIterator();
+    int count = 0;
+    while (remaining.hasNext()) {
+      remaining.next();
+      count++;
+    }
+    assertEquals(50, count);
+  }
+
+  /** Deletions on closed overflow data must land, in order, in a single modification file. */
+  @Test
+  public void testDeleteInOverflowFile() throws FileNodeManagerException, IOException {
+    // insert into BufferWrite
+    for (int i = 101; i <= 200; i++) {
+      TSRecord record = new TSRecord(i, processorName);
+      for (int j = 0; j < 10; j++) {
+        record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+      }
+      FileNodeManager.getInstance().insert(record, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+    // insert into Overflow
+    for (int i = 1; i <= 100; i++) {
+      TSRecord record = new TSRecord(i, processorName);
+      for (int j = 0; j < 10; j++) {
+        record.addTuple(new DoubleDataPoint(measurements[j], i * 1.0));
+      }
+      FileNodeManager.getInstance().insert(record, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+    Modification[] realModifications = new Modification[]{
+        new Deletion(processorName + "." + measurements[5], 103, 50),
+        new Deletion(processorName + "." + measurements[4], 104, 40),
+        new Deletion(processorName + "." + measurements[3], 105, 30),
+    };
+    File fileNodeDir = new File(IoTDBDescriptor.getInstance().getConfig().overflowDataDir
+        + File.separator + processorName + File.separator + "0" + File.separator);
+    File[] modFiles = fileNodeDir.listFiles((dir, name)
+        -> name.endsWith(ModificationFile.FILE_SUFFIX));
+    assertEquals(1, modFiles == null ? 0 : modFiles.length); // null: directory not listable
+    LocalTextModificationAccessor accessor =
+        new LocalTextModificationAccessor(modFiles[0].getPath());
+    try {
+      Collection<Modification> modifications = accessor.read();
+      assertEquals(realModifications.length, modifications.size());
+      int i = 0;
+      for (Modification modification : modifications) {
+        assertEquals(realModifications[i++], modification);
+      }
+    } finally {
+      accessor.close(); // release the file handle even when an assertion above fails
+    }
+  }
+}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
new file mode 100644
index 0000000..ded02f8
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/DeletionQueryTest.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.engine.filenode.FileNodeManager;
+import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel;
+import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.exception.MetadataArgsErrorException;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.query.executor.EngineQueryRouter;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DeletionQueryTest {
+
+  // Path registered as the storage level in setup(); every series lives under it.
+  private String processorName = "root.test";
+  private String dataType = TSDataType.DOUBLE.toString();
+  private String encoding = TSEncoding.PLAIN.toString();
+  private String[] args = new String[0];
+  private EngineQueryRouter router = new EngineQueryRouter();
+  // Measurement names m0 .. m9, filled in by the static initializer below.
+  private static String[] measurements = new String[10];
+  static {
+    for (int idx = 0; idx < measurements.length; idx++) {
+      measurements[idx] = "m" + idx;
+    }
+  }
+
+  /** Registers the storage level and the ten series m0 .. m9 before each test. */
+  @Before
+  public void setup() throws MetadataArgsErrorException,
+      PathErrorException, IOException, FileNodeManagerException {
+    MManager.getInstance().setStorageLevelToMTree(processorName);
+    for (String measurement : measurements) {
+      String seriesPath = processorName + "." + measurement;
+      MManager.getInstance().addPathToMTree(seriesPath, dataType, encoding, args);
+      FileNodeManager.getInstance()
+          .addTimeSeries(new Path(processorName, measurement), dataType, encoding);
+    }
+  }
+
+  @After
+  public void teardown() throws IOException, FileNodeManagerException {
+    EnvironmentUtils.cleanEnv(); // presumably resets state created by the test; confirm
+  }
+
+  /** Deletes with no close in between, then counts the rows a query over m3 .. m5 returns. */
+  @Test
+  public void testDeleteInBufferWriteCache() throws FileNodeManagerException, IOException {
+    // timestamps 1 .. 100, each record carrying a value for every measurement
+    for (int time = 1; time <= 100; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+    List<Path> queriedPaths = new ArrayList<>();
+    for (int idx = 3; idx <= 5; idx++) {
+      queriedPaths.add(new Path(processorName, measurements[idx]));
+    }
+
+    QueryExpression query = QueryExpression.create(queriedPaths, null);
+    QueryDataSet rows = router.query(query);
+
+    int count = 0;
+    while (rows.hasNext()) {
+      rows.next();
+      count++;
+    }
+    assertEquals(50, count);
+  }
+
+  /** Deletes after closeAll(), then counts the rows a query over m3 .. m5 returns. */
+  @Test
+  public void testDeleteInBufferWriteFile() throws FileNodeManagerException, IOException {
+    for (int time = 1; time <= 100; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+
+    List<Path> queriedPaths = new ArrayList<>();
+    for (int idx = 3; idx <= 5; idx++) {
+      queriedPaths.add(new Path(processorName, measurements[idx]));
+    }
+    QueryExpression query = QueryExpression.create(queriedPaths, null);
+    QueryDataSet rows = router.query(query);
+
+    int count = 0;
+    while (rows.hasNext()) {
+      rows.next();
+      count++;
+    }
+    assertEquals(70, count);
+  }
+
+  /** Deletes while the out-of-order points are unclosed, then counts rows for m3 .. m5. */
+  @Test
+  public void testDeleteInOverflowCache() throws FileNodeManagerException, IOException {
+    // insert into BufferWrite: timestamps 101 .. 200, then close
+    for (int time = 101; time <= 200; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+
+    // insert into Overflow: earlier timestamps 1 .. 100, not closed
+    for (int time = 1; time <= 100; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+    List<Path> queriedPaths = new ArrayList<>();
+    for (int idx = 3; idx <= 5; idx++) {
+      queriedPaths.add(new Path(processorName, measurements[idx]));
+    }
+    QueryExpression query = QueryExpression.create(queriedPaths, null);
+    QueryDataSet rows = router.query(query);
+
+    int count = 0;
+    while (rows.hasNext()) {
+      rows.next();
+      count++;
+    }
+    assertEquals(150, count);
+  }
+
+  /** Deletes after both batches are closed, then counts the rows a query over m3 .. m5 returns. */
+  @Test
+  public void testDeleteInOverflowFile() throws FileNodeManagerException, IOException {
+    // insert into BufferWrite: timestamps 101 .. 200, then close
+    for (int time = 101; time <= 200; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+
+    // insert into Overflow: earlier timestamps 1 .. 100, then close as well
+    for (int time = 1; time <= 100; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+    FileNodeManager.getInstance().closeAll();
+
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 40);
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 30);
+
+    List<Path> queriedPaths = new ArrayList<>();
+    for (int idx = 3; idx <= 5; idx++) {
+      queriedPaths.add(new Path(processorName, measurements[idx]));
+    }
+    QueryExpression query = QueryExpression.create(queriedPaths, null);
+    QueryDataSet rows = router.query(query);
+
+    int count = 0;
+    while (rows.hasNext()) {
+      rows.next();
+      count++;
+    }
+    assertEquals(170, count);
+  }
+
+  /** Interleaves three insert batches with deletions and forced flushes, then queries m3 .. m5. */
+  @Test
+  public void testSuccessiveDeletion()
+      throws FileNodeManagerException, IOException, InterruptedException {
+    for (int time = 1; time <= 100; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+    FileNodeManager.getInstance().forceFlush(UsageLevel.DANGEROUS);
+
+    for (int time = 101; time <= 200; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 250);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 250);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 230);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 250);
+
+    FileNodeManager.getInstance().forceFlush(UsageLevel.DANGEROUS);
+
+    for (int time = 201; time <= 300; time++) {
+      TSRecord row = new TSRecord(time, processorName);
+      for (String measurement : measurements) {
+        row.addTuple(new DoubleDataPoint(measurement, time * 1.0));
+      }
+      FileNodeManager.getInstance().insert(row, false);
+    }
+
+    FileNodeManager.getInstance().delete(processorName, measurements[3], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[4], 50);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 30);
+    FileNodeManager.getInstance().delete(processorName, measurements[5], 50);
+
+    FileNodeManager.getInstance().forceFlush(UsageLevel.DANGEROUS);
+    Thread.sleep(3000); // fixed wait; presumably lets the forced flushes finish - confirm
+    FileNodeManager.getInstance().closeAll();
+
+    List<Path> queriedPaths = new ArrayList<>();
+    for (int idx = 3; idx <= 5; idx++) {
+      queriedPaths.add(new Path(processorName, measurements[idx]));
+    }
+    QueryExpression query = QueryExpression.create(queriedPaths, null);
+    QueryDataSet rows = router.query(query);
+
+    int count = 0;
+    while (rows.hasNext()) {
+      rows.next();
+      count++;
+    }
+    assertEquals(100, count);
+  }
+}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
new file mode 100644
index 0000000..b9a85e5
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+
+public class ModificationFileTest {
+
+  /** Writes in two batches; each read must return every write made so far, in order. */
+  @Test
+  public void readMyWrite() {
+    String tempFileName = "mod.temp";
+    Modification[] modifications = new Modification[]{
+        new Deletion("p1", 1, 1),
+        new Deletion("p2", 2, 2),
+        new Deletion("p3", 3, 3),
+        new Deletion("p4", 4, 4),
+    };
+    try {
+      ModificationFile mFile = new ModificationFile(tempFileName);
+      for (int i = 0; i < 2; i++) {
+        mFile.write(modifications[i]);
+      }
+      List<Modification> modificationList = (List<Modification>) mFile.getModifications();
+      for (int i = 0; i < 2; i++) {
+        assertEquals(modifications[i], modificationList.get(i));
+      }
+      for (int i = 2; i < 4; i++) {
+        mFile.write(modifications[i]);
+      }
+      modificationList = (List<Modification>) mFile.getModifications();
+      for (int i = 0; i < 4; i++) {
+        assertEquals(modifications[i], modificationList.get(i));
+      }
+      mFile.close();
+    } catch (IOException e) {
+      fail(e.getMessage());
+    } finally {
+      new File(tempFileName).delete(); // remove the temp file even when an assertion failed
+    }
+  }
+
+  /** After four writes and one abort(), exactly the first three writes must remain. */
+  @Test
+  public void testAbort() {
+    String tempFileName = "mod.temp";
+    Modification[] modifications = new Modification[]{
+        new Deletion("p1", 1, 1),
+        new Deletion("p2", 2, 2),
+        new Deletion("p3", 3, 3),
+        new Deletion("p4", 4, 4),
+    };
+    try {
+      ModificationFile mFile = new ModificationFile(tempFileName);
+      for (int i = 0; i < 2; i++) {
+        mFile.write(modifications[i]);
+      }
+      List<Modification> modificationList = (List<Modification>) mFile.getModifications();
+      for (int i = 0; i < 2; i++) {
+        assertEquals(modifications[i], modificationList.get(i));
+      }
+      for (int i = 2; i < 4; i++) {
+        mFile.write(modifications[i]);
+      }
+      mFile.abort(); // expected to discard only the most recent write (p4)
+      modificationList = (List<Modification>) mFile.getModifications();
+      assertEquals(3, modificationList.size()); // would be 4 if abort() were a no-op
+      for (int i = 0; i < 3; i++) {
+        assertEquals(modifications[i], modificationList.get(i));
+      }
+      mFile.close();
+    } catch (IOException e) {
+      fail(e.getMessage());
+    } finally {
+      new File(tempFileName).delete(); // remove the temp file even when an assertion failed
+    }
+  }
+}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
new file mode 100644
index 0000000..7121fa3
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessorTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.modification.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.junit.Test;
+
+public class LocalTextModificationAccessorTest {
+
+  @Test
+  public void readMyWrite() { // write in two batches and read back after each one
+    String tempFileName = "mod.temp";
+    Modification[] modifications = new Modification[]{
+        new Deletion("p1", 1, 1),
+        new Deletion("p2", 2, 2),
+        new Deletion("p3", 3, 3),
+        new Deletion("p4", 4, 4),
+    };
+    try {
+      LocalTextModificationAccessor accessor = new LocalTextModificationAccessor(tempFileName);
+      for (int i = 0; i < 2; i++) {
+        accessor.write(modifications[i]);
+      }
+      List<Modification> modificationList = (List<Modification>) accessor.read();
+      for (int i = 0; i < 2; i++) { // the first batch must already be readable, in write order
+        assertEquals(modifications[i], modificationList.get(i));
+      }
+
+      for (int i = 2; i < 4; i++) {
+        accessor.write(modifications[i]);
+      }
+      modificationList = (List<Modification>) accessor.read(); // cast assumes read() returns a List
+      for (int i = 0; i < 4; i++) { // the second read must return both batches, in write order
+        assertEquals(modifications[i], modificationList.get(i));
+      }
+      accessor.close();
+    } catch (IOException e) {
+      fail(e.getMessage());
+    } finally {
+      new File(tempFileName).delete(); // remove the temp file even when an assertion failed
+    }
+  }
+
+  @Test
+  public void readNull() throws IOException { // read() with no modification file on disk
+    String tempFileName = "mod.temp";
+    LocalTextModificationAccessor accessor;
+    accessor = new LocalTextModificationAccessor(tempFileName);
+    new File(tempFileName).delete(); // make sure no file exists at the path when read() runs
+    Collection<Modification> modifications = accessor.read();
+    assertEquals(new ArrayList<>(), modifications); // expects an empty collection, not null
+  }
+}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java
index 3767f0a..db95d1d 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorBenchmark.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.bufferwrite.Action;
 import org.apache.iotdb.db.engine.bufferwrite.ActionException;
 import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -92,7 +93,7 @@ public class OverflowProcessorBenchmark {
       }
     });
     OverflowProcessor overflowProcessor = new OverflowProcessor("Overflow_bench", parameters,
-        fileSchema);
+        fileSchema, SysTimeVersionController.INSTANCE);
     long startTime = System.currentTimeMillis();
     for (int i = 0; i < numOfPoint; i++) {
       for (int j = 0; j < numOfDevice; j++) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
index 3a3b3b6..4c4ee05 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessorTest.java
@@ -33,7 +33,9 @@ import org.apache.iotdb.db.engine.bufferwrite.ActionException;
 import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants;
 import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowSeriesDataSource;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.exception.OverflowProcessorException;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.TimeValuePair;
 import org.junit.After;
@@ -77,13 +79,16 @@ public class OverflowProcessorTest {
   @Test
   public void testInsertUpdate()
       throws IOException, OverflowProcessorException, InterruptedException {
-    processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema());
-    assertEquals(true, new File(PathUtils.getOverflowWriteDir(processorName), "0").exists());
+    processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
+            SysTimeVersionController.INSTANCE);
+    assertEquals(true, new File(PathUtils.getOverflowWriteDir(processorName),
+            "0").exists());
     assertEquals(false, processor.isFlush());
     assertEquals(false, processor.isMerge());
+    QueryContext context = new QueryContext();
     // write update data
     OverflowSeriesDataSource overflowSeriesDataSource = processor.query(OverflowTestUtils.deviceId1,
-        OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1);
+        OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1, context);
     assertEquals(OverflowTestUtils.dataType1, overflowSeriesDataSource.getDataType());
     Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
     assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
@@ -98,7 +103,7 @@ public class OverflowProcessorTest {
     assertEquals(false, processor.isFlush());
     overflowSeriesDataSource = processor
         .query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
-                OverflowTestUtils.dataType1);
+            OverflowTestUtils.dataType1, context);
     assertEquals(OverflowTestUtils.dataType1, overflowSeriesDataSource.getDataType());
     Assert.assertEquals(false, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
     assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
@@ -113,7 +118,7 @@ public class OverflowProcessorTest {
     processor.close();
     overflowSeriesDataSource = processor
         .query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
-                OverflowTestUtils.dataType1);
+            OverflowTestUtils.dataType1, context);
     Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
     assertEquals(1, overflowSeriesDataSource.getOverflowInsertFileList().size());
     assertEquals(1,
@@ -121,7 +126,7 @@ public class OverflowProcessorTest {
     processor.switchWorkToMerge();
     overflowSeriesDataSource = processor
         .query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
-                OverflowTestUtils.dataType1);
+            OverflowTestUtils.dataType1, context);
     assertEquals(2, overflowSeriesDataSource.getOverflowInsertFileList().size());
     assertEquals(1,
         overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
@@ -130,25 +135,27 @@ public class OverflowProcessorTest {
     assertEquals(true, processor.isMerge());
     assertEquals(false, processor.canBeClosed());
     MergeSeriesDataSource mergeSeriesDataSource = processor.queryMerge(OverflowTestUtils.deviceId1,
-        OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1);
+        OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1, context);
     assertEquals(1, mergeSeriesDataSource.getInsertFile().getChunkMetaDataList().size());
     processor.switchMergeToWork();
     overflowSeriesDataSource = processor
         .query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
-                OverflowTestUtils.dataType1);
+            OverflowTestUtils.dataType1, context);
     processor.close();
     processor.clear();
   }
 
   @Test
   public void testWriteMemoryAndQuery() throws IOException, OverflowProcessorException {
-    processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema());
+    processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
+            SysTimeVersionController.INSTANCE);
     OverflowTestUtils.produceInsertData(processor);
     processor.close();
+    QueryContext context = new QueryContext();
     // test query
     OverflowSeriesDataSource overflowSeriesDataSource = processor.query(OverflowTestUtils.deviceId1,
-        OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2);
-    Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
+        OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2, context);
+    Assert.assertTrue(overflowSeriesDataSource.getReadableMemChunk().isEmpty());
     assertEquals(0,
         overflowSeriesDataSource.getOverflowInsertFileList().get(0).getChunkMetaDataList().size());
     processor.clear();
@@ -156,37 +163,41 @@ public class OverflowProcessorTest {
 
   @Test
   public void testFlushAndQuery() throws IOException, OverflowProcessorException {
-    processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema());
+    processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
+            SysTimeVersionController.INSTANCE);
     processor.flush();
     // waiting for the end of flush.
     try {
       TimeUnit.SECONDS.sleep(1);
     } catch (InterruptedException e) {
     }
+    QueryContext context = new QueryContext();
     processor.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
-            OverflowTestUtils.dataType1);
+        OverflowTestUtils.dataType1, context);
     OverflowTestUtils.produceInsertData(processor);
     processor.query(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
-            OverflowTestUtils.dataType2);
+        OverflowTestUtils.dataType2, context);
     processor.close();
     processor.clear();
   }
 
   @Test
   public void testRecovery() throws OverflowProcessorException, IOException {
-    processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema());
+    processor = new OverflowProcessor(processorName, parameters, OverflowTestUtils.getFileSchema(),
+            SysTimeVersionController.INSTANCE);
     processor.close();
     processor.switchWorkToMerge();
     assertEquals(true, processor.isMerge());
     processor.clear();
     OverflowProcessor overflowProcessor = new OverflowProcessor(processorName, parameters,
-        OverflowTestUtils.getFileSchema());
+        OverflowTestUtils.getFileSchema(), SysTimeVersionController.INSTANCE);
     // recovery query
     assertEquals(false, overflowProcessor.isMerge());
     overflowProcessor.switchWorkToMerge();
+    QueryContext context = new QueryContext();
     OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor
         .query(OverflowTestUtils.deviceId1,
-            OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1);
+            OverflowTestUtils.measurementId1, OverflowTestUtils.dataType1, context);
     Assert.assertEquals(true, overflowSeriesDataSource.getReadableMemChunk().isEmpty());
     assertEquals(2, overflowSeriesDataSource.getOverflowInsertFileList().size());
     overflowProcessor.switchMergeToWork();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
index 6d44781..818dd9a 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowResourceTest.java
@@ -24,6 +24,9 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.List;
+
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.junit.After;
@@ -45,7 +48,7 @@ public class OverflowResourceTest {
 
   @Before
   public void setUp() throws Exception {
-    work = new OverflowResource(filePath, dataPath);
+    work = new OverflowResource(filePath, dataPath, SysTimeVersionController.INSTANCE);
     insertFile = new File(new File(filePath, dataPath), insertFileName);
     updateFile = new File(new File(filePath, dataPath), updateDeleteFileName);
     positionFile = new File(new File(filePath, dataPath), positionFileName);
@@ -61,14 +64,15 @@ public class OverflowResourceTest {
   @Test
   public void testOverflowInsert() throws IOException {
     OverflowTestUtils.produceInsertData(support);
+    QueryContext context = new QueryContext();
     work.flush(OverflowTestUtils.getFileSchema(), support.getMemTabale(), "processorName");
     List<ChunkMetaData> chunkMetaDatas = work.getInsertMetadatas(OverflowTestUtils.deviceId1,
-        OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2);
+        OverflowTestUtils.measurementId1, OverflowTestUtils.dataType2, context);
     assertEquals(0, chunkMetaDatas.size());
     work.appendMetadatas();
     chunkMetaDatas = work
         .getInsertMetadatas(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
-            OverflowTestUtils.dataType1);
+            OverflowTestUtils.dataType1, context);
     assertEquals(1, chunkMetaDatas.size());
     ChunkMetaData chunkMetaData = chunkMetaDatas.get(0);
     assertEquals(OverflowTestUtils.dataType1, chunkMetaData.getTsDataType());
@@ -81,10 +85,10 @@ public class OverflowResourceTest {
     fileOutputStream.write(new byte[20]);
     fileOutputStream.close();
     assertEquals(originlength + 20, insertFile.length());
-    work = new OverflowResource(filePath, dataPath);
+    work = new OverflowResource(filePath, dataPath, SysTimeVersionController.INSTANCE);
     chunkMetaDatas = work
         .getInsertMetadatas(OverflowTestUtils.deviceId1, OverflowTestUtils.measurementId1,
-            OverflowTestUtils.dataType1);
+            OverflowTestUtils.dataType1, context);
     assertEquals(1, chunkMetaDatas.size());
     chunkMetaData = chunkMetaDatas.get(0);
     assertEquals(OverflowTestUtils.dataType1, chunkMetaData.getTsDataType());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupportTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupportTest.java
index cb7fdc8..3119ac8 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupportTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/overflow/io/OverflowSupportTest.java
@@ -48,7 +48,7 @@ public class OverflowSupportTest {
     support.update(deviceId1, measurementId1, 20, 30, dataType1, BytesUtils.intToBytes(20));
     // time :[2,10] [20,30] value: int [10,10] int[20,20]
     // d1 s2
-    support.delete(deviceId1, measurementId2, 10, dataType1);
+    support.delete(deviceId1, measurementId2, 10, false);
     support.update(deviceId1, measurementId2, 20, 30, dataType1, BytesUtils.intToBytes(20));
     // time: [0,-10] [20,30] value[20,20]
     // d2 s1
@@ -57,7 +57,7 @@ public class OverflowSupportTest {
     // time: [5,9] [10,40] value [10.5,10.5] [20.5,20.5]
     // d2 s2
     support.update(deviceId2, measurementId2, 2, 10, dataType2, BytesUtils.floatToBytes(5.5f));
-    support.delete(deviceId2, measurementId2, 20, dataType2);
+    support.delete(deviceId2, measurementId2, 20, false);
     // time : [0,-20]
 
   }
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java
new file mode 100644
index 0000000..0bc062d
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SimpleFileVersionControllerTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.version;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iotdb.db.engine.version.SimpleFileVersionController.SAVE_INTERVAL;
+import static org.junit.Assert.assertEquals;
+
+public class SimpleFileVersionControllerTest {
+  @Test
+  public void test() throws IOException { // version values on a fresh folder, after use, and after reopening
+    String tempFilePath = "version.tmp"; // used as a directory, relative to the working dir
+
+    try {
+      if (!new File(tempFilePath).mkdir()) { // mkdir() also returns false if the folder already exists
+        Assert.fail("can not create version.tmp folder");
+      }
+      VersionController versionController = new SimpleFileVersionController(tempFilePath);
+      assertEquals(SAVE_INTERVAL, versionController.currVersion()); // expected on an empty folder
+      for (int i = 0; i < 150; i++) {
+        versionController.nextVersion();
+      }
+      assertEquals(SAVE_INTERVAL + 150, versionController.currVersion()); // one step per call
+      versionController = new SimpleFileVersionController(tempFilePath); // reopen the same folder
+      assertEquals(SAVE_INTERVAL + 200, versionController.currVersion()); // NOTE(review): not +150; presumably restored from a periodic checkpoint - confirm
+    } finally {
+      FileUtils.deleteDirectory(new File(tempFilePath)); // clean up even if an assertion failed
+    }
+  }
+}
\ No newline at end of file
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
similarity index 62%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
copy to iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
index 7c63be6..4e063ad 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/version/SysTimeVersionControllerTest.java
@@ -16,29 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.tsfile.read.common;
 
-import java.nio.ByteBuffer;
-import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+package org.apache.iotdb.db.engine.version;
 
-/**
- * used in query.
- */
-public class Chunk {
+import org.junit.Test;
 
-  private ChunkHeader chunkHeader;
-  private ByteBuffer chunkData;
+import static org.junit.Assert.assertTrue;
 
-  public Chunk(ChunkHeader header, ByteBuffer buffer) {
-    this.chunkHeader = header;
-    this.chunkData = buffer;
-  }
-
-  public ChunkHeader getHeader() {
-    return chunkHeader;
-  }
+public class SysTimeVersionControllerTest {
 
-  public ByteBuffer getData() {
-    return chunkData;
+  @Test
+  public void test() {
+    VersionController versionController = SysTimeVersionController.INSTANCE;
+    long diff = versionController.currVersion() - System.currentTimeMillis();
+    assertTrue(diff >= -1 && diff <= 1);
+    diff = versionController.nextVersion() - System.currentTimeMillis();
+    assertTrue(diff >= -1 && diff <= 1);
   }
-}
+}
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAuthorizationIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAuthorizationIT.java
index 37d7327..ecb3cbb 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAuthorizationIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBAuthorizationIT.java
@@ -39,7 +39,7 @@ import org.junit.Test;
  */
 public class IoTDBAuthorizationIT {
 
-  private IoTDB deamon;
+  private IoTDB daemon;
 
   public static void main(String[] args) throws Exception {
     for (int i = 0; i < 10; i++) {
@@ -54,14 +54,14 @@ public class IoTDBAuthorizationIT {
   public void setUp() throws Exception {
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.closeMemControl();
-    deamon = IoTDB.getInstance();
-    deamon.active();
+    daemon = IoTDB.getInstance();
+    daemon.active();
     EnvironmentUtils.envSetUp();
   }
 
   @After
   public void tearDown() throws Exception {
-    deamon.stop();
+    daemon.stop();
     EnvironmentUtils.cleanEnv();
   }
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java
index be187c2..7bf3056 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBCompleteIT.java
@@ -39,20 +39,20 @@ import org.junit.Test;
  */
 public class IoTDBCompleteIT {
 
-  private IoTDB deamon;
+  private IoTDB daemon;
 
   @Before
   public void setUp() throws Exception {
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.closeMemControl();
-    deamon = IoTDB.getInstance();
-    deamon.active();
+    daemon = IoTDB.getInstance();
+    daemon.active();
     EnvironmentUtils.envSetUp();
   }
 
   @After
   public void tearDown() throws Exception {
-    deamon.stop();
+    daemon.stop();
     EnvironmentUtils.cleanEnv();
   }
 
@@ -63,6 +63,7 @@ public class IoTDBCompleteIT {
     simpleTest();
     insertTest();
     selectTest();
+    deleteTest();
   }
 
   public void simpleTest() throws ClassNotFoundException, SQLException {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
index 7bc96e4..2b81507 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDaemonIT.java
@@ -47,7 +47,7 @@ import org.junit.Test;
  */
 public class IoTDBDaemonIT {
 
-  private static IoTDB deamon;
+  private static IoTDB daemon;
 
   private static Connection connection;
 
@@ -115,8 +115,8 @@ public class IoTDBDaemonIT {
   public static void setUp() throws Exception {
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.closeMemControl();
-    deamon = IoTDB.getInstance();
-    deamon.active();
+    daemon = IoTDB.getInstance();
+    daemon.active();
     EnvironmentUtils.envSetUp();
 
     insertData();
@@ -127,7 +127,7 @@ public class IoTDBDaemonIT {
   @AfterClass
   public static void tearDown() throws Exception {
     connection.close();
-    deamon.stop();
+    daemon.stop();
     EnvironmentUtils.cleanEnv();
   }
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
new file mode 100644
index 0000000..f23bae8
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBDeletionIT.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test of the DELETE statement. Every test starts an IoTDB instance, writes rows
+ * through JDBC, deletes some of them and checks how many rows the following queries return.
+ */
+public class IoTDBDeletionIT {
+
+  /** JDBC URL of the IoTDB instance that {@link #setUp()} starts. */
+  private static final String JDBC_URL = Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/";
+
+  private static IoTDB daemon;
+
+  private static String[] creationSqls = new String[]{
+          "SET STORAGE GROUP TO root.vehicle.d0", "SET STORAGE GROUP TO root.vehicle.d1",
+
+          "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+          "CREATE TIMESERIES root.vehicle.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE",
+          "CREATE TIMESERIES root.vehicle.d0.s2 WITH DATATYPE=FLOAT, ENCODING=RLE",
+          "CREATE TIMESERIES root.vehicle.d0.s3 WITH DATATYPE=TEXT, ENCODING=PLAIN",
+          "CREATE TIMESERIES root.vehicle.d0.s4 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
+  };
+
+  private String insertTemplate = "INSERT INTO root.vehicle.d0(timestamp,s0,s1,s2,s3,s4"
+          + ") VALUES(%d,%d,%d,%f,%s,%b)";
+  private String deleteAllTemplate = "DELETE FROM root.vehicle.d0 WHERE time <= 10000";
+
+  /**
+   * Starts the IoTDB daemon, loads the JDBC driver and creates the series used by the tests.
+   */
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    EnvironmentUtils.closeMemControl();
+    daemon = IoTDB.getInstance();
+    daemon.active();
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    prepareSeries();
+  }
+
+  /**
+   * Stops the IoTDB daemon and cleans the test environment.
+   */
+  @After
+  public void tearDown() throws Exception {
+    daemon.stop();
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Deletes different time ranges from different series and checks the remaining row counts.
+   * prepareData() writes timestamps 1..400 to s0..s4, so after the three deletions s0 keeps
+   * 301..400 (100 rows), s1..s3 keep 351..400 (50 rows) and s4, which presumably only the
+   * device-level deletion touches, keeps 151..400 (the 250 rows expected from SELECT *).
+   */
+  @Test
+  public void test() throws SQLException {
+    prepareData();
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("DELETE FROM root.vehicle.d0.s0  WHERE time <= 300");
+      statement.execute("DELETE FROM root.vehicle.d0.s1,root.vehicle.d0.s2,root.vehicle.d0.s3"
+              + " WHERE time <= 350");
+      statement.execute("DELETE FROM root.vehicle.d0 WHERE time <= 150");
+
+      assertEquals(250, countRows(statement, "SELECT * FROM root.vehicle.d0"));
+      assertEquals(100, countRows(statement, "SELECT s0 FROM root.vehicle.d0"));
+      assertEquals(50, countRows(statement, "SELECT s1,s2,s3 FROM root.vehicle.d0"));
+    }
+    cleanData();
+  }
+
+  /**
+   * Issues a deletion right after a merge has been requested and checks that the query result
+   * is the same immediately afterwards and five seconds later. prepareMerge() writes
+   * timestamps 1..20000 and everything up to 15000 is deleted, so 5000 rows are expected.
+   */
+  @Test
+  public void testMerge() throws SQLException, InterruptedException {
+    prepareMerge();
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("merge");
+      statement.execute("DELETE FROM root.vehicle.d0 WHERE time <= 15000");
+
+      // before merge completes
+      assertEquals(5000, countRows(statement, "SELECT * FROM root.vehicle.d0"));
+
+      Thread.sleep(5000);
+      // after merge completes
+      assertEquals(5000, countRows(statement, "SELECT * FROM root.vehicle.d0"));
+    }
+    cleanData();
+  }
+
+  /**
+   * Executes every statement in creationSqls over a fresh connection.
+   * NOTE(review): failures are only printed, not rethrown, so a broken setup surfaces later
+   * as a wrong row count; consider letting them propagate.
+   */
+  private static void prepareSeries() throws SQLException {
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, "root", "root");
+        Statement statement = connection.createStatement()) {
+      for (String sql : creationSqls) {
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Writes timestamps 1..400 in four batches, requesting a merge after each of the first two.
+   * The inline comments name the storage area each batch is meant to end up in.
+   */
+  private void prepareData() throws SQLException {
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, "root", "root");
+        Statement statement = connection.createStatement()) {
+      // prepare BufferWrite file
+      insertRange(statement, 201, 300);
+      statement.execute("merge");
+      // prepare Unseq-File
+      insertRange(statement, 1, 100);
+      statement.execute("merge");
+      // prepare BufferWrite cache
+      insertRange(statement, 301, 400);
+      // prepare Overflow cache
+      insertRange(statement, 101, 200);
+    }
+  }
+
+  /**
+   * Runs deleteAllTemplate, which deletes all data of root.vehicle.d0 up to timestamp 10000.
+   */
+  private void cleanData() throws SQLException {
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute(deleteAllTemplate);
+    }
+  }
+
+  /**
+   * Writes timestamps 10001..20000 first and 1..10000 afterwards, so the second batch is older
+   * than data that already exists.
+   */
+  public void prepareMerge() throws SQLException {
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, "root", "root");
+        Statement statement = connection.createStatement()) {
+      // prepare BufferWrite data
+      insertRange(statement, 10001, 20000);
+      // prepare Overflow data
+      insertRange(statement, 1, 10000);
+    }
+  }
+
+  /**
+   * Inserts one row per timestamp in [from, to]; every column of a row is derived from its
+   * timestamp. Locale.ROOT keeps the %f column formatted with a '.' decimal separator whatever
+   * the default locale of the JVM is, so the generated SQL stays parseable.
+   */
+  private void insertRange(Statement statement, int from, int to) throws SQLException {
+    for (int i = from; i <= to; i++) {
+      statement.execute(String.format(Locale.ROOT, insertTemplate, i, i, i, (double) i,
+              "\'" + i + "\'", i % 2 == 0));
+    }
+  }
+
+  /**
+   * Executes the query and returns the number of rows in its result set.
+   */
+  private static int countRows(Statement statement, String sql) throws SQLException {
+    int count = 0;
+    try (ResultSet resultSet = statement.executeQuery(sql)) {
+      while (resultSet.next()) {
+        count++;
+      }
+    }
+    return count;
+  }
+}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
index e7a8200..4e8a444 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBEngineTimeGeneratorIT.java
@@ -28,6 +28,7 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.OpenedFilePathsManager;
 import org.apache.iotdb.db.query.timegenerator.EngineTimeGenerator;
 import org.apache.iotdb.db.service.IoTDB;
@@ -47,8 +48,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
- * Notice that, all test begins with "IoTDB" is integration test. All test which will start the IoTDB server should be
- * defined as integration test.
+ * Note that every test whose name begins with "IoTDB" is an integration test. Every test that
+ * starts the IoTDB server should be defined as an integration test.
  */
 public class IoTDBEngineTimeGeneratorIT {
 
@@ -201,7 +202,9 @@ public class IoTDBEngineTimeGeneratorIT {
     SingleSeriesExpression singleSeriesExpression = new SingleSeriesExpression(pd0s0,
         FilterFactory.and(valueGtEq, timeGt));
     OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
-    EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression);
+    QueryContext context = new QueryContext();
+    EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression,
+        context);
 
     int cnt = 0;
     while (timeGenerator.hasNext()) {
@@ -225,7 +228,9 @@ public class IoTDBEngineTimeGeneratorIT {
 
     OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
     IExpression singleSeriesExpression = new SingleSeriesExpression(pd1s0, valueGtEq);
-    EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression);
+    QueryContext context = new QueryContext();
+    EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, singleSeriesExpression,
+        context);
 
     int cnt = 0;
     while (timeGenerator.hasNext()) {
@@ -258,7 +263,8 @@ public class IoTDBEngineTimeGeneratorIT {
         .and(singleSeriesExpression1, singleSeriesExpression2);
 
     OpenedFilePathsManager.getInstance().setJobIdForCurrentRequestThread(0);
-    EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, andExpression);
+    QueryContext context = new QueryContext();
+    EngineTimeGenerator timeGenerator = new EngineTimeGenerator(0, andExpression, context);
     int cnt = 0;
     while (timeGenerator.hasNext()) {
       long time = timeGenerator.next();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java
index 8b10d0c..2effc15 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLargeDataIT.java
@@ -43,7 +43,7 @@ import org.junit.Test;
  */
 public class IoTDBLargeDataIT {
 
-  private static IoTDB deamon;
+  private static IoTDB daemon;
 
   private static boolean testFlag = Constant.testFlag;
   private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
@@ -69,8 +69,8 @@ public class IoTDBLargeDataIT {
     tsFileConfig.pageSizeInByte = 1024 * 150;
     tsFileConfig.groupSizeInByte = 1024 * 1000;
 
-    deamon = IoTDB.getInstance();
-    deamon.active();
+    daemon = IoTDB.getInstance();
+    daemon.active();
     EnvironmentUtils.envSetUp();
 
     Thread.sleep(5000);
@@ -86,7 +86,7 @@ public class IoTDBLargeDataIT {
 
     connection.close();
 
-    deamon.stop();
+    daemon.stop();
 
     // recovery value
     tsFileConfig.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLimitSlimitIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLimitSlimitIT.java
index c9baccb..6153f69 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLimitSlimitIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBLimitSlimitIT.java
@@ -42,9 +42,7 @@ import org.junit.Test;
  */
 public class IoTDBLimitSlimitIT {
 
-  private static IoTDB deamon;
-
-  private static boolean testFlag = Constant.testFlag;
+  private static IoTDB daemon;
 
   private static String[] insertSqls = new String[]{"SET STORAGE GROUP TO root.vehicle",
 
@@ -91,14 +89,14 @@ public class IoTDBLimitSlimitIT {
   public static void setUp() throws Exception {
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.closeMemControl();
-    deamon = IoTDB.getInstance();
-    deamon.active();
+    daemon = IoTDB.getInstance();
+    daemon.active();
     EnvironmentUtils.envSetUp();
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
-    deamon.stop();
+    daemon.stop();
     EnvironmentUtils.cleanEnv();
   }
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
index a266258..2c9c579 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMetadataFetchIT.java
@@ -43,7 +43,7 @@ import org.junit.Test;
  */
 public class IoTDBMetadataFetchIT {
 
-  private static IoTDB deamon;
+  private static IoTDB daemon;
 
   private DatabaseMetaData databaseMetaData;
 
@@ -79,8 +79,8 @@ public class IoTDBMetadataFetchIT {
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.closeMemControl();
 
-    deamon = IoTDB.getInstance();
-    deamon.active();
+    daemon = IoTDB.getInstance();
+    daemon.active();
     EnvironmentUtils.envSetUp();
 
     insertSQL();
@@ -88,7 +88,7 @@ public class IoTDBMetadataFetchIT {
 
   @After
   public void tearDown() throws Exception {
-    deamon.stop();
+    daemon.stop();
     EnvironmentUtils.cleanEnv();
   }
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
index c1ca654..2bb5a86 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiSeriesIT.java
@@ -43,7 +43,7 @@ import org.junit.Test;
  */
 public class IoTDBMultiSeriesIT {
 
-  private static IoTDB deamon;
+  private static IoTDB daemon;
 
   private static boolean testFlag = Constant.testFlag;
   private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
@@ -69,8 +69,8 @@ public class IoTDBMultiSeriesIT {
     tsFileConfig.pageSizeInByte = 1024 * 150;
     tsFileConfig.groupSizeInByte = 1024 * 1000;
 
-    deamon = IoTDB.getInstance();
-    deamon.active();
+    daemon = IoTDB.getInstance();
+    daemon.active();
     EnvironmentUtils.envSetUp();
 
     Thread.sleep(5000);
@@ -86,7 +86,7 @@ public class IoTDBMultiSeriesIT {
 
     connection.close();
 
-    deamon.stop();
+    daemon.stop();
     // recovery value
     tsFileConfig.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
     tsFileConfig.pageSizeInByte = pageSizeInByte;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index e3bd59c..eb8da77 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -51,7 +51,7 @@ import org.junit.Test;
  */
 public class IoTDBSeriesReaderIT {
 
-  private static IoTDB deamon;
+  private static IoTDB daemon;
 
   private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
   private static int maxNumberOfPointsInPage;
@@ -76,8 +76,8 @@ public class IoTDBSeriesReaderIT {
     tsFileConfig.pageSizeInByte = 1024 * 1024 * 150;
     tsFileConfig.groupSizeInByte = 1024 * 1024 * 1000;
 
-    deamon = IoTDB.getInstance();
-    deamon.active();
+    daemon = IoTDB.getInstance();
+    daemon.active();
     EnvironmentUtils.envSetUp();
 
     Thread.sleep(5000);
@@ -90,7 +90,7 @@ public class IoTDBSeriesReaderIT {
   @AfterClass
   public static void tearDown() throws Exception {
     connection.close();
-    deamon.stop();
+    daemon.stop();
     // recovery value
     tsFileConfig.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
     tsFileConfig.pageSizeInByte = pageSizeInByte;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBTimeZoneIT.java b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBTimeZoneIT.java
index 995dc0f..76d6734 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBTimeZoneIT.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/integration/IoTDBTimeZoneIT.java
@@ -52,22 +52,18 @@ public class IoTDBTimeZoneIT {
 
   @Before
   public void setUp() throws Exception {
-    // if (testFlag) {
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.closeMemControl();
     deamon = IoTDB.getInstance();
     deamon.active();
     EnvironmentUtils.envSetUp();
     createTimeseries();
-    // }
   }
 
   @After
   public void tearDown() throws Exception {
-    // if (testFlag) {
     deamon.stop();
     EnvironmentUtils.cleanEnv();
-    // }
   }
 
   /**
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
index a50b48e..650f24e 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
@@ -298,7 +298,7 @@ public class RecoverTest {
     public int currPos = 0;
 
     @Override
-    public void replay(PhysicalPlan plan) throws ProcessorException {
+    public void replay(PhysicalPlan plan, boolean isOverflow) throws ProcessorException {
       if (currPos >= plansToCheck.size()) {
         throw new ProcessorException("More plans recovered than expected");
       }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
index 880005c..164b42f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 /**
@@ -52,6 +53,8 @@ public class ChunkGroupMetaData {
     chunkMetaDataList = new ArrayList<>();
   }
 
+  private long version;
+
   /**
    * constructor of ChunkGroupMetaData.
    *
@@ -81,10 +84,11 @@ public class ChunkGroupMetaData {
     ChunkGroupMetaData chunkGroupMetaData = new ChunkGroupMetaData();
 
     chunkGroupMetaData.deviceID = ReadWriteIOUtils.readString(inputStream);
+    chunkGroupMetaData.version = ReadWriteIOUtils.readLong(inputStream);
 
     int size = ReadWriteIOUtils.readInt(inputStream);
     chunkGroupMetaData.serializedSize =
-        Integer.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES;
+        Integer.BYTES + Long.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES;
 
     List<ChunkMetaData> chunkMetaDataList = new ArrayList<>();
 
@@ -107,12 +111,13 @@ public class ChunkGroupMetaData {
   public static ChunkGroupMetaData deserializeFrom(ByteBuffer buffer) {
     ChunkGroupMetaData chunkGroupMetaData = new ChunkGroupMetaData();
 
-    chunkGroupMetaData.deviceID = ReadWriteIOUtils.readString(buffer);
+    chunkGroupMetaData.deviceID = (ReadWriteIOUtils.readString(buffer));
+    chunkGroupMetaData.version = ReadWriteIOUtils.readLong(buffer);
 
     int size = ReadWriteIOUtils.readInt(buffer);
 
     chunkGroupMetaData.serializedSize =
-        Integer.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES;
+        Integer.BYTES + Long.BYTES + chunkGroupMetaData.deviceID.length() + Integer.BYTES;
 
     List<ChunkMetaData> chunkMetaDataList = new ArrayList<>();
     for (int i = 0; i < size; i++) {
@@ -130,7 +135,8 @@ public class ChunkGroupMetaData {
   }
 
   void reCalculateSerializedSize() {
-    serializedSize = Integer.BYTES + deviceID.length() + Integer.BYTES; // size of chunkMetaDataList
+    serializedSize = Integer.BYTES + Long.BYTES +
+            deviceID.length() + Integer.BYTES; // size of chunkMetaDataList
     for (ChunkMetaData chunk : chunkMetaDataList) {
       serializedSize += chunk.getSerializedSize();
     }
@@ -172,6 +178,7 @@ public class ChunkGroupMetaData {
   public int serializeTo(OutputStream outputStream) throws IOException {
     int byteLen = 0;
     byteLen += ReadWriteIOUtils.write(deviceID, outputStream);
+    byteLen += ReadWriteIOUtils.write(version, outputStream);
 
     byteLen += ReadWriteIOUtils.write(chunkMetaDataList.size(), outputStream);
     for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
@@ -191,6 +198,7 @@ public class ChunkGroupMetaData {
     int byteLen = 0;
 
     byteLen += ReadWriteIOUtils.write(deviceID, buffer);
+    byteLen += ReadWriteIOUtils.write(version, buffer);
 
     byteLen += ReadWriteIOUtils.write(chunkMetaDataList.size(), buffer);
     for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
@@ -199,4 +207,13 @@ public class ChunkGroupMetaData {
     return byteLen;
   }
 
+
+  public long getVersion() {
+    return version;
+  }
+
+  public void setVersion(long version) {
+    this.version = version;
+  }
+
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
index 607a298..3dfa398 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetaData.java
@@ -47,10 +47,16 @@ public class ChunkMetaData {
   private TSDataType tsDataType;
 
   /**
-   * The maximum time of the tombstones that take effect on this chunk. Only data with larger.
-   * timestamps than this should be exposed to user.
+   * version is used to define the order of operations (insertion, deletion, update).
+   * version is copied from the enclosing ChunkGroup only when being queried, so it is not
+   * persisted.
    */
-  private long maxTombstoneTime;
+  private long version;
+
+  /**
+   * All data with timestamp <= deletedAt are considered deleted.
+   */
+  private long deletedAt = -1;
 
   private TsDigest valuesStatistics;
 
@@ -243,12 +249,19 @@ public class ChunkMetaData {
     return byteLen;
   }
 
-  public long getMaxTombstoneTime() {
-    return maxTombstoneTime;
+  public long getVersion() {
+    return version;
   }
 
-  public void setMaxTombstoneTime(long maxTombstoneTime) {
-    this.maxTombstoneTime = maxTombstoneTime;
+  public void setVersion(long version) {
+    this.version = version;
   }
 
+  public long getDeletedAt() {
+    return deletedAt;
+  }
+
+  public void setDeletedAt(long deletedAt) {
+    this.deletedAt = deletedAt;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
index 7c63be6..f4ad125 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/Chunk.java
@@ -28,6 +28,7 @@ public class Chunk {
 
   private ChunkHeader chunkHeader;
   private ByteBuffer chunkData;
+  private long deletedAt = -1;
 
   public Chunk(ChunkHeader header, ByteBuffer buffer) {
     this.chunkHeader = header;
@@ -41,4 +42,12 @@ public class Chunk {
   public ByteBuffer getData() {
     return chunkData;
   }
+
+  public long getDeletedAt() {
+    return deletedAt;
+  }
+
+  public void setDeletedAt(long deletedAt) {
+    this.deletedAt = deletedAt;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
index 2d7624c..4907634 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/ChunkLoaderImpl.java
@@ -59,7 +59,9 @@ public class ChunkLoaderImpl implements ChunkLoader {
   @Override
   public Chunk getChunk(ChunkMetaData chunkMetaData) throws IOException {
     Chunk chunk = chunkCache.get(chunkMetaData);
-    return new Chunk(chunk.getHeader(), chunk.getData().duplicate());
+    Chunk chunkRet = new Chunk(chunk.getHeader(), chunk.getData().duplicate());
+    chunkRet.setDeletedAt(chunkMetaData.getDeletedAt());
+    return chunkRet;
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
index 839d116..c32d2a8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -164,6 +164,7 @@ public class MetadataQuerierByFileImpl implements MetadataQuerier {
           .getChunkMetaDataList();
       for (ChunkMetaData chunkMetaData : chunkMetaDataListInOneChunkGroup) {
         if (path.getMeasurement().equals(chunkMetaData.getMeasurementUid())) {
+          chunkMetaData.setVersion(chunkGroupMetaData.getVersion());
           chunkMetaDataList.add(chunkMetaData);
         }
       }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
index b46cb87..125e131 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReader.java
@@ -48,7 +48,10 @@ public abstract class ChunkReader {
 
   private BatchData data;
 
-  private long maxTombstoneTime;
+  /**
+   * Data whose timestamp <= deletedAt is considered deleted and must not be returned.
+   */
+  protected long deletedAt;
 
   public ChunkReader(Chunk chunk) {
     this(chunk, null);
@@ -63,6 +66,7 @@ public abstract class ChunkReader {
   public ChunkReader(Chunk chunk, Filter filter) {
     this.filter = filter;
     this.chunkDataBuffer = chunk.getData();
+    this.deletedAt = chunk.getDeletedAt();
     chunkHeader = chunk.getHeader();
     this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
     valueDecoder = Decoder
@@ -127,20 +131,14 @@ public abstract class ChunkReader {
 
     chunkDataBuffer.get(compressedPageBody, 0, compressedPageBodyLength);
     valueDecoder.reset();
-    return new PageReader(ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody)),
+    PageReader reader = new PageReader(ByteBuffer.wrap(unCompressor.uncompress(compressedPageBody)),
         chunkHeader.getDataType(),
         valueDecoder, timeDecoder, filter);
+    reader.setDeletedAt(deletedAt);
+    return reader;
   }
 
   public void close() {
   }
 
-  public long getMaxTombstoneTime() {
-    return this.maxTombstoneTime;
-  }
-
-  public void setMaxTombstoneTime(long maxTombStoneTime) {
-    this.maxTombstoneTime = maxTombStoneTime;
-  }
-
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
index 0c33e0a..382a6e9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderByTimestamp.java
@@ -33,7 +33,7 @@ public class ChunkReaderByTimestamp extends ChunkReader {
   public boolean pageSatisfied(PageHeader pageHeader) {
     long maxTimestamp = pageHeader.getMaxTimestamp();
     // if maxTimestamp > currentTimestamp, this page should NOT be skipped
-    return maxTimestamp >= currentTimestamp && maxTimestamp >= getMaxTombstoneTime();
+    return maxTimestamp >= currentTimestamp && maxTimestamp > deletedAt;
   }
 
   public void setCurrentTimestamp(long currentTimestamp) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithFilter.java
index 70edb75..745b98a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithFilter.java
@@ -34,7 +34,7 @@ public class ChunkReaderWithFilter extends ChunkReader {
 
   @Override
   public boolean pageSatisfied(PageHeader pageHeader) {
-    if (pageHeader.getMaxTimestamp() < getMaxTombstoneTime()) {
+    if (pageHeader.getMaxTimestamp() < deletedAt) {
       return false;
     }
     DigestForFilter digest = new DigestForFilter(pageHeader.getMinTimestamp(),
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithoutFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithoutFilter.java
index 0f5a18d..9d9bde1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithoutFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/ChunkReaderWithoutFilter.java
@@ -29,7 +29,7 @@ public class ChunkReaderWithoutFilter extends ChunkReader {
 
   @Override
   public boolean pageSatisfied(PageHeader pageHeader) {
-    return pageHeader.getMaxTimestamp() > getMaxTombstoneTime();
+    return pageHeader.getMaxTimestamp() > deletedAt;
   }
 
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 16497d7..a686d99 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -48,6 +48,8 @@ public class PageReader {
 
   private Filter filter = null;
 
+  private long deletedAt = -1;
+
   public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder,
       Decoder timeDecoder,
       Filter filter) {
@@ -107,25 +109,48 @@ public class PageReader {
     while (timeDecoder.hasNext(timeBuffer)) {
       long timestamp = timeDecoder.readLong(timeBuffer);
 
-      pageData.putTime(timestamp);
       switch (dataType) {
         case BOOLEAN:
-          pageData.putBoolean(valueDecoder.readBoolean(valueBuffer));
+          boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
+          if (timestamp > deletedAt) {
+            pageData.putTime(timestamp);
+            pageData.putBoolean(aBoolean);
+          }
           break;
         case INT32:
-          pageData.putInt(valueDecoder.readInt(valueBuffer));
+          int anInt = valueDecoder.readInt(valueBuffer);
+          if (timestamp > deletedAt) {
+            pageData.putTime(timestamp);
+            pageData.putInt(anInt);
+          }
           break;
         case INT64:
-          pageData.putLong(valueDecoder.readLong(valueBuffer));
+          long aLong = valueDecoder.readLong(valueBuffer);
+          if (timestamp > deletedAt) {
+            pageData.putTime(timestamp);
+            pageData.putLong(aLong);
+          }
           break;
         case FLOAT:
-          pageData.putFloat(valueDecoder.readFloat(valueBuffer));
+          float aFloat = valueDecoder.readFloat(valueBuffer);
+          if (timestamp > deletedAt) {
+            pageData.putTime(timestamp);
+            pageData.putFloat(aFloat);
+          }
           break;
         case DOUBLE:
-          pageData.putDouble(valueDecoder.readDouble(valueBuffer));
+          double aDouble = valueDecoder.readDouble(valueBuffer);
+          if (timestamp > deletedAt) {
+            pageData.putTime(timestamp);
+            pageData.putDouble(aDouble);
+          }
           break;
         case TEXT:
-          pageData.putBinary(valueDecoder.readBinary(valueBuffer));
+          Binary aBinary = valueDecoder.readBinary(valueBuffer);
+          if (timestamp > deletedAt) {
+            pageData.putTime(timestamp);
+            pageData.putBinary(aBinary);
+          }
           break;
         default:
           throw new UnSupportedDataTypeException(String.valueOf(dataType));
@@ -169,7 +194,7 @@ public class PageReader {
 
   private void readBoolean(BatchData pageData, long timestamp) {
     boolean aBoolean = valueDecoder.readBoolean(valueBuffer);
-    if (filter.satisfy(timestamp, aBoolean)) {
+    if (timestamp > deletedAt && filter.satisfy(timestamp, aBoolean)) {
       pageData.putTime(timestamp);
       pageData.putBoolean(aBoolean);
     }
@@ -177,7 +202,7 @@ public class PageReader {
 
   private void readInt(BatchData pageData, long timestamp) {
     int anInt = valueDecoder.readInt(valueBuffer);
-    if (filter.satisfy(timestamp, anInt)) {
+    if (timestamp > deletedAt && filter.satisfy(timestamp, anInt)) {
       pageData.putTime(timestamp);
       pageData.putInt(anInt);
     }
@@ -185,7 +210,7 @@ public class PageReader {
 
   private void readLong(BatchData pageData, long timestamp) {
     long aLong = valueDecoder.readLong(valueBuffer);
-    if (filter.satisfy(timestamp, aLong)) {
+    if (timestamp > deletedAt && filter.satisfy(timestamp, aLong)) {
       pageData.putTime(timestamp);
       pageData.putLong(aLong);
     }
@@ -193,7 +218,7 @@ public class PageReader {
 
   private void readFloat(BatchData pageData, long timestamp) {
     float aFloat = valueDecoder.readFloat(valueBuffer);
-    if (filter.satisfy(timestamp, aFloat)) {
+    if (timestamp > deletedAt && filter.satisfy(timestamp, aFloat)) {
       pageData.putTime(timestamp);
       pageData.putFloat(aFloat);
     }
@@ -201,7 +226,7 @@ public class PageReader {
 
   private void readDouble(BatchData pageData, long timestamp) {
     double aDouble = valueDecoder.readDouble(valueBuffer);
-    if (filter.satisfy(timestamp, aDouble)) {
+    if (timestamp > deletedAt && filter.satisfy(timestamp, aDouble)) {
       pageData.putTime(timestamp);
       pageData.putDouble(aDouble);
     }
@@ -209,7 +234,7 @@ public class PageReader {
 
   private void readText(BatchData pageData, long timestamp) {
     Binary aBinary = valueDecoder.readBinary(valueBuffer);
-    if (filter.satisfy(timestamp, aBinary)) {
+    if (timestamp > deletedAt && filter.satisfy(timestamp, aBinary)) {
       pageData.putTime(timestamp);
       pageData.putBinary(aBinary);
     }
@@ -220,4 +245,7 @@ public class PageReader {
     valueBuffer = null;
   }
 
+  public void setDeletedAt(long deletedAt) {
+    this.deletedAt = deletedAt; // reads keep only points with timestamp > deletedAt
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
index ceb4678..f42c8ae 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReader.java
@@ -77,7 +77,7 @@ public abstract class FileSeriesReader {
     // current chunk does not have additional batch, init new chunk reader
     while (chunkToRead < chunkMetaDataList.size()) {
 
-      ChunkMetaData chunkMetaData = chunkMetaDataList.get(chunkToRead++);
+      ChunkMetaData chunkMetaData = nextChunkMeta();
       if (chunkSatisfied(chunkMetaData)) {
         // chunk metadata satisfy the condition
         initChunkReader(chunkMetaData);
@@ -107,4 +107,7 @@ public abstract class FileSeriesReader {
     chunkLoader.close();
   }
 
+  private ChunkMetaData nextChunkMeta() {
+    return chunkMetaDataList.get(chunkToRead++); // post-increment: fetch current, then advance
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithFilter.java
index e811d2b..c874e17 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithFilter.java
@@ -46,7 +46,6 @@ public class FileSeriesReaderWithFilter extends FileSeriesReader {
   protected void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {
     Chunk chunk = chunkLoader.getChunk(chunkMetaData);
     this.chunkReader = new ChunkReaderWithFilter(chunk, filter);
-    this.chunkReader.setMaxTombstoneTime(chunkMetaData.getMaxTombstoneTime());
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithoutFilter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithoutFilter.java
index 1ec0c96..c4efcb8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithoutFilter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/FileSeriesReaderWithoutFilter.java
@@ -39,7 +39,6 @@ public class FileSeriesReaderWithoutFilter extends FileSeriesReader {
   protected void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {
     Chunk chunk = chunkLoader.getChunk(chunkMetaData);
     this.chunkReader = new ChunkReaderWithoutFilter(chunk);
-    this.chunkReader.setMaxTombstoneTime(chunkMetaData.getMaxTombstoneTime());
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java
index 2d2e11c..070924f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/series/SeriesReaderByTimestamp.java
@@ -117,7 +117,6 @@ public class SeriesReaderByTimestamp {
   private void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {
     Chunk chunk = chunkLoader.getChunk(chunkMetaData);
     this.chunkReader = new ChunkReaderByTimestamp(chunk);
-    this.chunkReader.setMaxTombstoneTime(chunkMetaData.getMaxTombstoneTime());
   }
 
   private boolean chunkSatisfied(ChunkMetaData chunkMetaData) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
index 424d3b0..0a291db 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/TsFileWriter.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
@@ -72,6 +73,11 @@ public class TsFileWriter {
    **/
   private long recordCountForNextMemCheck = 100;
   private long chunkGroupSizeThreshold;
+  /**
+   * Passed to endChunkGroup and then incremented. In an individual TsFile the version
+   * number is not meaningful; it is added only for tests.
+   */
+  private long version = 0;
 
   /**
    * init this TsFileWriter.
@@ -258,7 +264,7 @@ public class TsFileWriter {
               chunkGroupFooter.getDataSize(), fileWriter.getPos() - pos));
         }
 
-        fileWriter.endChunkGroup(chunkGroupFooter);
+        fileWriter.endChunkGroup(chunkGroupFooter, version++);
       }
       long actualTotalChunkGroupSize = fileWriter.getPos() - totalMemStart;
       LOG.info("total chunk group size:{}", actualTotalChunkGroupSize);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index 3cffa30..eb4ebf6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -137,21 +137,21 @@ public class TsFileIOWriter {
    * @param statistics - statistic of the whole series
    * @param maxTime - maximum timestamp of the whole series in this stage
    * @param minTime - minimum timestamp of the whole series in this stage
-   * @param datasize - the serialized size of all pages
+   * @param dataSize - the serialized size of all pages
    * @return the serialized size of CHunkHeader
    * @throws IOException if I/O error occurs
    */
   public int startFlushChunk(MeasurementSchema descriptor, CompressionType compressionCodecName,
       TSDataType tsDataType, TSEncoding encodingType, Statistics<?> statistics, long maxTime,
       long minTime,
-      int datasize, int numOfPages) throws IOException {
+      int dataSize, int numOfPages) throws IOException {
     LOG.debug("start series chunk:{}, file position {}", descriptor, out.getPosition());
 
     currentChunkMetaData = new ChunkMetaData(descriptor.getMeasurementId(), tsDataType,
         out.getPosition(), minTime,
         maxTime);
 
-    ChunkHeader header = new ChunkHeader(descriptor.getMeasurementId(), datasize, tsDataType,
+    ChunkHeader header = new ChunkHeader(descriptor.getMeasurementId(), dataSize, tsDataType,
         compressionCodecName,
         encodingType, numOfPages);
     header.serializeTo(out.wrapAsStream());
@@ -191,8 +191,9 @@ public class TsFileIOWriter {
    *
    * @param chunkGroupFooter -use to serialize
    */
-  public void endChunkGroup(ChunkGroupFooter chunkGroupFooter) throws IOException {
+  public void endChunkGroup(ChunkGroupFooter chunkGroupFooter, long version) throws IOException {
     chunkGroupFooter.serializeTo(out.wrapAsStream());
+    currentChunkGroupMetaData.setVersion(version);
     chunkGroupMetaDataList.add(currentChunkGroupMetaData);
     LOG.debug("end chunk group:{}", currentChunkGroupMetaData);
     currentChunkGroupMetaData = null;
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
index e73e232..6894702 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/write/TsFileIOWriterTest.java
@@ -61,7 +61,7 @@ public class TsFileIOWriterTest {
         measurementSchema.getType(), measurementSchema.getEncodingType(), statistics, 0, 0, 0, 0);
     writer.endChunk(0);
     ChunkGroupFooter footer = new ChunkGroupFooter(deviceId, 0, 1);
-    writer.endChunkGroup(footer);
+    writer.endChunkGroup(footer, 0);
 
     // end file
     writer.endFile(fileSchema);