You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/25 04:14:58 UTC

[incubator-iotdb] 01/04: modify filenode processor test

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 71386ef5fefbe895cc02444ea33753fe07c82703
Author: lta <li...@163.com>
AuthorDate: Tue Jun 25 11:45:07 2019 +0800

    modify filenode processor test
---
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 60 +++++++++++-----------
 .../filenodeV2/UnsealedTsFileProcessorV2Test.java  |  2 +-
 2 files changed, 30 insertions(+), 32 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index d7f6f6b..365860e 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -90,7 +90,7 @@ public class FileNodeProcessorV2 {
 
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-  private Object closeFileNodeCondition = new Object();
+  private final Object closeFileNodeCondition = new Object();
 
   /**
    * Mark whether to close file node
@@ -233,6 +233,7 @@ public class FileNodeProcessorV2 {
    * @return -1: failed, 1: Overflow, 2:Bufferwrite
    */
   public boolean insert(InsertPlan insertPlan) {
+    lock.writeLock().lock();
     try {
       if (toBeClosed) {
         throw new FileNodeProcessorException(
@@ -251,47 +252,44 @@ public class FileNodeProcessorV2 {
     } catch (FileNodeProcessorException | IOException e) {
       LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
       return false;
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
   private boolean insertUnsealedDataFile(InsertPlan insertPlan, boolean sequence)
       throws IOException {
-    lock.writeLock().lock();
     UnsealedTsFileProcessorV2 unsealedTsFileProcessor;
-    try {
-      boolean result;
-      // create a new BufferWriteProcessor
-      if (sequence) {
-        if (workSequenceTsFileProcessor == null) {
-          workSequenceTsFileProcessor = createTsFileProcessor(true);
-          sequenceFileList.add(workSequenceTsFileProcessor.getTsFileResource());
-        }
-        unsealedTsFileProcessor = workSequenceTsFileProcessor;
-      } else {
-        if (workUnSequenceTsFileProcessor == null) {
-          workUnSequenceTsFileProcessor = createTsFileProcessor(false);
-          unSequenceFileList.add(workUnSequenceTsFileProcessor.getTsFileResource());
-        }
-        unsealedTsFileProcessor = workUnSequenceTsFileProcessor;
+    boolean result;
+    // create a new BufferWriteProcessor
+    if (sequence) {
+      if (workSequenceTsFileProcessor == null) {
+        workSequenceTsFileProcessor = createTsFileProcessor(true);
+        sequenceFileList.add(workSequenceTsFileProcessor.getTsFileResource());
       }
-
-      // insert BufferWrite
-      result = unsealedTsFileProcessor.insert(insertPlan);
-
-      // try to update the latest time of the device of this tsRecord
-      if (result && latestTimeForEachDevice.get(insertPlan.getDeviceId()) < insertPlan.getTime()) {
-        latestTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
+      unsealedTsFileProcessor = workSequenceTsFileProcessor;
+    } else {
+      if (workUnSequenceTsFileProcessor == null) {
+        workUnSequenceTsFileProcessor = createTsFileProcessor(false);
+        unSequenceFileList.add(workUnSequenceTsFileProcessor.getTsFileResource());
       }
+      unsealedTsFileProcessor = workUnSequenceTsFileProcessor;
+    }
 
-      // check memtable size and may asyncFlush the workMemtable
-      if (unsealedTsFileProcessor.shouldFlush()) {
-        flushAndCheckShouldClose(unsealedTsFileProcessor, sequence);
-      }
+    // insert BufferWrite
+    result = unsealedTsFileProcessor.insert(insertPlan);
 
-      return result;
-    } finally {
-      lock.writeLock().unlock();
+    // try to update the latest time of the device of this tsRecord
+    if (result && latestTimeForEachDevice.get(insertPlan.getDeviceId()) < insertPlan.getTime()) {
+      latestTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
     }
+
+    // check memtable size and may asyncFlush the workMemtable
+    if (unsealedTsFileProcessor.shouldFlush()) {
+      flushAndCheckShouldClose(unsealedTsFileProcessor, sequence);
+    }
+
+    return result;
   }
 
   private UnsealedTsFileProcessorV2 createTsFileProcessor(boolean sequence) throws IOException {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 7d08217..ce00700 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -50,7 +50,7 @@ public class UnsealedTsFileProcessorV2Test {
 
   private UnsealedTsFileProcessorV2 processor;
   private String storageGroup = "storage_group1";
-  private String filePath = "testUnsealedTsFileProcessor.tsfile";
+  private String filePath = "data/testUnsealedTsFileProcessor.tsfile";
   private String deviceId = "root.vehicle.d0";
   private String measurementId = "s0";
   private TSDataType dataType = TSDataType.INT32;