You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/01/22 01:38:34 UTC

[incubator-iotdb] branch delete_dev4 updated: add roll back mechanism

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch delete_dev4
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/delete_dev4 by this push:
     new 3f5bbb0  add roll back mechanism
3f5bbb0 is described below

commit 3f5bbb095df0ce0b1f45f52bfe2ec14f598f0596
Author: 江天 <jt...@163.com>
AuthorDate: Tue Jan 22 09:37:53 2019 +0800

    add roll back mechanism
---
 .../db/engine/filenode/FileNodeProcessor.java      | 488 +++++++++++----------
 .../db/engine/modification/ModificationFile.java   |  17 +-
 .../io/LocalTextModificationAccessor.java          |  17 +-
 .../engine/modification/io/ModificationWriter.java |   5 +
 .../db/engine/overflow/ioV2/OverflowProcessor.java |  13 +-
 .../db/engine/overflow/ioV2/OverflowResource.java  |  10 +-
 .../engine/modification/ModificationFileTest.java  |  36 ++
 7 files changed, 336 insertions(+), 250 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index f810273..1ac6e42 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@ -1,6 +1,6 @@
 /**
  * Copyright © 2019 Apache IoTDB(incubating) (dev@iotdb.apache.org)
- *
+ * <p>
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,7 +26,6 @@ import java.nio.file.Files;
 import java.time.Instant;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -74,7 +73,6 @@ import org.apache.iotdb.db.monitor.MonitorConstants;
 import org.apache.iotdb.db.monitor.StatMonitor;
 import org.apache.iotdb.db.query.factory.SeriesReaderFactory;
 import org.apache.iotdb.db.query.reader.IReader;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.db.utils.TimeValuePair;
@@ -118,7 +116,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   private final HashMap<String, AtomicLong> statParamsHashMap = new HashMap<String, AtomicLong>() {
     {
       for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
-          MonitorConstants.FileNodeProcessorStatConstants.values()) {
+              MonitorConstants.FileNodeProcessorStatConstants.values()) {
         put(statConstant.name(), new AtomicLong(0));
       }
     }
@@ -217,16 +215,16 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * constructor of FileNodeProcessor.
    */
   public FileNodeProcessor(String fileNodeDirPath, String processorName)
-      throws FileNodeProcessorException {
+          throws FileNodeProcessorException {
     super(processorName);
     statStorageDeltaName =
-        MonitorConstants.statStorageGroupPrefix + MonitorConstants.MONITOR_PATH_SEPERATOR
-            + MonitorConstants.fileNodePath + MonitorConstants.MONITOR_PATH_SEPERATOR
-            + processorName.replaceAll("\\.", "_");
+            MonitorConstants.statStorageGroupPrefix + MonitorConstants.MONITOR_PATH_SEPERATOR
+                    + MonitorConstants.fileNodePath + MonitorConstants.MONITOR_PATH_SEPERATOR
+                    + processorName.replaceAll("\\.", "_");
 
     this.parameters = new HashMap<>();
     if (fileNodeDirPath.length() > 0
-        && fileNodeDirPath.charAt(fileNodeDirPath.length() - 1) != File.separatorChar) {
+            && fileNodeDirPath.charAt(fileNodeDirPath.length() - 1) != File.separatorChar) {
       fileNodeDirPath = fileNodeDirPath + File.separatorChar;
     }
     this.baseDirPath = fileNodeDirPath + processorName;
@@ -234,16 +232,16 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     if (!dataDir.exists()) {
       dataDir.mkdirs();
       LOGGER.info(
-          "The data directory of the filenode processor {} doesn't exist. Create new directory {}",
-          getProcessorName(), baseDirPath);
+              "The data directory of the filenode processor {} doesn't exist. Create new directory {}",
+              getProcessorName(), baseDirPath);
     }
     fileNodeRestoreFilePath = new File(dataDir, processorName + RESTORE_FILE_SUFFIX).getPath();
     try {
       fileNodeProcessorStore = readStoreFromDisk();
     } catch (FileNodeProcessorException e) {
       LOGGER.error(
-          "The fileNode processor {} encountered an error when recoverying restore information.",
-          processorName, e);
+              "The fileNode processor {} encountered an error when recoverying restore information.",
+              processorName, e);
       throw new FileNodeProcessorException(e);
     }
     // TODO deep clone the lastupdate time
@@ -266,7 +264,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     }
     // status is not NONE, or the last intervalFile is not closed
     if (isMerging != FileNodeProcessorStatus.NONE
-        || (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed())) {
+            || (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed())) {
       shouldRecovery = true;
     } else {
       // add file into the index of file
@@ -294,9 +292,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     HashMap<String, String> hashMap = new HashMap<String, String>() {
       {
         for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
-            MonitorConstants.FileNodeProcessorStatConstants.values()) {
+                MonitorConstants.FileNodeProcessorStatConstants.values()) {
           put(statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name(),
-              MonitorConstants.DataType);
+                  MonitorConstants.DataType);
         }
       }
     };
@@ -307,9 +305,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   public List<String> getAllPathForStatistic() {
     List<String> list = new ArrayList<>();
     for (MonitorConstants.FileNodeProcessorStatConstants statConstant :
-        MonitorConstants.FileNodeProcessorStatConstants.values()) {
+            MonitorConstants.FileNodeProcessorStatConstants.values()) {
       list.add(
-          statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name());
+              statStorageDeltaName + MonitorConstants.MONITOR_PATH_SEPERATOR + statConstant.name());
     }
     return list;
   }
@@ -348,10 +346,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * add interval FileNode.
    */
   public void addIntervalFileNode(long startTime, String baseDir, String fileName)
-      throws Exception {
+          throws Exception {
 
     IntervalFileNode intervalFileNode = new IntervalFileNode(OverflowChangeType.NO_CHANGE, baseDir,
-        fileName);
+            fileName);
     this.currentIntervalFileNode = intervalFileNode;
     newFileNodes.add(intervalFileNode);
     fileNodeProcessorStore.setNewFileNodes(newFileNodes);
@@ -446,29 +444,29 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bufferwriteFlushAction);
       parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bufferwriteCloseAction);
       parameters
-          .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+              .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
       String baseDir = directories
-          .getTsFileFolder(newFileNodes.get(newFileNodes.size() - 1).getBaseDirIndex());
+              .getTsFileFolder(newFileNodes.get(newFileNodes.size() - 1).getBaseDirIndex());
       LOGGER.info(
-          "The filenode processor {} will recovery the bufferwrite processor, "
-              + "the bufferwrite file is {}",
-          getProcessorName(), fileNames[fileNames.length - 1]);
+              "The filenode processor {} will recovery the bufferwrite processor, "
+                      + "the bufferwrite file is {}",
+              getProcessorName(), fileNames[fileNames.length - 1]);
       try {
         bufferWriteProcessor = new BufferWriteProcessor(baseDir, getProcessorName(),
-            fileNames[fileNames.length - 1], parameters, versionController, fileSchema);
+                fileNames[fileNames.length - 1], parameters, versionController, fileSchema);
       } catch (BufferWriteProcessorException e) {
         // unlock
         writeUnlock();
         LOGGER.error(
-            "The filenode processor {} failed to recovery the bufferwrite processor, "
-                + "the last bufferwrite file is {}.",
-            getProcessorName(), fileNames[fileNames.length - 1]);
+                "The filenode processor {} failed to recovery the bufferwrite processor, "
+                        + "the last bufferwrite file is {}.",
+                getProcessorName(), fileNames[fileNames.length - 1]);
         throw new FileNodeProcessorException(e);
       }
     }
     // restore the overflow processor
     LOGGER.info("The filenode processor {} will recovery the overflow processor.",
-        getProcessorName());
+            getProcessorName());
     parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
     parameters.put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
     try {
@@ -477,7 +475,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     } catch (IOException e) {
       writeUnlock();
       LOGGER.error("The filenode processor {} failed to recovery the overflow processor.",
-          getProcessorName());
+              getProcessorName());
       throw new FileNodeProcessorException(e);
     }
 
