You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/25 09:16:20 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: fix async close bug

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 01f36de  fix async close bug
     new 7a00539  Merge remote-tracking branch 'origin/feature_async_close_tsfile' into feature_async_close_tsfile
01f36de is described below

commit 01f36deab65a506cc1d8d0ab66dbf4a8e2b332f6
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Jun 25 17:15:44 2019 +0800

    fix async close bug
---
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 62 +++++++++++-----------
 .../filenodeV2/UnsealedTsFileProcessorV2.java      |  8 ++-
 2 files changed, 38 insertions(+), 32 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index b167766..2b3b958 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -286,7 +286,11 @@ public class FileNodeProcessorV2 {
 
     // check memtable size and may asyncFlush the workMemtable
     if (unsealedTsFileProcessor.shouldFlush()) {
-      flushAndCheckShouldClose(unsealedTsFileProcessor, sequence);
+      if (unsealedTsFileProcessor.shouldClose()) {
+        asyncCloseTsFileProcessor(unsealedTsFileProcessor, sequence);
+      } else {
+        unsealedTsFileProcessor.asyncFlush();
+      }
     }
 
     return result;
@@ -316,6 +320,33 @@ public class FileNodeProcessorV2 {
     }
   }
 
+  /**
+   * only called by insert(), thread-safety should be ensured by caller
+   */
+  private void asyncCloseTsFileProcessor(UnsealedTsFileProcessorV2 unsealedTsFileProcessor,
+      boolean sequence) {
+
+    LOGGER.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
+        unsealedTsFileProcessor.getWorkMemTableMemory(),
+        unsealedTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
+
+    // retire the processor: queue it as closing and clear the current work processor
+    if (sequence) {
+      closingSequenceTsFileProcessor.add(unsealedTsFileProcessor);
+      workSequenceTsFileProcessor = null;
+    } else {
+      closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
+      workUnSequenceTsFileProcessor = null;
+    }
+
+    // async close tsfile
+    unsealedTsFileProcessor.asyncClose();
+
+    LOGGER.info("The file size {} reaches the threshold, async close tsfile: {}.",
+        unsealedTsFileProcessor.getTsFileResource().getFileSize(),
+        unsealedTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
+  }
+
 
   // TODO need a read lock, please consider the concurrency with flush manager threads.
   public QueryDataSourceV2 query(String deviceId, String measurementId)
@@ -450,35 +481,6 @@ public class FileNodeProcessorV2 {
     }
   }
 
-  /**
-   * ensure there must be a flush thread submitted after setCloseMark() is called, therefore the
-   * setCloseMark task will be executed by a flush thread.
-   *
-   * only called by insert(), thread-safety should be ensured by caller
-   */
-  private void flushAndCheckShouldClose(UnsealedTsFileProcessorV2 unsealedTsFileProcessor,
-      boolean sequence) {
-
-    LOGGER.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
-        unsealedTsFileProcessor.getWorkMemTableMemory(),
-        unsealedTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
-
-    // check file size and may setCloseMark the BufferWrite
-    if (unsealedTsFileProcessor.shouldClose()) {
-      if (sequence) {
-        closingSequenceTsFileProcessor.add(unsealedTsFileProcessor);
-        workSequenceTsFileProcessor = null;
-      } else {
-        closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor);
-        workUnSequenceTsFileProcessor = null;
-      }
-      LOGGER.info("The file size {} reaches the threshold, async close tsfile: {}.",
-          unsealedTsFileProcessor.getTsFileResource().getFileSize(),
-          unsealedTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
-    }
-
-    unsealedTsFileProcessor.asyncClose();
-  }
 
   public void asyncForceClose() {
     lock.writeLock().lock();
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index b3074a9..d6c860a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -197,6 +197,10 @@ public class UnsealedTsFileProcessorV2 {
     LOGGER.info("File {} is closed synchronously", tsFileResource.getFile().getAbsolutePath());
   }
 
+  /**
+   * Ensure there must be a flush thread submitted after setCloseMark() is called,
+   * therefore the setCloseMark task will be executed by a flush thread.
+   */
   public void asyncClose() {
     flushQueryLock.writeLock().lock();
     LOGGER.info("Async close the file: {}", tsFileResource.getFile().getAbsolutePath());
@@ -206,11 +210,11 @@ public class UnsealedTsFileProcessorV2 {
         LOGGER.debug("add an empty memtable into flushing memtable list when sync close");
       }
       flushingMemTables.add(tmpMemTable);
+      shouldClose = true;
+      workMemTable = null;
       tmpMemTable.setVersion(versionController.nextVersion());
       FlushManager.getInstance().registerUnsealedTsFileProcessor(this);
       flushUpdateLatestFlushTimeCallback.get();
-      shouldClose = true;
-      workMemTable = null;
     } finally {
       flushQueryLock.writeLock().unlock();
     }