You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/23 07:31:34 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: fix end time map update bug

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 395145c  fix end time map update bug
395145c is described below

commit 395145c347b232bc827a947eedef255bd6284357
Author: qiaojialin <64...@qq.com>
AuthorDate: Sun Jun 23 15:31:31 2019 +0800

    fix end time map update bug
---
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 26 +++++++++++++--------
 .../filenodeV2/UnsealedTsFileProcessorV2.java      | 27 +++++++++++-----------
 2 files changed, 30 insertions(+), 23 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 6f1ec2f..74f7fef 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -445,6 +445,7 @@ public class FileNodeProcessorV2 {
     }
 
     unsealedTsFileProcessor.asyncFlush();
+
   }
 
   public void asyncForceClose() {
@@ -453,11 +454,13 @@ public class FileNodeProcessorV2 {
       if (workSequenceTsFileProcessor != null) {
         closingSequenceTsFileProcessor.add(workSequenceTsFileProcessor);
         workSequenceTsFileProcessor.asyncClose();
+        updateEndTimeMap(workSequenceTsFileProcessor);
         workSequenceTsFileProcessor = null;
       }
       if (workUnSequenceTsFileProcessor != null) {
         closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor);
         workUnSequenceTsFileProcessor.asyncClose();
+        updateEndTimeMap(workUnSequenceTsFileProcessor);
         workUnSequenceTsFileProcessor = null;
       }
     } finally {
@@ -466,6 +469,18 @@ public class FileNodeProcessorV2 {
   }
 
   /**
+   * When closing an UnsealedTsFileProcessor, update its EndTimeMap immediately.
+   * @param tsFileProcessor processor to be closed
+   */
+  private void updateEndTimeMap(UnsealedTsFileProcessorV2 tsFileProcessor) {
+    TsFileResourceV2 resource = tsFileProcessor.getTsFileResource();
+    for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+      String deviceId = startTime.getKey();
+      resource.getEndTimeMap().put(deviceId, latestTimeForEachDevice.get(deviceId));
+    }
+  }
+
+  /**
    * This method will be blocked until all tsfile processors are closed.
    */
   public void syncCloseFileNode(){
@@ -533,19 +548,12 @@ public class FileNodeProcessorV2 {
     try {
       if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
         closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
-        LOGGER.info("removed a sequence tsfile processor, closing list size: {}", closingSequenceTsFileProcessor.size());
       } else {
-//        closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+        closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
       }
       // end time with one start time
       TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
-      synchronized (resource) {
-        for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
-          String deviceId = startTime.getKey();
-          resource.getEndTimeMap().put(deviceId, latestTimeForEachDevice.get(deviceId));
-        }
-        resource.setClosed(true);
-      }
+      resource.setClosed(true);
       LOGGER.info("signal closing file node condition");
       closeFileNodeCondition.signal();
     }finally {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 5b27226..b412248 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -182,6 +182,16 @@ public class UnsealedTsFileProcessorV2 {
     shouldClose = true;
   }
 
+  public void syncClose() {
+    asyncClose();
+    synchronized (flushingMemTables) {
+      try {
+        flushingMemTables.wait();
+      } catch (InterruptedException e) {
+        LOGGER.error("wait close interrupted", e);
+      }
+    }
+  }
 
   public void asyncClose() {
     flushQueryLock.writeLock().lock();
@@ -200,7 +210,9 @@ public class UnsealedTsFileProcessorV2 {
     }
   }
 
-  // only for test
+  /**
+   * TODO if the flushing thread is too fast, tmpMemTable.wait() may never wake up
+   */
   public void syncFlush() {
     IMemTable tmpMemTable;
     flushQueryLock.writeLock().lock();
@@ -226,19 +238,6 @@ public class UnsealedTsFileProcessorV2 {
     }
   }
 
-
-
-  public void syncClose() {
-    asyncClose();
-    synchronized (flushingMemTables) {
-      try {
-        flushingMemTables.wait();
-      } catch (InterruptedException e) {
-        LOGGER.error("wait close interrupted", e);
-      }
-    }
-  }
-
   /**
    * put the working memtable into flushing list and set the working memtable to null
    */