@@ -487,14 +485,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       // re-merge all file
       // if bufferwrite processor is not null, and close
       LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
-          getProcessorName(),
-          isMerging);
+              getProcessorName(),
+              isMerging);
       merge();
     } else if (isMerging == FileNodeProcessorStatus.WAITING) {
       // unlock
       LOGGER.info("The filenode processor {} is recovering, the filenode status is {}.",
-          getProcessorName(),
-          isMerging);
+              getProcessorName(),
+              isMerging);
       writeUnlock();
       switchWaitingToWorkingv2(newFileNodes);
     } else {
@@ -508,23 +506,23 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * get buffer write processor by processor name and insert time.
    */
   public BufferWriteProcessor getBufferWriteProcessor(String processorName, long insertTime)
-      throws FileNodeProcessorException {
+          throws FileNodeProcessorException {
     if (bufferWriteProcessor == null) {
       Map<String, Action> parameters = new HashMap<>();
       parameters.put(FileNodeConstants.BUFFERWRITE_FLUSH_ACTION, bufferwriteFlushAction);
       parameters.put(FileNodeConstants.BUFFERWRITE_CLOSE_ACTION, bufferwriteCloseAction);
       parameters
-          .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+              .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
       String baseDir = directories.getNextFolderForTsfile();
       LOGGER.info("Allocate folder {} for the new bufferwrite processor.", baseDir);
       // construct processor or restore
       try {
         bufferWriteProcessor = new BufferWriteProcessor(baseDir, processorName,
-            insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis(),
-            parameters, versionController, fileSchema);
+                insertTime + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis(),
+                parameters, versionController, fileSchema);
       } catch (BufferWriteProcessorException e) {
         LOGGER.error("The filenode processor {} failed to get the bufferwrite processor.",
-            processorName, e);
+                processorName, e);
         throw new FileNodeProcessorException(e);
       }
     }
@@ -551,7 +549,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       // construct processor or restore
       parameters.put(FileNodeConstants.OVERFLOW_FLUSH_ACTION, overflowFlushAction);
       parameters
-          .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
+              .put(FileNodeConstants.FILENODE_PROCESSOR_FLUSH_ACTION, flushFileNodeProcessorAction);
       overflowProcessor = new OverflowProcessor(processorName, parameters, fileSchema,
               versionController);
     }
@@ -623,9 +621,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   public void changeTypeToChanged(String deviceId, long timestamp) {
     if (!invertedindexOfFiles.containsKey(deviceId)) {
       LOGGER.warn(
-          "Can not find any tsfile which will be overflowed in the filenode processor {}, "
-              + "the data is [device:{},time:{}]",
-          getProcessorName(), deviceId, timestamp);
+              "Can not find any tsfile which will be overflowed in the filenode processor {}, "
+                      + "the data is [device:{},time:{}]",
+              getProcessorName(), deviceId, timestamp);
       emptyIntervalFileNode.setStartTime(deviceId, 0L);
       emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
       emptyIntervalFileNode.changeTypeToChanged(isMerging);
@@ -645,9 +643,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   public void changeTypeToChanged(String deviceId, long startTime, long endTime) {
     if (!invertedindexOfFiles.containsKey(deviceId)) {
       LOGGER.warn(
-          "Can not find any tsfile which will be overflowed in the filenode processor {}, "
-              + "the data is [device:{}, start time:{}, end time:{}]",
-          getProcessorName(), deviceId, startTime, endTime);
+              "Can not find any tsfile which will be overflowed in the filenode processor {}, "
+                      + "the data is [device:{}, start time:{}, end time:{}]",
+              getProcessorName(), deviceId, startTime, endTime);
       emptyIntervalFileNode.setStartTime(deviceId, 0L);
       emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
       emptyIntervalFileNode.changeTypeToChanged(isMerging);
@@ -670,9 +668,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   public void changeTypeToChangedForDelete(String deviceId, long timestamp) {
     if (!invertedindexOfFiles.containsKey(deviceId)) {
       LOGGER.warn(
-          "Can not find any tsfile which will be overflowed in the filenode processor {}, "
-              + "the data is [device:{}, delete time:{}]",
-          getProcessorName(), deviceId, timestamp);
+              "Can not find any tsfile which will be overflowed in the filenode processor {}, "
+                      + "the data is [device:{}, delete time:{}]",
+              getProcessorName(), deviceId, timestamp);
       emptyIntervalFileNode.setStartTime(deviceId, 0L);
       emptyIntervalFileNode.setEndTime(deviceId, getLastUpdateTime(deviceId));
       emptyIntervalFileNode.changeTypeToChanged(isMerging);
@@ -694,7 +692,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * @return index of interval
    */
   private int searchIndexNodeByTimestamp(String deviceId, long timestamp,
-      List<IntervalFileNode> fileList) {
+                                         List<IntervalFileNode> fileList) {
     int index = 1;
     while (index < fileList.size()) {
       if (timestamp < fileList.get(index).getStartTime(deviceId)) {
@@ -728,19 +726,19 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       newMultiPassLock.readLock().unlock();
       newMultiPassTokenSet.remove(token);
       LOGGER
-          .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token, getProcessorName(),
-              newMultiPassTokenSet, newMultiPassLock);
+              .debug("Remove multi token:{}, nspath:{}, new set:{}, lock:{}", token, getProcessorName(),
+                      newMultiPassTokenSet, newMultiPassLock);
       return true;
     } else if (oldMultiPassTokenSet != null && oldMultiPassTokenSet.contains(token)) {
       // remove token first, then unlock
       oldMultiPassLock.readLock().unlock();
       oldMultiPassTokenSet.remove(token);
       LOGGER.debug("Remove multi token:{}, old set:{}, lock:{}", token, oldMultiPassTokenSet,
-          oldMultiPassLock);
+              oldMultiPassLock);
       return true;
     } else {
       LOGGER.error("remove token error:{},new set:{}, old set:{}", token, newMultiPassTokenSet,
-          oldMultiPassTokenSet);
+              oldMultiPassTokenSet);
       // should add throw exception
       return false;
     }
@@ -750,8 +748,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * query data.
    */
   public <T extends Comparable<T>> QueryDataSource query(String deviceId, String measurementId,
-      Filter filter)
-      throws FileNodeProcessorException {
+                                                         Filter filter)
+          throws FileNodeProcessorException {
     // query overflow data
     TSDataType dataType = null;
     try {
@@ -775,35 +773,35 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       }
     }
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> bufferwritedata
-        = new Pair<ReadOnlyMemChunk, List<ChunkMetaData>>(null, null);
+            = new Pair<ReadOnlyMemChunk, List<ChunkMetaData>>(null, null);
     // bufferwrite data
     UnsealedTsFile unsealedTsFile = null;
 
     if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed()
-        && !newFileNodes.get(newFileNodes.size() - 1).getStartTimeMap().isEmpty()) {
+            && !newFileNodes.get(newFileNodes.size() - 1).getStartTimeMap().isEmpty()) {
       unsealedTsFile = new UnsealedTsFile();
       unsealedTsFile.setFilePath(newFileNodes.get(newFileNodes.size() - 1).getFilePath());
       if (bufferWriteProcessor == null) {
         LOGGER.error(
-            "The last of tsfile {} in filenode processor {} is not closed, "
-                + "but the bufferwrite processor is null.",
-            newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName());
+                "The last of tsfile {} in filenode processor {} is not closed, "
+                        + "but the bufferwrite processor is null.",
+                newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName());
         throw new FileNodeProcessorException(String.format(
-            "The last of tsfile %s in filenode processor %s is not closed, "
-                + "but the bufferwrite processor is null.",
-            newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName()));
+                "The last of tsfile %s in filenode processor %s is not closed, "
+                        + "but the bufferwrite processor is null.",
+                newFileNodes.get(newFileNodes.size() - 1).getRelativePath(), getProcessorName()));
       }
       bufferwritedata = bufferWriteProcessor
