You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/21 07:52:30 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: fix asyncFlushAndSealAllFiles in FileNodeManagerV2

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 29e7485  fix asyncFlushAndSealAllFiles in FileNodeManagerV2
29e7485 is described below

commit 29e74854b531fee767295be34313461505bc4d1f
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 21 15:52:18 2019 +0800

    fix asyncFlushAndSealAllFiles in FileNodeManagerV2
---
 .../db/engine/filenodeV2/FileNodeManagerV2.java    | 39 +++++-----------------
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  |  7 ++--
 2 files changed, 13 insertions(+), 33 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index dac26b8..dcd3924 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -66,7 +66,7 @@ public class FileNodeManagerV2 implements IService {
    * This map is used to manage all filenode processor,<br> the key is filenode name which is
    * storage group seriesPath.
    */
-  private final ConcurrentHashMap<String, FileNodeProcessorV2> processorMap;
+  private final ConcurrentHashMap<String, FileNodeProcessorV2> processorMap = new ConcurrentHashMap<>();
 
   private static final FileNodeManagerV2 INSTANCE = new FileNodeManagerV2();
 
@@ -85,19 +85,12 @@ public class FileNodeManagerV2 implements IService {
 
 
   private FileNodeManagerV2() {
-    String normalizedBaseDir = config.getFileNodeDir();
-    if (normalizedBaseDir.charAt(normalizedBaseDir.length() - 1) != File.separatorChar) {
-      normalizedBaseDir += Character.toString(File.separatorChar);
-    }
-    baseDir = normalizedBaseDir;
-    processorMap = new ConcurrentHashMap<>();
-
+    baseDir = FilePathUtils.regularizePath(config.getFileNodeDir());
     // create baseDir
     File dir = new File(baseDir);
     if (dir.mkdirs()) {
-      LOGGER.info("baseDir {} doesn't exist, create it", dir.getPath());
+      LOGGER.info("Base directory {} of all storage groups doesn't exist, create it", dir.getPath());
     }
-
   }
 
   @Override
@@ -182,31 +175,15 @@ public class FileNodeManagerV2 implements IService {
     return fileNodeProcessor.insert(tsRecord);
   }
 
-  private void closeAllFileNodeProcessor() {
-    synchronized (processorMap) {
-      LOGGER.info("Start to setCloseMark all FileNode");
-      if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
-        LOGGER.info(
-            "Failed to setCloseMark all FileNode processor because the FileNodeManager's status is {}",
-            fileNodeManagerStatus);
-        return;
-      }
-
-      fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
-
-      for (Map.Entry<String, FileNodeProcessorV2> processorEntry : processorMap.entrySet()) {
 
+  public void asyncFlushAndSealAllFiles() {
+    synchronized (processorMap) {
+      for (FileNodeProcessorV2 fileNodeProcessor: processorMap.values()) {
+        fileNodeProcessor.asyncForceClose();
       }
-
     }
   }
 
-  private void checkTimestamp(TSRecord tsRecord) throws FileNodeManagerException {
-    if (tsRecord.time < 0) {
-      LOGGER.error("The insert time lt 0, {}.", tsRecord);
-      throw new FileNodeManagerException("The insert time lt 0, the tsrecord is " + tsRecord);
-    }
-  }
 
   /**
    * recovery the filenode processor.
@@ -341,7 +318,7 @@ public class FileNodeManagerV2 implements IService {
   private void deleteFileNodeBlocked(String processorName) throws IOException {
     LOGGER.info("Forced to delete the filenode processor {}", processorName);
     FileNodeProcessorV2 processor = processorMap.get(processorName);
-    processor.syncCloseAndReleaseFileNode(() -> {
+    processor.syncCloseAndStopFileNode(() -> {
       String fileNodePath = IoTDBDescriptor.getInstance().getConfig().getFileNodeDir();
       fileNodePath = FilePathUtils.regularizePath(fileNodePath) + processorName;
       try {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index f1b05fe..73dcc08 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -106,7 +106,10 @@ public class FileNodeProcessorV2 {
      */
     try {
       File storageGroupInfoDir = new File(baseDir, storageGroupName);
-      storageGroupInfoDir.mkdirs();
+      if (storageGroupInfoDir.mkdirs()) {
+        LOGGER.info("Storage Group Info Directory {} doesn't exist, create it", storageGroupInfoDir.getPath());
+      }
+
       versionController = new SimpleFileVersionController(
           storageGroupInfoDir.getPath());
     } catch (IOException e) {
@@ -352,7 +355,7 @@ public class FileNodeProcessorV2 {
   /**
    * This method will be blocked until this file node can be closed.
    */
-  public void syncCloseAndReleaseFileNode(Supplier<Boolean> removeProcessorFromManagerCallback){
+  public void syncCloseAndStopFileNode(Supplier<Boolean> removeProcessorFromManagerCallback){
     lock.writeLock().lock();
     try {
       asyncForceClose();