You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/20 08:33:19 UTC

[incubator-iotdb] 02/03: add filenodeManager

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit e89b1b64d7cdf8279f32497c56e3989795624c84
Author: lta <li...@163.com>
AuthorDate: Thu Jun 20 16:32:43 2019 +0800

    add filenodeManager
---
 .../iotdb/db/engine/UnsealedTsFileProcessorV2.java |   2 +-
 .../iotdb/db/engine/filenode/FileNodeManager.java  |  59 ++--
 .../db/engine/filenodeV2/FileNodeManagerV2.java    | 323 ++++++++++++++++++++-
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 121 ++++++--
 4 files changed, 443 insertions(+), 62 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
index 7111873..5249560 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/UnsealedTsFileProcessorV2.java
@@ -162,7 +162,7 @@ public class UnsealedTsFileProcessorV2 {
    */
   public void flushOneMemTable() throws IOException {
     IMemTable memTableToFlush = flushingMemTables.pollFirst();
-    // null memtable only appears when calling forceClose()
+    // null memtable only appears when calling asyncForceClose()
     if (memTableToFlush != null) {
       MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(writer, storageGroupName,
           this::releaseFlushedMemTable);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
index 5dfa089..9e42101 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
@@ -930,34 +930,6 @@ public class FileNodeManager implements IStatistic, IService {
   }
 
   /**
-   * try to setCloseMark the filenode processor. The name of filenode processor is processorName
-   */
-  private boolean closeOneProcessor(String processorName) throws FileNodeManagerException {
-    if (!processorMap.containsKey(processorName)) {
-      return true;
-    }
-
-    Processor processor = processorMap.get(processorName);
-    if (processor.tryWriteLock()) {
-      try {
-        if (processor.canBeClosed()) {
-          processor.close();
-          return true;
-        } else {
-          return false;
-        }
-      } catch (ProcessorException e) {
-        LOGGER.error("Close the filenode processor {} error.", processorName, e);
-        throw new FileNodeManagerException(e);
-      } finally {
-        processor.writeUnlock();
-      }
-    } else {
-      return false;
-    }
-  }
-
-  /**
    * delete one filenode.
    */
   public void deleteOneFileNode(String processorName) throws FileNodeManagerException {
@@ -1096,8 +1068,39 @@ public class FileNodeManager implements IStatistic, IService {
     }
   }
 
+
+  /**
+   * try to setCloseMark the filenode processor. The name of filenode processor is processorName
+   * Note: this method does the same thing as close().
+   */
+  private boolean closeOneProcessor(String processorName) throws FileNodeManagerException {
+    if (!processorMap.containsKey(processorName)) {
+      return true;
+    }
+
+    Processor processor = processorMap.get(processorName);
+    if (processor.tryWriteLock()) {
+      try {
+        if (processor.canBeClosed()) {
+          processor.close();
+          return true;
+        } else {
+          return false;
+        }
+      } catch (ProcessorException e) {
+        LOGGER.error("Close the filenode processor {} error.", processorName, e);
+        throw new FileNodeManagerException(e);
+      } finally {
+        processor.writeUnlock();
+      }
+    } else {
+      return false;
+    }
+  }
+
   /**
    * try to setCloseMark the filenode processor.
+   * Note: this method does the same thing as closeOneProcessor().
    */
   private void close(String processorName) throws FileNodeManagerException {
     if (!processorMap.containsKey(processorName)) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index 3d0ad48..d5938dc 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -19,23 +19,45 @@
 package org.apache.iotdb.db.engine.filenodeV2;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
+import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
+import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
 import org.apache.iotdb.db.exception.FileNodeManagerException;
 import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.monitor.IStatistic;
+import org.apache.iotdb.db.monitor.MonitorConstants;
+import org.apache.iotdb.db.monitor.StatMonitor;
+import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class FileNodeManagerV2 implements IService {
+public class FileNodeManagerV2 implements IStatistic, IService {
 
   private static final Logger LOGGER = LoggerFactory
       .getLogger(org.apache.iotdb.db.engine.filenodeV2.FileNodeManagerV2.class);
@@ -52,7 +74,7 @@ public class FileNodeManagerV2 implements IService {
    * This map is used to manage all filenode processor,<br> the key is filenode name which is
    * storage group seriesPath.
    */
-  private ConcurrentHashMap<String, FileNodeProcessorV2> processorMap;
+  private final ConcurrentHashMap<String, FileNodeProcessorV2> processorMap;
 
   private static final FileNodeManagerV2 INSTANCE = new FileNodeManagerV2();
 
@@ -65,6 +87,11 @@ public class FileNodeManagerV2 implements IService {
    */
   private volatile FileNodeManagerStatus fileNodeManagerStatus = FileNodeManagerStatus.NONE;
 
+  /**
+   * There is no need to add concurrently
+   **/
+  private HashMap<String, AtomicLong> statParamsHashMap;
+
   private enum FileNodeManagerStatus {
     NONE, MERGE, CLOSE
   }
@@ -93,7 +120,11 @@ public class FileNodeManagerV2 implements IService {
 
   @Override
   public void stop() {
-
+    try {
+      syncCloseAllProcessor();
+    } catch (FileNodeManagerException e) {
+      LOGGER.error("Failed to setCloseMark file node manager because .", e);
+    }
   }
 
   @Override
@@ -103,7 +134,7 @@ public class FileNodeManagerV2 implements IService {
 
 
   private FileNodeProcessorV2 getProcessor(String devicePath)
-      throws FileNodeManagerException, FileNodeProcessorException {
+      throws FileNodeManagerException {
     String filenodeName;
     try {
       // return the storage group name
@@ -121,8 +152,14 @@ public class FileNodeManagerV2 implements IService {
         if (processor == null) {
           LOGGER.debug("construct a processor instance, the storage group is {}, Thread is {}",
               filenodeName, Thread.currentThread().getId());
-          processor = new FileNodeProcessorV2(baseDir, filenodeName);
-          processorMap.put(filenodeName, processor);
+          try {
+            processor = new FileNodeProcessorV2(baseDir, filenodeName);
+          } catch (FileNodeProcessorException e) {
+            throw new FileNodeManagerException(e);
+          }
+          synchronized (processorMap) {
+            processorMap.put(filenodeName, processor);
+          }
         }
       }
     }
@@ -130,13 +167,92 @@ public class FileNodeManagerV2 implements IService {
   }
 
 
+  private void updateStatHashMapWhenFail(TSRecord tsRecord) {
+    statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_REQ_FAIL.name())
+        .incrementAndGet();
+    statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS_FAIL.name())
+        .addAndGet(tsRecord.dataPointList.size());
+  }
+
+  /**
+   * get stats parameter hash map.
+   *
+   * @return the key represents the params' name, values is AtomicLong type
+   */
+  @Override
+  public Map<String, AtomicLong> getStatParamsHashMap() {
+    return statParamsHashMap;
+  }
+
+  @Override
+  public List<String> getAllPathForStatistic() {
+    List<String> list = new ArrayList<>();
+    for (MonitorConstants.FileNodeManagerStatConstants statConstant :
+        MonitorConstants.FileNodeManagerStatConstants.values()) {
+      list.add(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR
+          + statConstant.name());
+    }
+    return list;
+  }
+
+  @Override
+  public Map<String, TSRecord> getAllStatisticsValue() {
+    long curTime = System.currentTimeMillis();
+    TSRecord tsRecord = StatMonitor
+        .convertToTSRecord(getStatParamsHashMap(), MonitorConstants.STAT_STORAGE_DELTA_NAME,
+            curTime);
+    HashMap<String, TSRecord> ret = new HashMap<>();
+    ret.put(MonitorConstants.STAT_STORAGE_DELTA_NAME, tsRecord);
+    return ret;
+  }
+
+  /**
+   * Init Stat MetaData.
+   */
+  @Override
+  public void registerStatMetadata() {
+    Map<String, String> hashMap = new HashMap<>();
+    for (MonitorConstants.FileNodeManagerStatConstants statConstant :
+        MonitorConstants.FileNodeManagerStatConstants.values()) {
+      hashMap
+          .put(MonitorConstants.STAT_STORAGE_DELTA_NAME + MonitorConstants.MONITOR_PATH_SEPARATOR
+              + statConstant.name(), MonitorConstants.DATA_TYPE_INT64);
+    }
+    StatMonitor.getInstance().registerStatStorageGroup(hashMap);
+  }
+
+  private void updateStat(boolean isMonitor, TSRecord tsRecord) {
+    if (!isMonitor) {
+      statParamsHashMap.get(MonitorConstants.FileNodeManagerStatConstants.TOTAL_POINTS.name())
+          .addAndGet(tsRecord.dataPointList.size());
+    }
+  }
+
+  /**
+   * This function is just for unit test.
+   */
+  public synchronized void resetFileNodeManager() {
+    for (String key : statParamsHashMap.keySet()) {
+      statParamsHashMap.put(key, new AtomicLong());
+    }
+    processorMap.clear();
+  }
+
+
+
   /**
    * insert TsRecord into storage group.
    *
    * @param tsRecord input Data
+   * @param isMonitor if true, the insertion is done by StatMonitor and the statistic Info will not
+   * be recorded. if false, the statParamsHashMap will be updated.
    * @return an int value represents the insert type, 0: failed; 1: overflow; 2: bufferwrite
    */
-  public boolean insert(TSRecord tsRecord) {
+  public int insert(TSRecord tsRecord, boolean isMonitor) throws FileNodeManagerException {
+
+    checkTimestamp(tsRecord);
+
+    updateStat(isMonitor, tsRecord);
 
     FileNodeProcessorV2 fileNodeProcessor;
     try {
@@ -144,9 +260,10 @@ public class FileNodeManagerV2 implements IService {
     } catch (Exception e) {
       LOGGER.warn("get FileNodeProcessor of device {} failed, because {}", tsRecord.deviceId,
           e.getMessage(), e);
-      return false;
+      throw new FileNodeManagerException(e);
     }
 
+    // TODO monitor: update statistics
     return fileNodeProcessor.insert(tsRecord);
   }
 
@@ -169,4 +286,194 @@ public class FileNodeManagerV2 implements IService {
     }
   }
 
+  private void checkTimestamp(TSRecord tsRecord) throws FileNodeManagerException {
+    if (tsRecord.time < 0) {
+      LOGGER.error("The insert time lt 0, {}.", tsRecord);
+      throw new FileNodeManagerException("The insert time lt 0, the tsrecord is " + tsRecord);
+    }
+  }
+
+  /**
+   * recovery the filenode processor.
+   */
+  public void recovery() {
+    // TODO
+  }
+
+
+  private void writeLog(TSRecord tsRecord, boolean isMonitor, WriteLogNode logNode)
+      throws FileNodeManagerException {
+    // TODO
+  }
+
+
+  /**
+   * update data.
+   */
+  public void update(String deviceId, String measurementId, long startTime, long endTime,
+      TSDataType type, String v) {
+    // TODO
+  }
+
+  /**
+   * delete data.
+   */
+  public void delete(String deviceId, String measurementId, long timestamp)
+      throws FileNodeManagerException {
+    // TODO
+  }
+
+  private void delete(String processorName,
+      Iterator<Entry<String, FileNodeProcessor>> processorIterator)
+      throws FileNodeManagerException {
+    // TODO
+  }
+
+
+  /**
+   * begin query.
+   *
+   * @param deviceId queried deviceId
+   * @return a query token for the device.
+   */
+  public int beginQuery(String deviceId) throws FileNodeManagerException {
+    // TODO
+    return -1;
+  }
+
+  /**
+   * end query.
+   */
+  public void endQuery(String deviceId, int token) throws FileNodeManagerException {
+    // TODO
+  }
+
+  /**
+   * query data.
+   */
+  public QueryDataSourceV2 query(SingleSeriesExpression seriesExpression, QueryContext context)
+      throws FileNodeManagerException {
+    String deviceId = seriesExpression.getSeriesPath().getDevice();
+    String measurementId = seriesExpression.getSeriesPath().getMeasurement();
+    FileNodeProcessorV2 fileNodeProcessor = getProcessor(deviceId);
+    LOGGER.debug("Get the FileNodeProcessor: filenode is {}, query.",
+        fileNodeProcessor.getStorageGroupName());
+    return fileNodeProcessor.query(deviceId, measurementId);
+  }
+
+  /**
+   * Append one specified tsfile to the storage group. <b>This method is only provided for
+   * transmission module</b>
+   *
+   * @param fileNodeName the seriesPath of storage group
+   * @param appendFile the appended tsfile information
+   */
+  public boolean appendFileToFileNode(String fileNodeName, TsFileResource appendFile,
+      String appendFilePath) throws FileNodeManagerException {
+    // TODO
+    return true;
+  }
+
+  /**
+   * get all overlap tsfiles which are conflict with the appendFile.
+   *
+   * @param fileNodeName the seriesPath of storage group
+   * @param appendFile the appended tsfile information
+   */
+  public List<String> getOverlapFilesFromFileNode(String fileNodeName, TsFileResource appendFile,
+      String uuid) throws FileNodeManagerException {
+    // TODO
+    return null;
+  }
+
+
+  /**
+   * merge all overflowed filenode.
+   *
+   * @throws FileNodeManagerException FileNodeManagerException
+   */
+  public void mergeAll() throws FileNodeManagerException {
+    // TODO
+  }
+
+  /**
+   * try to setCloseMark the filenode processor. The name of filenode processor is processorName
+   */
+  private boolean tryToCloseFileNodeProcessor(String processorName) throws FileNodeManagerException {
+    // TODO
+    return false;
+  }
+
+  /**
+   * Close the filenode processor, delete its data directory and remove it from the manager.
+   */
+  public void deleteOneFileNode(String processorName) throws FileNodeManagerException {
+    if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
+      return;
+    }
+
+    fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
+    try {
+      if (processorMap.containsKey(processorName)) {
+        deleteFileNodeBlocked(processorName);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Delete the filenode processor {} error.", processorName, e);
+      throw new FileNodeManagerException(e);
+    } finally {
+      fileNodeManagerStatus = FileNodeManagerStatus.NONE;
+    }
+  }
+
+  private void deleteFileNodeBlocked(String processorName) throws IOException {
+    LOGGER.info("Forced to delete the filenode processor {}", processorName);
+    FileNodeProcessorV2 processor = processorMap.get(processorName);
+    processor.syncCloseFileNode(() -> {
+      String fileNodePath = IoTDBDescriptor.getInstance().getConfig().getFileNodeDir();
+      fileNodePath = FilePathUtils.regularizePath(fileNodePath) + processorName;
+      try {
+        FileUtils.deleteDirectory(new File(fileNodePath));
+      } catch (IOException e) {
+        LOGGER.error("Delete tsfiles failed", e);
+      }
+      synchronized (processorMap) {
+        processorMap.remove(processorName);
+      }
+      return true;
+    });
+  }
+
+
+  /**
+   * add time series.
+   */
+  public void addTimeSeries(Path path, TSDataType dataType, TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props) throws FileNodeManagerException {
+    FileNodeProcessorV2 fileNodeProcessor = getProcessor(path.getFullPath());
+    fileNodeProcessor.addTimeSeries(path.getMeasurement(), dataType, encoding, compressor, props);
+  }
+
+
+  /**
+   * delete all filenode.
+   */
+  public synchronized boolean deleteAll() throws FileNodeManagerException {
+    LOGGER.info("Start deleting all filenode");
+    // TODO
+    return true;
+  }
+
+  /**
+   * Call asyncForceClose() on every file node processor.
+   */
+  public void syncCloseAllProcessor() throws FileNodeManagerException {
+    LOGGER.info("Start closing all filenode processor");
+    synchronized (processorMap){
+      for(FileNodeProcessorV2 processor: processorMap.values()){
+        processor.asyncForceClose();
+      }
+    }
+  }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 6dedfa8..ce8fbba 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -23,12 +23,13 @@ import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.Directories;
 import org.apache.iotdb.db.engine.UnsealedTsFileProcessorV2;
@@ -66,12 +67,12 @@ public class FileNodeProcessorV2 {
 
   // includes sealed and unsealed sequnce tsfiles
   private List<TsFileResourceV2> sequenceFileList = new ArrayList<>();
-  private UnsealedTsFileProcessorV2 workUnsealedSequenceTsFileProcessor = null;
+  private UnsealedTsFileProcessorV2 workSequenceTsFileProcessor = null;
   private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
 
   // includes sealed and unsealed unsequnce tsfiles
   private List<TsFileResourceV2> unSequenceFileList = new ArrayList<>();
-  private UnsealedTsFileProcessorV2 workUnsealedUnSequenceTsFileProcessor = null;
+  private UnsealedTsFileProcessorV2 workUnSequenceTsFileProcessor = null;
   private CopyOnReadLinkedList<UnsealedTsFileProcessorV2> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
 
   /**
@@ -88,12 +89,20 @@ public class FileNodeProcessorV2 {
 
   private final ReadWriteLock lock;
 
+  private Condition closeFileNodeCondition;
+
+  /**
+   * Mark whether to close file node
+   */
+  private volatile boolean toBeClosed;
+
   private VersionController versionController;
 
   public FileNodeProcessorV2(String absoluteBaseDir, String storageGroupName)
       throws FileNodeProcessorException {
     this.storageGroupName = storageGroupName;
     lock = new ReentrantReadWriteLock();
+    closeFileNodeCondition = lock.writeLock().newCondition();
 
     File storageGroupDir = new File(absoluteBaseDir, storageGroupName);
     if (!storageGroupDir.exists()) {
@@ -160,29 +169,40 @@ public class FileNodeProcessorV2 {
     }
   }
 
-  public boolean insert(TSRecord tsRecord) {
+  /**
+   *
+   * @param tsRecord
+   * @return -1: failed, 1: written to the sequence file, 2: written to the unsequence file
+   */
+  public int insert(TSRecord tsRecord) {
     lock.writeLock().lock();
-    boolean result;
+    int insertResult;
 
     try {
+      if(toBeClosed){
+        return -1;
+      }
       // init map
       latestTimeForEachDevice.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
       latestFlushedTimeForEachDevice.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
 
+      boolean result;
       // write to sequence or unsequence file
       if (tsRecord.time > latestFlushedTimeForEachDevice.get(tsRecord.deviceId)) {
-        result = writeUnsealedDataFile(workUnsealedSequenceTsFileProcessor, tsRecord, true);
+        result = writeUnsealedDataFile(workSequenceTsFileProcessor, tsRecord, true);
+        insertResult = result ? 1 : -1;
       } else {
-        result = writeUnsealedDataFile(workUnsealedUnSequenceTsFileProcessor, tsRecord, false);
+        result = writeUnsealedDataFile(workUnSequenceTsFileProcessor, tsRecord, false);
+        insertResult = result ? 2 : -1;
       }
     } catch (Exception e) {
       LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
-      result = false;
+      insertResult = -1;
     } finally {
       lock.writeLock().unlock();
     }
 
-    return result;
+    return insertResult;
   }
 
   private boolean writeUnsealedDataFile(UnsealedTsFileProcessorV2 unsealedTsFileProcessor,
@@ -282,10 +302,10 @@ public class FileNodeProcessorV2 {
     if (unsealedTsFileProcessor.shouldClose()) {
       if (sequence) {
         closingSequenceTsFileProcessor.add(unsealedTsFileProcessor);
-        workUnsealedSequenceTsFileProcessor = null;
+        workSequenceTsFileProcessor = null;
       } else {
         closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
-        workUnsealedUnSequenceTsFileProcessor = null;
+        workUnSequenceTsFileProcessor = null;
       }
       unsealedTsFileProcessor.setCloseMark();
     }
@@ -304,29 +324,80 @@ public class FileNodeProcessorV2 {
    */
   // TODO please consider concurrency with query and write method.
   private void closeUnsealedTsFileProcessor(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
-    closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
-    // end time with one start time
-    Map<String, Long> endTimeMap = new HashMap<>();
-    TsFileResourceV2 resource = workUnsealedSequenceTsFileProcessor.getTsFileResource();
-    synchronized (resource) {
-      for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
-        String deviceId = startTime.getKey();
-        endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
+    lock.writeLock().lock(); // held for signal() below; released in finally
+    try {
+      closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
+      // end time with one start time
+      Map<String, Long> endTimeMap = new HashMap<>();
+      TsFileResourceV2 resource = workSequenceTsFileProcessor.getTsFileResource();
+      synchronized (resource) {
+        for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+          String deviceId = startTime.getKey();
+          endTimeMap.put(deviceId, latestTimeForEachDevice.get(deviceId));
+        }
+        resource.setEndTimeMap(endTimeMap);
       }
-      resource.setEndTimeMap(endTimeMap);
+      closeFileNodeCondition.signal();
+    }finally {
+      lock.writeLock().unlock();
     }
   }
 
-  public void forceClose() {
+  public void asyncForceClose() {
     lock.writeLock().lock();
     try {
-      if (workUnsealedSequenceTsFileProcessor != null) {
-        closingSequenceTsFileProcessor.add(workUnsealedSequenceTsFileProcessor);
-        workUnsealedSequenceTsFileProcessor.forceClose();
-        workUnsealedSequenceTsFileProcessor = null;
+      if (workSequenceTsFileProcessor != null) {
+        closingSequenceTsFileProcessor.add(workSequenceTsFileProcessor);
+        workSequenceTsFileProcessor.forceClose();
+        workSequenceTsFileProcessor = null;
+      }
+      if (workUnSequenceTsFileProcessor != null) {
+        closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
+        workUnSequenceTsFileProcessor.forceClose();
+        workUnSequenceTsFileProcessor = null;
       }
     } finally {
       lock.writeLock().unlock();
     }
   }
+
+  /**
+   * Block this method until this file node can be closed.
+   */
+  public void syncCloseFileNode(Supplier<Boolean> removeProcessorFromManager){
+    lock.writeLock().lock();
+    try {
+      asyncForceClose();
+      toBeClosed = true;
+      while (true) {
+        if (unSequenceFileList.isEmpty() && sequenceFileList.isEmpty()
+            && workSequenceTsFileProcessor == null && workUnSequenceTsFileProcessor == null) {
+          removeProcessorFromManager.get();
+          break;
+        }
+        closeFileNodeCondition.await();
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error("CloseFileNodeConditon occurs error while waiting for closing the file node {}",
+          storageGroupName, e);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  public UnsealedTsFileProcessorV2 getWorkSequenceTsFileProcessor() {
+    return workSequenceTsFileProcessor;
+  }
+
+  public UnsealedTsFileProcessorV2 getWorkUnSequenceTsFileProcessor() {
+    return workUnSequenceTsFileProcessor;
+  }
+
+  public String getStorageGroupName() {
+    return storageGroupName;
+  }
+
+  public int getClosingProcessorSize(){
+    return unSequenceFileList.size() + sequenceFileList.size();
+  }
 }