-          .queryBufferWriteData(deviceId, measurementId, dataType);
+              .queryBufferWriteData(deviceId, measurementId, dataType);
 
       try {
-          List<Modification> pathModifications = QueryUtils.getPathModifications(
-                  currentIntervalFileNode.getModFile(), deviceId
-                          + IoTDBConstant.PATH_SEPARATOR + measurementId
-          );
-          if (pathModifications.size() > 0) {
-            QueryUtils.modifyChunkMetaData(bufferwritedata.right, pathModifications);
-          }
+        List<Modification> pathModifications = QueryUtils.getPathModifications(
+                currentIntervalFileNode.getModFile(), deviceId
+                        + IoTDBConstant.PATH_SEPARATOR + measurementId
+        );
+        if (pathModifications.size() > 0) {
+          QueryUtils.modifyChunkMetaData(bufferwritedata.right, pathModifications);
+        }
       } catch (IOException e) {
         throw new FileNodeProcessorException(e);
       }
@@ -811,8 +809,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       unsealedTsFile.setTimeSeriesChunkMetaDatas(bufferwritedata.right);
     }
     GlobalSortedSeriesDataSource globalSortedSeriesDataSource = new GlobalSortedSeriesDataSource(
-        new Path(deviceId + "." + measurementId), bufferwriteDataInFiles, unsealedTsFile,
-        bufferwritedata.left);
+            new Path(deviceId + "." + measurementId), bufferwriteDataInFiles, unsealedTsFile,
+            bufferwritedata.left);
     return new QueryDataSource(globalSortedSeriesDataSource, overflowSeriesDataSource);
 
   }
@@ -820,11 +818,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   /**
    * append one specified tsfile to this filenode processor.
    *
-   * @param appendFile the appended tsfile information
+   * @param appendFile     the appended tsfile information
    * @param appendFilePath the seriesPath of appended file
    */
   public void appendFile(IntervalFileNode appendFile, String appendFilePath)
-      throws FileNodeProcessorException {
+          throws FileNodeProcessorException {
     try {
       if (!new File(appendFile.getFilePath()).getParentFile().exists()) {
         new File(appendFile.getFilePath()).getParentFile().mkdirs();
@@ -834,11 +832,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       File targetFile = new File(appendFile.getFilePath());
       if (!originFile.exists()) {
         throw new FileNodeProcessorException(
-            String.format("The appended file %s does not exist.", appendFilePath));
+                String.format("The appended file %s does not exist.", appendFilePath));
       }
       if (targetFile.exists()) {
         throw new FileNodeProcessorException(
-            String.format("The appended target file %s already exists.", appendFile.getFilePath()));
+                String.format("The appended target file %s already exists.", appendFile.getFilePath()));
       }
       originFile.renameTo(targetFile);
       // append the new tsfile
@@ -854,7 +852,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       addAllFileIntoIndex(newFileNodes);
     } catch (Exception e) {
       LOGGER.error("Failed to append the tsfile {} to filenode processor {}.", appendFile,
-          getProcessorName(), e);
+              getProcessorName(), e);
       throw new FileNodeProcessorException(e);
     }
   }
@@ -865,7 +863,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * @param appendFile the appended tsfile information
    */
   public List<String> getOverlapFiles(IntervalFileNode appendFile, String uuid)
-      throws FileNodeProcessorException {
+          throws FileNodeProcessorException {
     List<String> overlapFiles = new ArrayList<>();
     try {
       for (IntervalFileNode intervalFileNode : newFileNodes) {
@@ -874,19 +872,19 @@ public class FileNodeProcessor extends Processor implements IStatistic {
             continue;
           }
           if (intervalFileNode.getEndTime(entry.getKey()) >= entry.getValue()
-              && intervalFileNode.getStartTime(entry.getKey()) <= appendFile
-              .getEndTime(entry.getKey())) {
+                  && intervalFileNode.getStartTime(entry.getKey()) <= appendFile
+                  .getEndTime(entry.getKey())) {
             String relativeFilePath = "postback" + File.separator + uuid + File.separator + "backup"
-                + File.separator + intervalFileNode.getRelativePath();
+                    + File.separator + intervalFileNode.getRelativePath();
             File newFile = new File(
-                Directories.getInstance().getTsFileFolder(intervalFileNode.getBaseDirIndex()),
-                relativeFilePath);
+                    Directories.getInstance().getTsFileFolder(intervalFileNode.getBaseDirIndex()),
+                    relativeFilePath);
             if (!newFile.getParentFile().exists()) {
               newFile.getParentFile().mkdirs();
             }
             java.nio.file.Path link = FileSystems.getDefault().getPath(newFile.getPath());
             java.nio.file.Path target = FileSystems.getDefault()
-                .getPath(intervalFileNode.getFilePath());
+                    .getPath(intervalFileNode.getFilePath());
             Files.createLink(link, target);
             overlapFiles.add(newFile.getPath());
             break;
@@ -904,9 +902,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * add time series.
    */
   public void addTimeSeries(String measurementToString, String dataType, String encoding,
-      String[] encodingArgs) {
+                            String[] encodingArgs) {
     ColumnSchema col = new ColumnSchema(measurementToString, TSDataType.valueOf(dataType),
-        TSEncoding.valueOf(encoding));
+            TSEncoding.valueOf(encoding));
     JSONObject measurement = constrcutMeasurement(col);
     fileSchema.registerMeasurement(JsonConverter.convertJsonToMeasurementSchema(measurement));
   }
@@ -938,32 +936,32 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       long thisMergeTime = System.currentTimeMillis();
       long mergeTimeInterval = thisMergeTime - lastMergeTime;
       ZonedDateTime lastDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(lastMergeTime),
-          IoTDBDescriptor.getInstance().getConfig().getZoneID());
+              IoTDBDescriptor.getInstance().getConfig().getZoneID());
       ZonedDateTime thisDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(thisMergeTime),
-          IoTDBDescriptor.getInstance().getConfig().getZoneID());
+              IoTDBDescriptor.getInstance().getConfig().getZoneID());
       LOGGER.info(
-          "The filenode {} last merge time is {}, this merge time is {}, "
-              + "merge time interval is {}s",
-          getProcessorName(), lastDateTime, thisDateTime, mergeTimeInterval / 1000);
+              "The filenode {} last merge time is {}, this merge time is {}, "
+                      + "merge time interval is {}s",
+              getProcessorName(), lastDateTime, thisDateTime, mergeTimeInterval / 1000);
     }
     lastMergeTime = System.currentTimeMillis();
 
     if (overflowProcessor != null) {
       if (overflowProcessor.getFileSize() < IoTDBDescriptor.getInstance()
-          .getConfig().overflowFileSizeThreshold) {
+              .getConfig().overflowFileSizeThreshold) {
         LOGGER.info(
-            "Skip this merge taks submission, because the size{} of overflow processor {} "
-                + "does not reaches the threshold {}.",
-            MemUtils.bytesCntToStr(overflowProcessor.getFileSize()), getProcessorName(),
-            MemUtils.bytesCntToStr(
-                IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
+                "Skip this merge taks submission, because the size{} of overflow processor {} "
+                        + "does not reaches the threshold {}.",
+                MemUtils.bytesCntToStr(overflowProcessor.getFileSize()), getProcessorName(),
+                MemUtils.bytesCntToStr(
+                        IoTDBDescriptor.getInstance().getConfig().overflowFileSizeThreshold));
         return null;
       }
     } else {
       LOGGER.info(
-          "Skip this merge taks submission, because the filenode processor {} "
-              + "has no overflow processor.",
-          getProcessorName());
+              "Skip this merge taks submission, because the filenode processor {} "
+                      + "has no overflow processor.",
+              getProcessorName());
       return null;
     }
     if (isOverflowed && isMerging == FileNodeProcessorStatus.NONE) {
@@ -975,18 +973,18 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           merge();
           long mergeEndTime = System.currentTimeMillis();
           ZonedDateTime startDateTime = ZonedDateTime
-              .ofInstant(Instant.ofEpochMilli(mergeStartTime),
-                  IoTDBDescriptor.getInstance().getConfig().getZoneID());
+                  .ofInstant(Instant.ofEpochMilli(mergeStartTime),
+                          IoTDBDescriptor.getInstance().getConfig().getZoneID());
           ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(mergeEndTime),
-              IoTDBDescriptor.getInstance().getConfig().getZoneID());
+                  IoTDBDescriptor.getInstance().getConfig().getZoneID());
           long intervalTime = mergeEndTime - mergeStartTime;
           LOGGER.info(
-              "The filenode processor {} merge start time is {}, "
-                  + "merge end time is {}, merge consumes {}ms.",
-              getProcessorName(), startDateTime, endDateTime, intervalTime);
+                  "The filenode processor {} merge start time is {}, "
+                          + "merge end time is {}, merge consumes {}ms.",
+                  getProcessorName(), startDateTime, endDateTime, intervalTime);
         } catch (FileNodeProcessorException e) {
           LOGGER.error("The filenode processor {} encountered an error when merging.",
-              getProcessorName(), e);
+                  getProcessorName(), e);
           throw new ErrorDebugException(e);
         }
       };
