You are viewing a plain text version of this content. The canonical link for it is not included in this plain-text copy.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/19 06:37:14 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated (00feb5f -> 6e183f6)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 00feb5f  update
     new e8dc717  fix FNP insert
     new 6e183f6  add recover filenode processor

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bufferwriteV2/BufferWriteProcessorV2.java      |   1 -
 .../db/engine/filenode/FileNodeProcessor.java      |   4 +-
 .../filenodeV2/FileNodeProcessorStoreV2.java       |  50 ++++----
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 137 +++++++++++++--------
 .../db/engine/filenodeV2/TsFileResourceV2.java     |   3 +
 5 files changed, 121 insertions(+), 74 deletions(-)


[incubator-iotdb] 01/02: fix FNP insert

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit e8dc717e5abae29b6f05d772852bf61ec6dc4902
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 19 14:01:27 2019 +0800

    fix FNP insert
---
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 66 +++++++++++-----------
 1 file changed, 34 insertions(+), 32 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index c1cad5b..e5902c1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.engine.AbstractUnsealedDataFileProcessorV2;
 import org.apache.iotdb.db.engine.bufferwriteV2.BufferWriteProcessorV2;
 import org.apache.iotdb.db.engine.filenode.CopyOnWriteLinkedList;
 import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
+import org.apache.iotdb.db.engine.overflowV2.OverflowProcessorV2;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.FileNodeProcessorException;
@@ -73,7 +74,7 @@ public class FileNodeProcessorV2 {
 
   // for overflow
   private List<TsFileResourceV2> unsequenceFileList;
-  private BufferWriteProcessorV2 workOverflowProcessor = null;
+  private OverflowProcessorV2 workOverflowProcessor = null;
   private CopyOnWriteLinkedList<BufferWriteProcessorV2> closingOverflowProcessor = new CopyOnWriteLinkedList<>();
 
   /**
@@ -123,8 +124,7 @@ public class FileNodeProcessorV2 {
     try {
       fileNodeProcessorStore = readStoreFromDisk();
     } catch (FileNodeProcessorException e) {
-      LOGGER.error(
-          "The fileNode processor {} encountered an error when recoverying restore " +
+      LOGGER.error("The fileNode processor {} encountered an error when recoverying restore " +
               "information.", storageGroup);
       throw new FileNodeProcessorException(e);
     }
@@ -176,24 +176,51 @@ public class FileNodeProcessorV2 {
     }
   }
 
-  private void writeUnsealedDataFile(AbstractUnsealedDataFileProcessorV2 udfProcessor, TSRecord tsRecord, boolean sequence) {
+  public boolean insert(TSRecord tsRecord) {
+    lock.writeLock().lock();
+    boolean result = true;
+
+    try {
+      // init map
+      latestTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
+      latestFlushTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
+
+      // write to sequence or unsequence file
+      if (tsRecord.time > latestFlushTimeMap.get(tsRecord.deviceId)) {
+        writeUnsealedDataFile(workBufferWriteProcessor, tsRecord, true);
+      } else {
+        writeUnsealedDataFile(workOverflowProcessor, tsRecord, false);
+      }
+    } catch (Exception e) {
+      LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
+      result = false;
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    return result;
+  }
+
+
+  private void writeUnsealedDataFile(AbstractUnsealedDataFileProcessorV2 udfProcessor,
+      TSRecord tsRecord, boolean sequence) throws IOException {
     boolean result;
     // create a new BufferWriteProcessor
     if (udfProcessor == null) {
       if (sequence) {
         String baseDir = directories.getNextFolderForTsfile();
         String filePath = Paths.get(baseDir, storageGroup, tsRecord.time + "").toString();
-        workBufferWriteProcessor = new BufferWriteProcessorV2(storageGroup, new File(filePath),
+        udfProcessor = new BufferWriteProcessorV2(storageGroup, new File(filePath),
             fileSchema, versionController, this::closeBufferWriteProcessorCallBack);
         sequenceFileList.add(udfProcessor.getTsFileResource());
       } else {
+        // TODO check if the disk is full
         String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
         String filePath = Paths.get(baseDir, storageGroup, tsRecord.time + "").toString();
-        workBufferWriteProcessor = new BufferWriteProcessorV2(storageGroup, new File(filePath),
+        udfProcessor = new OverflowProcessorV2(storageGroup, new File(filePath),
             fileSchema, versionController, this::closeBufferWriteProcessorCallBack);
         unsequenceFileList.add(udfProcessor.getTsFileResource());
       }
-      // TODO check if the disk is full
     }
 
     // write BufferWrite
@@ -210,31 +237,6 @@ public class FileNodeProcessorV2 {
     }
   }
 
-  public boolean insert(TSRecord tsRecord) {
-    lock.writeLock().lock();
-    boolean result = true;
-
-    try {
-
-      // init map
-      latestTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
-      latestFlushTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
-
-      if (tsRecord.time > latestFlushTimeMap.get(tsRecord.deviceId)) {
-
-
-
-      } else {
-        // TODO write to overflow
-      }
-    } catch (Exception e) {
-
-    } finally {
-      lock.writeLock().unlock();
-    }
-
-    return result;
-  }
 
   /**
    * ensure there must be a flush thread submitted after close() is called,


[incubator-iotdb] 02/02: add recover filenode processor

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 6e183f6db4b1d5d178b98104277245a522acb965
Merge: e8dc717 00feb5f
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 19 14:36:59 2019 +0800

    add recover filenode processor

 .../bufferwriteV2/BufferWriteProcessorV2.java      |  1 -
 .../db/engine/filenode/FileNodeProcessor.java      |  4 +-
 .../filenodeV2/FileNodeProcessorStoreV2.java       | 50 ++++++------
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 93 +++++++++++++++-------
 .../db/engine/filenodeV2/TsFileResourceV2.java     |  3 +
 5 files changed, 98 insertions(+), 53 deletions(-)

diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
index ecf28ea,ecf28ea..144c66e
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwriteV2/BufferWriteProcessorV2.java
@@@ -34,7 -34,7 +34,6 @@@ import org.apache.iotdb.tsfile.file.met
  import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  import org.apache.iotdb.tsfile.utils.Pair;
  import org.apache.iotdb.tsfile.write.schema.FileSchema;
--import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
  
  public class BufferWriteProcessorV2 extends AbstractUnsealedDataFileProcessorV2 {
  
diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
index 0d670a7,0d670a7..54191e9
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeProcessor.java
@@@ -202,9 -202,9 +202,9 @@@ public class FileNodeProcessor extends 
  //    @Override
  //    public void act() {
  //      synchronized (fileNodeProcessorStore) {
--//        fileNodeProcessorStore.setLastUpdateTimeMap(lastUpdateTimeMap);
++//        fileNodeProcessorStore.setLatestTimeMap(lastUpdateTimeMap);
  //        addLastTimeToIntervalFile();
--//        fileNodeProcessorStore.setNewFileNodes(newFileNodes);
++//        fileNodeProcessorStore.setSequenceFileList(newFileNodes);
  //      }
  //    }
  //
diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
index 7ae9776,7ae9776..b136511
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorStoreV2.java
@@@ -34,7 -34,7 +34,7 @@@ import org.apache.iotdb.tsfile.utils.Re
  /**
   * FileNodeProcessorStore is used to store information about FileNodeProcessor's status.
   * lastUpdateTime is changed and stored by BufferWrite flushMetadata or BufferWrite close.
-- * emptyTsFileResource and newFileNodes are changed and stored by Overflow flushMetadata and
++ * emptyTsFileResource and sequenceFileList are changed and stored by Overflow flushMetadata and
   * Overflow close. fileNodeProcessorState is changed and stored by the change of FileNodeProcessor's
   * status such as "work->merge merge->wait wait->work". numOfMergeFile is changed
   * and stored when FileNodeProcessor's status changes from work to merge.
@@@ -44,8 -44,8 +44,9 @@@ public class FileNodeProcessorStoreV2 i
    private static final long serialVersionUID = -54525372941897565L;
  
    private boolean isOverflowed;
--  private Map<String, Long> lastUpdateTimeMap;
--  private List<TsFileResourceV2> newFileNodes;
++  private Map<String, Long> latestTimeMap;
++  private List<TsFileResourceV2> sequenceFileList;
++  private List<TsFileResourceV2> unSequenceFileList;
    private int numOfMergeFile;
    private FileNodeProcessorStatus fileNodeProcessorStatus;
  
@@@ -53,18 -53,18 +54,17 @@@
     * Constructor of FileNodeProcessorStore.
     *
     * @param isOverflowed whether this FileNode contains unmerged Overflow operations.
--   * @param lastUpdateTimeMap the timestamp of last data point of each device in this FileNode.
--   * @param newFileNodes TsFiles in the FileNode.
++   * @param latestTimeMap the timestamp of last data point of each device in this FileNode.
++   * @param sequenceFileList TsFiles in the FileNode.
     * @param fileNodeProcessorStatus the status of the FileNode.
     * @param numOfMergeFile the number of files already merged in one merge operation.
     */
--  public FileNodeProcessorStoreV2(boolean isOverflowed, Map<String, Long> lastUpdateTimeMap,
--      List<TsFileResourceV2> newFileNodes,
--      FileNodeProcessorStatus fileNodeProcessorStatus,
++  public FileNodeProcessorStoreV2(boolean isOverflowed, Map<String, Long> latestTimeMap,
++      List<TsFileResourceV2> sequenceFileList, FileNodeProcessorStatus fileNodeProcessorStatus,
        int numOfMergeFile) {
      this.isOverflowed = isOverflowed;
--    this.lastUpdateTimeMap = lastUpdateTimeMap;
--    this.newFileNodes = newFileNodes;
++    this.latestTimeMap = latestTimeMap;
++    this.sequenceFileList = sequenceFileList;
      this.fileNodeProcessorStatus = fileNodeProcessorStatus;
      this.numOfMergeFile = numOfMergeFile;
    }
@@@ -72,14 -72,14 +72,14 @@@
    public void serialize(OutputStream outputStream) throws IOException {
      ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
      ReadWriteIOUtils.write(this.isOverflowed, byteArrayOutputStream);
--    // lastUpdateTimeMap
--    ReadWriteIOUtils.write(lastUpdateTimeMap.size(), byteArrayOutputStream);
--    for (Entry<String, Long> entry : lastUpdateTimeMap.entrySet()) {
++    // latestTimeMap
++    ReadWriteIOUtils.write(latestTimeMap.size(), byteArrayOutputStream);
++    for (Entry<String, Long> entry : latestTimeMap.entrySet()) {
        ReadWriteIOUtils.write(entry.getKey(), byteArrayOutputStream);
        ReadWriteIOUtils.write(entry.getValue(), byteArrayOutputStream);
      }
--    ReadWriteIOUtils.write(this.newFileNodes.size(), byteArrayOutputStream);
--    for (TsFileResourceV2 tsFileResource : this.newFileNodes) {
++    ReadWriteIOUtils.write(this.sequenceFileList.size(), byteArrayOutputStream);
++    for (TsFileResourceV2 tsFileResource : this.sequenceFileList) {
        tsFileResource.serialize(byteArrayOutputStream);
      }
      ReadWriteIOUtils.write(this.numOfMergeFile, byteArrayOutputStream);
@@@ -126,20 -126,20 +126,24 @@@
      this.fileNodeProcessorStatus = fileNodeProcessorStatus;
    }
  
--  public Map<String, Long> getLastUpdateTimeMap() {
--    return new HashMap<>(lastUpdateTimeMap);
++  public Map<String, Long> getLatestTimeMap() {
++    return new HashMap<>(latestTimeMap);
    }
  
--  public void setLastUpdateTimeMap(Map<String, Long> lastUpdateTimeMap) {
--    this.lastUpdateTimeMap = lastUpdateTimeMap;
++  public void setLatestTimeMap(Map<String, Long> latestTimeMap) {
++    this.latestTimeMap = latestTimeMap;
    }
  
--  public List<TsFileResourceV2> getNewFileNodes() {
--    return newFileNodes;
++  public List<TsFileResourceV2> getSequenceFileList() {
++    return sequenceFileList;
    }
  
--  public void setNewFileNodes(List<TsFileResourceV2> newFileNodes) {
--    this.newFileNodes = newFileNodes;
++  public void setSequenceFileList(List<TsFileResourceV2> sequenceFileList) {
++    this.sequenceFileList = sequenceFileList;
++  }
++
++  public List<TsFileResourceV2> getUnSequenceFileList() {
++    return unSequenceFileList;
    }
  
    public int getNumOfMergeFile() {
diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index e5902c1,89a12e0..f28bdfd
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@@ -22,7 -22,7 +22,6 @@@ import java.io.File
  import java.io.FileInputStream;
  import java.io.FileOutputStream;
  import java.io.IOException;
--import java.io.UTFDataFormatException;
  import java.nio.file.Paths;
  import java.util.ArrayList;
  import java.util.HashMap;
@@@ -43,9 -43,9 +42,12 @@@ import org.apache.iotdb.db.engine.versi
  import org.apache.iotdb.db.engine.version.VersionController;
  import org.apache.iotdb.db.exception.FileNodeProcessorException;
  import org.apache.iotdb.db.metadata.MManager;
--import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
++import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
++import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
++import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
  import org.apache.iotdb.tsfile.write.record.TSRecord;
  import org.apache.iotdb.tsfile.write.schema.FileSchema;
++import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -61,10 -61,10 +63,10 @@@ public class FileNodeProcessorV2 
  
    private FileSchema fileSchema;
  
--  /**
--   * device -> tsfile list
--   */
--  private Map<String, List<TsFileResourceV2>> invertedIndexOfFiles = new HashMap<>();
++//  /**
++//   * device -> tsfile list
++//   */
++//  private Map<String, List<TsFileResourceV2>> invertedIndexOfFiles = new HashMap<>();
  
    // for bufferwrite
    //includes sealed and unsealed tsfiles
@@@ -73,14 -73,14 +75,15 @@@
    private CopyOnWriteLinkedList<BufferWriteProcessorV2> closingBufferWriteProcessor = new CopyOnWriteLinkedList<>();
  
    // for overflow
--  private List<TsFileResourceV2> unsequenceFileList;
++  private List<TsFileResourceV2> unSequenceFileList;
    private OverflowProcessorV2 workOverflowProcessor = null;
-   private CopyOnWriteLinkedList<BufferWriteProcessorV2> closingOverflowProcessor = new CopyOnWriteLinkedList<>();
++
+   private CopyOnWriteLinkedList<OverflowProcessorV2> closingOverflowProcessor = new CopyOnWriteLinkedList<>();
  
    /**
     * device -> global latest timestamp of each device
     */
--  private Map<String, Long> latestTimeMap = new HashMap<>();
++  private Map<String, Long> latestTimeMap;
  
    /**
     * device -> largest timestamp of the latest memtable to be submitted to asyncFlush
@@@ -97,7 -97,7 +100,6 @@@
    private FileNodeProcessorStoreV2 fileNodeProcessorStore;
    private final Object fileNodeRestoreLock = new Object();
  
--
    public FileNodeProcessorV2(String baseDir, String storageGroup) throws FileNodeProcessorException {
      this.storageGroup = storageGroup;
      lock = new ReentrantReadWriteLock();
@@@ -105,8 -105,8 +107,7 @@@
      File storageGroupDir = new File(baseDir + storageGroup);
      if (!storageGroupDir.exists()) {
        storageGroupDir.mkdir();
--      LOGGER.info(
--          "The directory of the storage group {} doesn't exist. Create a new " +
++      LOGGER.info("The directory of the storage group {} doesn't exist. Create a new " +
                "directory {}", storageGroup, storageGroupDir.getAbsolutePath());
      }
  
@@@ -119,18 -119,19 +120,21 @@@
        LOGGER.info("The restore directory of the filenode processor {} doesn't exist. Create new " +
                "directory {}", storageGroup, restoreFolder.getAbsolutePath());
      }
--    fileNodeRestoreFilePath = new File(restoreFolder, storageGroup + RESTORE_FILE_SUFFIX)
--        .getPath();
++
++    fileNodeRestoreFilePath = new File(restoreFolder, storageGroup + RESTORE_FILE_SUFFIX).getPath();
++
      try {
--      fileNodeProcessorStore = readStoreFromDisk();
++      fileNodeProcessorStore = readStoreFromDiskOrCreate();
      } catch (FileNodeProcessorException e) {
-       LOGGER.error("The fileNode processor {} encountered an error when recoverying restore " +
 -      LOGGER.error(
 -          "The fileNode processor {} encountered an error when recoverying restore " +
++      LOGGER.error("The fileNode processor {} encountered an error when recovering restore " +
                "information.", storageGroup);
        throw new FileNodeProcessorException(e);
      }
--    // TODO deep clone the lastupdate time, change the getNewFileNodes to V2
--//    sequenceFileList = fileNodeProcessorStore.getNewFileNodes();
--    invertedIndexOfFiles = new HashMap<>();
++
++    // TODO deep clone the lastupdate time, change the getSequenceFileList to V2
++    sequenceFileList = fileNodeProcessorStore.getSequenceFileList();
++    unSequenceFileList = fileNodeProcessorStore.getUnSequenceFileList();
++    latestTimeMap = fileNodeProcessorStore.getLatestTimeMap();
  
      /**
       * version controller
@@@ -141,10 -142,10 +145,42 @@@
        throw new FileNodeProcessorException(e);
      }
  
++    // construct the file schema
++    this.fileSchema = constructFileSchema(storageGroup);
 +  }
 +
++  private FileSchema constructFileSchema(String storageGroupName) {
++    List<MeasurementSchema> columnSchemaList;
++    columnSchemaList = mManager.getSchemaForFileName(storageGroupName);
++
++    FileSchema schema = new FileSchema();
++    for (MeasurementSchema measurementSchema : columnSchemaList) {
++      schema.registerMeasurement(measurementSchema);
++    }
++    return schema;
++
++  }
 +
-   private FileNodeProcessorStoreV2 readStoreFromDisk() throws FileNodeProcessorException {
++
++  /**
++   * add time series.
++   */
++  public void addTimeSeries(String measurementId, TSDataType dataType, TSEncoding encoding,
++      CompressionType compressor, Map<String, String> props) {
++    lock.writeLock().lock();
++    try {
++      fileSchema.registerMeasurement(new MeasurementSchema(measurementId, dataType, encoding,
++          compressor, props));
++    } finally {
++      lock.writeLock().unlock();
++    }
+   }
+ 
+ 
 -  private FileNodeProcessorStoreV2 readStoreFromDisk() throws FileNodeProcessorException {
++  /**
++   * read file node store from disk or create a new one
++   */
++  private FileNodeProcessorStoreV2 readStoreFromDiskOrCreate() throws FileNodeProcessorException {
  
      synchronized (fileNodeRestoreLock) {
        File restoreFile = new File(fileNodeRestoreFilePath);
@@@ -176,34 -177,8 +212,35 @@@
      }
    }
  
 -  private void writeUnsealedDataFile(AbstractUnsealedDataFileProcessorV2 udfProcessor, TSRecord tsRecord, boolean sequence)
 -      throws IOException {
++
 +  public boolean insert(TSRecord tsRecord) {
 +    lock.writeLock().lock();
 +    boolean result = true;
 +
 +    try {
 +      // init map
 +      latestTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
 +      latestFlushTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
 +
 +      // write to sequence or unsequence file
 +      if (tsRecord.time > latestFlushTimeMap.get(tsRecord.deviceId)) {
-         writeUnsealedDataFile(workBufferWriteProcessor, tsRecord, true);
++        result = writeUnsealedDataFile(workBufferWriteProcessor, tsRecord, true);
 +      } else {
-         writeUnsealedDataFile(workOverflowProcessor, tsRecord, false);
++        result = writeUnsealedDataFile(workOverflowProcessor, tsRecord, false);
 +      }
 +    } catch (Exception e) {
 +      LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
 +      result = false;
 +    } finally {
 +      lock.writeLock().unlock();
 +    }
 +
 +    return result;
 +  }
 +
 +
-   private void writeUnsealedDataFile(AbstractUnsealedDataFileProcessorV2 udfProcessor,
++  private boolean writeUnsealedDataFile(AbstractUnsealedDataFileProcessorV2 udfProcessor,
 +      TSRecord tsRecord, boolean sequence) throws IOException {
      boolean result;
      // create a new BufferWriteProcessor
      if (udfProcessor == null) {
@@@ -219,8 -193,9 +256,8 @@@
          String filePath = Paths.get(baseDir, storageGroup, tsRecord.time + "").toString();
          udfProcessor = new OverflowProcessorV2(storageGroup, new File(filePath),
              fileSchema, versionController, this::closeBufferWriteProcessorCallBack);
--        unsequenceFileList.add(udfProcessor.getTsFileResource());
++        unSequenceFileList.add(udfProcessor.getTsFileResource());
        }
 -      // TODO check if the disk is full
      }
  
      // write BufferWrite
@@@ -235,9 -210,32 +272,11 @@@
      if (udfProcessor.shouldFlush()) {
        flushAndCheckClose(udfProcessor, sequence);
      }
 -  }
 -
 -  public boolean insert(TSRecord tsRecord) {
 -    lock.writeLock().lock();
 -    boolean result = true;
 -
 -    try {
 -
 -      // init map
 -      latestTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
 -      latestFlushTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
 -
 -      if (tsRecord.time > latestFlushTimeMap.get(tsRecord.deviceId)) {
 -        writeUnsealedDataFile(workBufferWriteProcessor, tsRecord, true);
 -      } else {
 -        writeUnsealedDataFile(workOverflowProcessor, tsRecord, false);
 -      }
 -    } catch (Exception e) {
 -
 -    } finally {
 -      lock.writeLock().unlock();
 -    }
+ 
+     return result;
    }
  
 +
    /**
     * ensure there must be a flush thread submitted after close() is called,
     * therefore the close task will be executed by a flush thread.
@@@ -281,7 -279,7 +320,7 @@@
    private void closeBufferWriteProcessorCallBack(Object bufferWriteProcessor) {
      closingBufferWriteProcessor.remove((BufferWriteProcessorV2) bufferWriteProcessor);
      synchronized (fileNodeProcessorStore) {
--      fileNodeProcessorStore.setLastUpdateTimeMap(latestTimeMap);
++      fileNodeProcessorStore.setLatestTimeMap(latestTimeMap);
  
        if (!sequenceFileList.isEmpty()) {
          // end time with one start time
@@@ -293,7 -291,7 +332,7 @@@
          }
          resource.setEndTimeMap(endTimeMap);
        }
--      fileNodeProcessorStore.setNewFileNodes(sequenceFileList);
++      fileNodeProcessorStore.setSequenceFileList(sequenceFileList);
        try {
          writeStoreToDisk(fileNodeProcessorStore);
        } catch (FileNodeProcessorException e) {
diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
index b49c00a,b49c00a..d587dc1
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java
@@@ -71,6 -71,6 +71,9 @@@ public class TsFileResourceV2 
      this.endTimeMap = endTimeMap;
    }
  
++  public Map<String, Long> getEndTimeMap() {
++    return endTimeMap;
++  }
  
    public void serialize(OutputStream outputStream) throws IOException {
  //    ReadWriteIOUtils.write(this.overflowChangeType.serialize(), outputStream);