@@ -995,13 +993,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     } else {
       if (!isOverflowed) {
         LOGGER.info(
-            "Skip this merge taks submission, because the filenode processor {} is not overflowed.",
-            getProcessorName());
+                "Skip this merge taks submission, because the filenode processor {} is not overflowed.",
+                getProcessorName());
       } else {
         LOGGER.warn(
-            "Skip this merge task submission, because last merge task is not over yet, "
-                + "the merge filenode processor is {}",
-            getProcessorName());
+                "Skip this merge task submission, because last merge task is not over yet, "
+                        + "the merge filenode processor is {}",
+                getProcessorName());
       }
     }
     return null;
@@ -1013,7 +1011,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   private void prepareForMerge() {
     try {
       LOGGER.info("The filenode processor {} prepares for merge, closes the bufferwrite processor",
-          getProcessorName());
+              getProcessorName());
       closeBufferWrite();
       // try to get overflow processor
       getOverflowProcessor(getProcessorName());
@@ -1021,16 +1019,16 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       while (!getOverflowProcessor().canBeClosed()) {
         try {
           LOGGER.info(
-              "The filenode processor {} prepares for merge, the overflow {} can't be closed, "
-                  + "wait 100ms,",
-              getProcessorName(), getProcessorName());
+                  "The filenode processor {} prepares for merge, the overflow {} can't be closed, "
+                          + "wait 100ms,",
+                  getProcessorName(), getProcessorName());
           TimeUnit.MICROSECONDS.sleep(100);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
       }
       LOGGER.info("The filenode processor {} prepares for merge, closes the overflow processor",
-          getProcessorName());
+              getProcessorName());
       getOverflowProcessor().close();
     } catch (FileNodeProcessorException | OverflowProcessorException | IOException e) {
       e.printStackTrace();
@@ -1063,7 +1061,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     Map<String, Long> startTimeMap = emptyIntervalFileNode.getStartTimeMap();
     if (emptyIntervalFileNode.overflowChangeType != OverflowChangeType.NO_CHANGE) {
       Iterator<Entry<String, Long>> iterator = emptyIntervalFileNode.getEndTimeMap().entrySet()
-          .iterator();
+              .iterator();
       while (iterator.hasNext()) {
         Entry<String, Long> entry = iterator.next();
         String deviceId = entry.getKey();
@@ -1106,7 +1104,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         writeStoreToDisk(fileNodeProcessorStore);
       } catch (FileNodeProcessorException e) {
         LOGGER.error("The filenode processor {} writes restore information error when merging.",
-            getProcessorName(), e);
+                getProcessorName(), e);
         writeUnlock();
         throw new FileNodeProcessorException(e);
       }
@@ -1128,12 +1126,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       overflowProcessor.switchWorkToMerge();
     } catch (IOException e) {
       LOGGER.error("The filenode processor {} can't switch overflow processor from work to merge.",
-          getProcessorName(), e);
+              getProcessorName(), e);
       writeUnlock();
       throw new FileNodeProcessorException(e);
     }
     LOGGER.info("The filenode processor {} switches from {} to {}.", getProcessorName(),
-        FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE);
+            FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE);
     writeUnlock();
 
     // query tsfile data and overflow data, and merge them
@@ -1146,40 +1144,40 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         String filePathBeforeMerge = backupIntervalFile.getRelativePath();
         try {
           LOGGER.info(
-              "The filenode processor {} begins merging the {}/{} tsfile[{}] with overflow file, "
-                  + "the process is {}%",
-              getProcessorName(), numOfMergeFiles, allNeedMergeFiles, filePathBeforeMerge,
-              (int) (((numOfMergeFiles - 1) / (float) allNeedMergeFiles) * 100));
+                  "The filenode processor {} begins merging the {}/{} tsfile[{}] with overflow file, "
+                          + "the process is {}%",
+                  getProcessorName(), numOfMergeFiles, allNeedMergeFiles, filePathBeforeMerge,
+                  (int) (((numOfMergeFiles - 1) / (float) allNeedMergeFiles) * 100));
           long startTime = System.currentTimeMillis();
           String newFile = queryAndWriteDataForMerge(backupIntervalFile);
           long endTime = System.currentTimeMillis();
           long timeConsume = endTime - startTime;
           ZonedDateTime startDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime),
-              IoTDBDescriptor.getInstance().getConfig().getZoneID());
+                  IoTDBDescriptor.getInstance().getConfig().getZoneID());
           ZonedDateTime endDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(endTime),
-              IoTDBDescriptor.getInstance().getConfig().getZoneID());
+                  IoTDBDescriptor.getInstance().getConfig().getZoneID());
           LOGGER.info(
-              "The fileNode processor {} has merged the {}/{} tsfile[{}->{}] over, "
-                  + "start time of merge is {}, end time of merge is {}, time consumption is {}ms,"
-                  + " the process is {}%",
-              getProcessorName(), numOfMergeFiles, allNeedMergeFiles, filePathBeforeMerge, newFile,
-              startDateTime, endDateTime, timeConsume,
-              (int) (numOfMergeFiles) / (float) allNeedMergeFiles * 100);
+                  "The fileNode processor {} has merged the {}/{} tsfile[{}->{}] over, "
+                          + "start time of merge is {}, end time of merge is {}, time consumption is {}ms,"
+                          + " the process is {}%",
+                  getProcessorName(), numOfMergeFiles, allNeedMergeFiles, filePathBeforeMerge, newFile,
+                  startDateTime, endDateTime, timeConsume,
+                  (int) (numOfMergeFiles) / (float) allNeedMergeFiles * 100);
         } catch (IOException | WriteProcessException | PathErrorException e) {
           LOGGER.error("Merge: query and write data error.", e);
           throw new FileNodeProcessorException(e);
         }
       } else if (backupIntervalFile.overflowChangeType == OverflowChangeType.MERGING_CHANGE) {
         LOGGER.error("The overflowChangeType of backupIntervalFile must not be {}",
-            OverflowChangeType.MERGING_CHANGE);
+                OverflowChangeType.MERGING_CHANGE);
         // handle this error, throw one runtime exception
         throw new FileNodeProcessorException(
-            "The overflowChangeType of backupIntervalFile must not be "
-                + OverflowChangeType.MERGING_CHANGE);
+                "The overflowChangeType of backupIntervalFile must not be "
+                        + OverflowChangeType.MERGING_CHANGE);
       } else {
         LOGGER.debug(
-            "The filenode processor {} is merging, the interval file {} doesn't need to be merged.",
-            getProcessorName(), backupIntervalFile.getRelativePath());
+                "The filenode processor {} is merging, the interval file {} doesn't need to be merged.",
+                getProcessorName(), backupIntervalFile.getRelativePath());
       }
     }
 
@@ -1211,8 +1209,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       result.add(emptyIntervalFileNode.backUp());
       if (!newFileNodes.isEmpty()) {
         throw new FileNodeProcessorException(
-            String.format("The status of empty file is %s, but the new file list is not empty",
-                emptyIntervalFileNode.overflowChangeType));
+                String.format("The status of empty file is %s, but the new file list is not empty",
+                        emptyIntervalFileNode.overflowChangeType));
       }
       return result;
     }
@@ -1241,15 +1239,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
             }
           }
           IntervalFileNode node = new IntervalFileNode(startTimeMap, endTimeMap,
-              intervalFileNode.overflowChangeType, intervalFileNode.getBaseDirIndex(),
-              intervalFileNode.getRelativePath());
+                  intervalFileNode.overflowChangeType, intervalFileNode.getBaseDirIndex(),
+                  intervalFileNode.getRelativePath());
           result.add(node);
         }
       }
     } else {
       LOGGER.error("No file was changed when merging, the filenode is {}", getProcessorName());
       throw new FileNodeProcessorException(
-          "No file was changed when merging, the filenode is " + getProcessorName());
+              "No file was changed when merging, the filenode is " + getProcessorName());
     }
     return result;
   }
@@ -1308,9 +1306,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    */
 
   private void switchMergeToWaitingv2(List<IntervalFileNode> backupIntervalFiles, boolean needEmpty)
-      throws FileNodeProcessorException {
+          throws FileNodeProcessorException {
     LOGGER.info("The status of filenode processor {} switches from {} to {}.", getProcessorName(),
-        FileNodeProcessorStatus.MERGING_WRITE, FileNodeProcessorStatus.WAITING);
+            FileNodeProcessorStatus.MERGING_WRITE, FileNodeProcessorStatus.WAITING);
     writeLock();
     try {
       oldMultiPassTokenSet = newMultiPassTokenSet;
@@ -1348,7 +1346,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
               temp.overflowChangeType = OverflowChangeType.CHANGED;
             } else {
               changeTypeToChanged(deviceId, newFile.getStartTime(deviceId),
-                  newFile.getEndTime(deviceId));
+                      newFile.getEndTime(deviceId));
             }
           }
         }
@@ -1383,11 +1381,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           writeStoreToDisk(fileNodeProcessorStore);
         } catch (FileNodeProcessorException e) {
           LOGGER.error(
-              "Merge: failed to write filenode information to revocery file, the filenode is {}.",
-              getProcessorName(), e);
+                  "Merge: failed to write filenode information to revocery file, the filenode is {}.",
+                  getProcessorName(), e);
           throw new FileNodeProcessorException(
-              "Merge: write filenode information to revocery file failed, the filenode is "
-                  + getProcessorName());
+                  "Merge: write filenode information to revocery file failed, the filenode is "
+                          + getProcessorName());
         }
       }
     } finally {
@@ -1396,15 +1394,15 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   }
 
   private void switchWaitingToWorkingv2(List<IntervalFileNode> backupIntervalFiles)
-      throws FileNodeProcessorException {
+          throws FileNodeProcessorException {
 
     LOGGER.info("The status of filenode processor {} switches from {} to {}.", getProcessorName(),
-        FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
+            FileNodeProcessorStatus.WAITING, FileNodeProcessorStatus.NONE);
 
     if (oldMultiPassLock != null) {
       LOGGER.info("The old Multiple Pass Token set is {}, the old Multiple Pass Lock is {}",
-          oldMultiPassTokenSet,
-          oldMultiPassLock);
+              oldMultiPassTokenSet,
+              oldMultiPassLock);
       oldMultiPassLock.writeLock().lock();
     }
     try {
@@ -1417,7 +1415,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         List<File> bufferwriteDirList = new ArrayList<>();
         for (String bufferwriteDirPath : bufferwriteDirPathList) {
           if (bufferwriteDirPath.length() > 0
-              && bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1) != File.separatorChar) {
+                  && bufferwriteDirPath.charAt(bufferwriteDirPath.length() - 1) != File.separatorChar) {
             bufferwriteDirPath = bufferwriteDirPath + File.separatorChar;
           }
           bufferwriteDirPath = bufferwriteDirPath + getProcessorName();
@@ -1439,7 +1437,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         // add the restore file, if the last file is not closed
         if (!newFileNodes.isEmpty() && !newFileNodes.get(newFileNodes.size() - 1).isClosed()) {
           String bufferFileRestorePath =
-              newFileNodes.get(newFileNodes.size() - 1).getFilePath() + ".restore";
+                  newFileNodes.get(newFileNodes.size() - 1).getFilePath() + ".restore";
           bufferFiles.add(bufferFileRestorePath);
         }
 
@@ -1472,10 +1470,10 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         }
       } catch (IOException e) {
         LOGGER.info(
-            "The filenode processor {} encountered an error when its "
-                + "status switched from {} to {}.",
-            getProcessorName(), FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE,
-            e);
+                "The filenode processor {} encountered an error when its "
+                        + "status switched from {} to {}.",
+                getProcessorName(), FileNodeProcessorStatus.NONE, FileNodeProcessorStatus.MERGING_WRITE,
+                e);
         throw new FileNodeProcessorException(e);
       } finally {
         writeUnlock();
@@ -1494,12 +1492,12 @@ public class FileNodeProcessor extends Processor implements IStatistic {
                                      String measurementId) {
     TSRecord record = new TSRecord(timeValuePair.getTimestamp(), deviceId);
     record.addTuple(DataPoint.getDataPoint(timeValuePair.getValue().getDataType(), measurementId,
-        timeValuePair.getValue().getValue().toString()));
+            timeValuePair.getValue().getValue().toString()));
     return record;
   }
 
   private String queryAndWriteDataForMerge(IntervalFileNode backupIntervalFile)
-      throws IOException, WriteProcessException, FileNodeProcessorException, PathErrorException {
+          throws IOException, WriteProcessException, FileNodeProcessorException, PathErrorException {
     Map<String, Long> startTimeMap = new HashMap<>();
     Map<String, Long> endTimeMap = new HashMap<>();
 
@@ -1537,26 +1535,26 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           String measurementId = path.getMeasurement();
           TSDataType dataType = mManager.getSeriesType(path.getFullPath());
           OverflowSeriesDataSource overflowSeriesDataSource = overflowProcessor.queryMerge(deviceId,
-              measurementId, dataType, true);
+                  measurementId, dataType, true);
           Filter timeFilter = FilterFactory
-              .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
-                  TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
+                  .and(TimeFilter.gtEq(backupIntervalFile.getStartTime(deviceId)),
+                          TimeFilter.ltEq(backupIntervalFile.getEndTime(deviceId)));
           SingleSeriesExpression seriesFilter = new SingleSeriesExpression(path, timeFilter);
           IReader seriesReader = SeriesReaderFactory.getInstance()
-              .createSeriesReaderForMerge(backupIntervalFile,
-                  overflowSeriesDataSource, seriesFilter);
+                  .createSeriesReaderForMerge(backupIntervalFile,
+                          overflowSeriesDataSource, seriesFilter);
           try {
             if (!seriesReader.hasNext()) {
               LOGGER.debug(
-                  "The time-series {} has no data with the filter {} in the filenode processor {}",
-                  path, seriesFilter, getProcessorName());
+                      "The time-series {} has no data with the filter {} in the filenode processor {}",
+                      path, seriesFilter, getProcessorName());
             } else {
               numOfChunk++;
               TimeValuePair timeValuePair = seriesReader.next();
               if (fileIoWriter == null) {
                 baseDir = directories.getNextFolderForTsfile();
                 fileName = String.valueOf(timeValuePair.getTimestamp()
-                    + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis());
+                        + FileNodeConstants.BUFFERWRITE_FILE_SEPARATOR + System.currentTimeMillis());
                 outputPath = constructOutputFilePath(baseDir, getProcessorName(), fileName);
                 fileName = getProcessorName() + File.separatorChar + fileName;
                 fileIoWriter = new TsFileIOWriter(new File(outputPath));
@@ -1577,11 +1575,11 @@ public class FileNodeProcessor extends Processor implements IStatistic {
               ChunkBuffer pageWriter = new ChunkBuffer(measurementSchema);
               int pageSizeThreshold = TsFileConf.pageSizeInByte;
               ChunkWriterImpl seriesWriterImpl = new ChunkWriterImpl(measurementSchema, pageWriter,
-                  pageSizeThreshold);
+                      pageSizeThreshold);
               // write the series data
               recordCount += writeOneSeries(deviceId, measurementId, seriesWriterImpl, dataType,
-                  seriesReader,
-                  startTimeMap, endTimeMap, timeValuePair);
+                      seriesReader,
+                      startTimeMap, endTimeMap, timeValuePair);
               // flush the series data
               seriesWriterImpl.writeToFileWriter(fileIoWriter);
             }
@@ -1599,7 +1597,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         }
       }
     } finally {
-      if(mergeDeleteLock.isLocked())
+      if (mergeDeleteLock.isLocked())
         mergeDeleteLock.unlock();
     }
 
@@ -1639,7 +1637,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           timeValuePair = seriesReader.next();
           endTime = timeValuePair.getTimestamp();
           seriesWriterImpl
-              .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
+                  .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBoolean());
         }
         if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) < endTime) {
           endTimeMap.put(deviceId, endTime);
@@ -1720,7 +1718,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           timeValuePair = seriesReader.next();
           endTime = timeValuePair.getTimestamp();
           seriesWriterImpl
-              .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
+                  .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble());
         }
         if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) < endTime) {
           endTimeMap.put(deviceId, endTime);
@@ -1741,7 +1739,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
           timeValuePair = seriesReader.next();
           endTime = timeValuePair.getTimestamp();
           seriesWriterImpl
-              .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
+                  .write(timeValuePair.getTimestamp(), timeValuePair.getValue().getBinary());
         }
         if (!endTimeMap.containsKey(deviceId) || endTimeMap.get(deviceId) < endTime) {
           endTimeMap.put(deviceId, endTime);
@@ -1762,7 +1760,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     File dataDir = new File(baseDir);
     if (!dataDir.exists()) {
       LOGGER.warn("The bufferwrite processor data dir doesn't exists, create new directory {}",
-          baseDir);
+              baseDir);
       dataDir.mkdirs();
     }
     File outputFile = new File(dataDir, fileName);
@@ -1786,7 +1784,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   }
 
   private FileSchema getFileSchemaFromColumnSchema(List<ColumnSchema> schemaList, String deviceType)
-      throws WriteProcessException {
+          throws WriteProcessException {
     JSONArray rowGroup = new JSONArray();
 
     for (ColumnSchema col : schemaList) {
@@ -1824,8 +1822,8 @@ public class FileNodeProcessor extends Processor implements IStatistic {
               }
             } else {
               LOGGER
-                  .info("The filenode {} can't be closed, because it can't get oldMultiPassLock {}",
-                      getProcessorName(), oldMultiPassLock);
+                      .info("The filenode {} can't be closed, because it can't get oldMultiPassLock {}",
+                              getProcessorName(), oldMultiPassLock);
               return false;
             }
           } else {
@@ -1836,13 +1834,13 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         }
       } else {
         LOGGER.info("The filenode {} can't be closed, because it can't get newMultiPassLock {}",
-            getProcessorName(), newMultiPassLock);
+                getProcessorName(), newMultiPassLock);
         return false;
       }
     } else {
       LOGGER.info("The filenode {} can't be closed, because the filenode status is {}",
-          getProcessorName(),
-          isMerging);
+              getProcessorName(),
+              isMerging);
       return false;
     }
   }
@@ -1867,7 +1865,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         while (!bufferWriteProcessor.canBeClosed()) {
           try {
             LOGGER.info("The bufferwrite {} can't be closed, wait 100ms",
-                bufferWriteProcessor.getProcessorName());
+                    bufferWriteProcessor.getProcessorName());
             TimeUnit.MICROSECONDS.sleep(100);
           } catch (InterruptedException e) {
             e.printStackTrace();
@@ -1913,7 +1911,7 @@ public class FileNodeProcessor extends Processor implements IStatistic {
         while (!overflowProcessor.canBeClosed()) {
           try {
             LOGGER.info("The overflow {} can't be closed, wait 100ms",
-                overflowProcessor.getProcessorName());
+                    overflowProcessor.getProcessorName());
             TimeUnit.MICROSECONDS.sleep(100);
           } catch (InterruptedException e) {
             e.printStackTrace();
@@ -1970,14 +1968,14 @@ public class FileNodeProcessor extends Processor implements IStatistic {
   }
 
   private void writeStoreToDisk(FileNodeProcessorStore fileNodeProcessorStore)
-      throws FileNodeProcessorException {
+          throws FileNodeProcessorException {
 
     synchronized (fileNodeRestoreFilePath) {
       SerializeUtil<FileNodeProcessorStore> serializeUtil = new SerializeUtil<>();
       try {
         serializeUtil.serialize(fileNodeProcessorStore, fileNodeRestoreFilePath);
         LOGGER.debug("The filenode processor {} writes restore information to the restore file",
-            getProcessorName());
+                getProcessorName());
       } catch (IOException e) {
         throw new FileNodeProcessorException(e);
       }
@@ -1991,9 +1989,9 @@ public class FileNodeProcessor extends Processor implements IStatistic {
       SerializeUtil<FileNodeProcessorStore> serializeUtil = new SerializeUtil<>();
       try {
         fileNodeProcessorStore = serializeUtil.deserialize(fileNodeRestoreFilePath)
-            .orElse(new FileNodeProcessorStore(false, new HashMap<>(),
-                new IntervalFileNode(OverflowChangeType.NO_CHANGE, null),
-                new ArrayList<IntervalFileNode>(), FileNodeProcessorStatus.NONE, 0));
+                .orElse(new FileNodeProcessorStore(false, new HashMap<>(),
+                        new IntervalFileNode(OverflowChangeType.NO_CHANGE, null),
+                        new ArrayList<IntervalFileNode>(), FileNodeProcessorStatus.NONE, 0));
       } catch (IOException e) {
         throw new FileNodeProcessorException(e);
       }
@@ -2006,48 +2004,62 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * { mergeIndex(); switchMergeIndex(); }
    */
 
-    public String getFileNodeRestoreFilePath() {
-        return fileNodeRestoreFilePath;
-    }
+  public String getFileNodeRestoreFilePath() {
+    return fileNodeRestoreFilePath;
+  }
+
+  /**
+   * Delete data of timeseries deviceId.measurementId whose timestamp <= 'timestamp'.
+   *
+   * @param deviceId      the deviceId of the timeseries to be deleted.
+   * @param measurementId the measurementId of the timeseries to be deleted.
+   * @param timestamp     the delete range is (0, timestamp].
+   */
+  public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
+    // TODO: how to avoid partial deletion?
+    mergeDeleteLock.lock();
+    long version = versionController.nextVersion();
 
-    /**
-     * Delete data whose timestamp <= 'timestamp' and belong to timeseries deviceId.measurementId.
-     * @param deviceId the deviceId of the timeseries to be deleted.
-     * @param measurementId the measurementId of the timeseries to be deleted.
-     * @param timestamp the delete range is (0, timestamp].
-     */
-    public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
-      // TODO: how to avoid partial deletion?
-      mergeDeleteLock.lock();
-      long version = versionController.nextVersion();
+    // record which files are updated so we can roll them back in case of an exception
+    List<ModificationFile> updatedModFiles = new ArrayList<>();
 
-      try {
-        String fullPath = deviceId +
-                IoTDBConstant.PATH_SEPARATOR + measurementId;
-        Deletion deletion = new Deletion(fullPath, version, timestamp);
-        if (mergingModification != null) {
-          mergingModification.write(deletion);
-        }
+    try {
+      String fullPath = deviceId +
+              IoTDBConstant.PATH_SEPARATOR + measurementId;
+      Deletion deletion = new Deletion(fullPath, version, timestamp);
+      if (mergingModification != null) {
+        mergingModification.write(deletion);
+        updatedModFiles.add(mergingModification);
+      }
 
-        deleteBufferWriteFiles(deviceId, deletion);
-        // delete data in memory
-        OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName());
-        overflowProcessor.delete(deviceId, measurementId, timestamp, version);
-        if (bufferWriteProcessor != null) {
-          bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
-        }
-      } finally {
-        mergeDeleteLock.unlock();
+      deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
+      // delete data in memory
+      OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName());
+      overflowProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
+      if (bufferWriteProcessor != null) {
+        bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
+      }
+    } catch (Exception e) {
+      // roll back
+      for (ModificationFile modFile : updatedModFiles) {
+        modFile.abort();
       }
+      throw new IOException(e);
+    } finally {
+      mergeDeleteLock.unlock();
     }
+  }
 
-  private void deleteBufferWriteFiles(String deviceId, Deletion deletion) throws IOException {
+  private void deleteBufferWriteFiles(String deviceId, Deletion deletion,
+                                      List<ModificationFile> updatedModFiles) throws IOException {
     if (currentIntervalFileNode != null && currentIntervalFileNode.containsDevice(deviceId)) {
       currentIntervalFileNode.getModFile().write(deletion);
+      updatedModFiles.add(currentIntervalFileNode.getModFile());
     }
     for (IntervalFileNode fileNode : newFileNodes) {
-      if(fileNode != currentIntervalFileNode && fileNode.containsDevice(deviceId)) {
+      if (fileNode != currentIntervalFileNode && fileNode.containsDevice(deviceId)) {
         fileNode.getModFile().write(deletion);
+        updatedModFiles.add(fileNode.getModFile());
       }
     }
   }
@@ -2056,17 +2068,17 @@ public class FileNodeProcessor extends Processor implements IStatistic {
    * Similar to delete(), but only deletes data in BufferWrite.
    * Only used by WAL recovery.
    */
-    public void deleteBufferWrite(String deviceId, String measurementId, long timestamp) throws IOException {
-      String fullPath = deviceId +
-              IoTDBConstant.PATH_SEPARATOR + measurementId;
-      long version = versionController.nextVersion();
-      Deletion deletion = new Deletion(fullPath, version, timestamp);
+  public void deleteBufferWrite(String deviceId, String measurementId, long timestamp) throws IOException {
+    String fullPath = deviceId +
+            IoTDBConstant.PATH_SEPARATOR + measurementId;
+    long version = versionController.nextVersion();
+    Deletion deletion = new Deletion(fullPath, version, timestamp);
 
-      deleteBufferWriteFiles(deviceId, deletion);
-      if (bufferWriteProcessor != null) {
-        bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
-      }
+    deleteBufferWriteFiles(deviceId, deletion, updatedModFiles);
+    if (bufferWriteProcessor != null) {
+      bufferWriteProcessor.delete(deviceId, measurementId, timestamp);
     }
+  }
 
   /**
    * Similar to delete(), but only deletes data in Overflow.
@@ -2079,6 +2091,6 @@ public class FileNodeProcessor extends Processor implements IStatistic {
     Deletion deletion = new Deletion(fullPath, version, timestamp);
 
     OverflowProcessor overflowProcessor = getOverflowProcessor(getProcessorName());
-    overflowProcessor.delete(deviceId, measurementId, timestamp, version);
+    overflowProcessor.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
   }
 }
\ No newline at end of file
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
index 2815e36..d09ab84 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
@@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.modification;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.iotdb.db.engine.modification.io.LocalTextModificationAccessor;
 import org.apache.iotdb.db.engine.modification.io.ModificationReader;
@@ -32,13 +33,14 @@ import org.apache.iotdb.db.engine.modification.io.ModificationWriter;
 public class ModificationFile {
   public static final String FILE_SUFFIX = ".mods";
 
-  private Collection<Modification> modifications;
+  private List<Modification> modifications;
   private ModificationWriter writer;
   private ModificationReader reader;
   private String filePath;
 
   /**
    * Construct a ModificationFile using a file as its storage.
+   *
    * @param filePath the path of the storage file.
    */
   public ModificationFile(String filePath) {
@@ -50,7 +52,7 @@ public class ModificationFile {
 
   private void init() throws IOException {
     synchronized (this) {
-      Collection<Modification> mods = reader.read();
+      List<Modification> mods = (List<Modification>) reader.read();
       if (mods == null) {
         mods = new ArrayList<>();
       }
@@ -74,9 +76,19 @@ public class ModificationFile {
     }
   }
 
+  public void abort() throws IOException {
+    synchronized (this) {
+      if (modifications.size() > 0) {
+        writer.abort();
+        modifications.remove(modifications.size() - 1);
+      }
+    }
+  }
+
   /**
    * Write a modification in this file. The modification will first be written to the persistent
    * store then the memory cache.
+   *
    * @param mod the modification to be written.
    * @throws IOException if IOException is thrown when writing the modification to the store.
    */
@@ -90,6 +102,7 @@ public class ModificationFile {
 
   /**
    * Get all modifications stored in this file.
+   *
    * @return an ArrayList of modifications.
    */
   public Collection<Modification> getModifications() throws IOException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
index 9f11bae..a6d0d49 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/LocalTextModificationAccessor.java
@@ -39,6 +39,7 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
 
   private static final Logger logger = LoggerFactory.getLogger(LocalTextModificationAccessor.class);
   private static final String SEPARATOR = ",";
+  private static final String ABORT_MARK = "aborted";
 
   private String filePath;
   private BufferedWriter writer;
@@ -64,7 +65,11 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
     List<Modification> modificationList = new ArrayList<>();
     try {
       while ((line = reader.readLine()) != null) {
-        modificationList.add(decodeModification(line));
+        if (!line.equals(ABORT_MARK)) {
+          modificationList.add(decodeModification(line));
+        } else if (!modificationList.isEmpty()) { // a mark with nothing to undo is just skipped
+          modificationList.remove(modificationList.size() - 1);
+        }
       }
     } catch (IOException e) {
       reader.close();
@@ -82,6 +87,16 @@ public class LocalTextModificationAccessor implements ModificationReader, Modifi
   }
 
   @Override
+  public void abort() throws IOException { // appends ABORT_MARK on a line of its own
+    if (writer == null) {
+      writer = new BufferedWriter(new FileWriter(filePath, true)); // true = append to the file
+    }
+    writer.write(ABORT_MARK);
+    writer.newLine();
+    writer.flush(); // pushed out immediately instead of staying in the writer's buffer
+  }
+
+  @Override
   public void write(Modification mod) throws IOException {
     if (writer == null) {
       writer = new BufferedWriter(new FileWriter(filePath, true));
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
index 5c3806b..61a7d34 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/io/ModificationWriter.java
@@ -36,4 +36,9 @@ public interface ModificationWriter {
    * Release resources like streams.
    */
   void close() throws IOException;
+
+  /**
+   * Abort the last modification.
+   */
+  void abort() throws IOException;
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java
index d834ffa..40aa7c8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowProcessor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.engine.filenode.FileNodeManager;
 import org.apache.iotdb.db.engine.memcontrol.BasicMemController;
 import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
 import org.apache.iotdb.db.engine.memtable.TimeValuePairSorter;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.pool.FlushManager;
 import org.apache.iotdb.db.engine.querycontext.MergeSeriesDataSource;
 import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
@@ -236,17 +237,19 @@ public class OverflowProcessor extends Processor {
 
   /**
    * Delete data of a timeseries whose time ranges from 0 to timestamp.
-   *
-   * @param deviceId the deviceId of the timeseries.
+   *  @param deviceId the deviceId of the timeseries.
    * @param measurementId the measurementId of the timeseries.
    * @param timestamp the upper-bound of deletion time.
    * @param version the version number of this deletion.
+   * @param updatedModFiles add successfully updated Modification files to the list, and abort them
+   *                        when an exception is thrown.
    */
-  public void delete(String deviceId, String measurementId, long timestamp, long version) throws IOException {
-    workResource.delete(deviceId, measurementId, timestamp, version);
+  public void delete(String deviceId, String measurementId, long timestamp, long version,
+                     List<ModificationFile> updatedModFiles) throws IOException {
+    workResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
     workSupport.delete(deviceId, measurementId, timestamp, false);
     if (flushStatus.isFlushing()) {
-      mergeResource.delete(deviceId, measurementId, timestamp, version);
+      mergeResource.delete(deviceId, measurementId, timestamp, version, updatedModFiles);
       flushSupport.delete(deviceId, measurementId, timestamp, true);
     }
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java
index 3a528f0..9383d61 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/ioV2/OverflowResource.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.QueryUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
@@ -294,14 +293,17 @@ public class OverflowResource {
 
   /**
    * Delete data of a timeseries whose time ranges from 0 to timestamp.
-   *
-   * @param deviceId the deviceId of the timeseries.
+   *  @param deviceId the deviceId of the timeseries.
    * @param measurementId the measurementId of the timeseries.
    * @param timestamp the upper-bound of deletion time.
+   * @param updatedModFiles add the successfully updated modificationFile to this list, so that
+   *                        the deletion can be aborted when an exception is thrown.
    */
-  public void delete(String deviceId, String measurementId, long timestamp, long version)
+  public void delete(String deviceId, String measurementId, long timestamp, long version,
+                     List<ModificationFile> updatedModFiles)
           throws IOException {
     modificationFile.write(new Deletion(deviceId + IoTDBConstant.PATH_SEPARATOR
             + measurementId, version, timestamp));
+    updatedModFiles.add(modificationFile);
   }
 }
\ No newline at end of file
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
index 6907a3d..be19061 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/modification/ModificationFileTest.java
@@ -59,4 +59,40 @@ public class ModificationFileTest {
       new File(tempFileName).delete();
     }
   }
+
+  @Test
+  public void testAbort() {
+    String tempFileName = "mod.temp";
+    Modification[] modifications = new Modification[]{
+            new Deletion("p1", 1, 1),
+            new Deletion("p2", 2, 2),
+            new Deletion("p3", 3, 3),
+            new Deletion("p4", 4, 4),
+    };
+    try {
+      ModificationFile mFile = new ModificationFile(tempFileName);
+      for (int i = 0; i < 2; i++) {
+        mFile.write(modifications[i]);
+      }
+      List<Modification> modificationList = (List<Modification>) mFile.getModifications();
+      for (int i = 0; i < 2; i++) {
+        assertEquals(modifications[i], modificationList.get(i));
+      }
+
+      for (int i = 2; i < 4; i++) {
+        mFile.write(modifications[i]);
+      }
+      mFile.abort(); // fetch only afterwards, so the assertions observe the post-abort state
+      modificationList = (List<Modification>) mFile.getModifications();
+      assertEquals(3, modificationList.size()); // the fourth modification must be gone
+      for (int i = 0; i < 3; i++) {
+        assertEquals(modifications[i], modificationList.get(i));
+      }
+      mFile.close();
+    } catch (IOException e) {
+      fail(e.getMessage());
+    } finally {
+      new File(tempFileName).delete();
+    }
+  }
 }
\ No newline at end of file