Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/19 06:37:15 UTC

[incubator-iotdb] 01/02: fix FNP insert

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit e8dc717e5abae29b6f05d772852bf61ec6dc4902
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Jun 19 14:01:27 2019 +0800

    fix FNP insert
---
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 66 +++++++++++-----------
 1 file changed, 34 insertions(+), 32 deletions(-)

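For readers skimming the patch: the commit moves the insert(TSRecord) logic in FileNodeProcessorV2 so that each record is routed by comparing its timestamp with the last flushed timestamp of its device. Records newer than the device's entry in latestFlushTimeMap go to the sequence file via BufferWriteProcessorV2; older, out-of-order records go to the unsequence file via the newly imported OverflowProcessorV2, and both processors are created lazily on first use. The standalone sketch below illustrates only that routing decision; the InsertRoutingSketch class, the markFlushed helper, and the println placeholders are illustrative stand-ins, not IoTDB API.

import java.util.HashMap;
import java.util.Map;

// Minimal illustration of the sequence/unsequence routing added by this commit.
// Only the latestFlushTimeMap comparison mirrors FileNodeProcessorV2.insert();
// everything else (class name, markFlushed, println "processors") is a stand-in.
public class InsertRoutingSketch {

  // last flushed timestamp per device; Long.MIN_VALUE until the first flush
  private final Map<String, Long> latestFlushTimeMap = new HashMap<>();

  /** Route one data point; returns true on success, as insert() does in the patch. */
  public boolean insert(String deviceId, long time) {
    latestFlushTimeMap.putIfAbsent(deviceId, Long.MIN_VALUE);

    if (time > latestFlushTimeMap.get(deviceId)) {
      // newer than anything flushed for this device: sequence file
      // (handled by BufferWriteProcessorV2 in the real code)
      System.out.println("sequence write:   " + deviceId + "@" + time);
    } else {
      // out-of-order timestamp: unsequence/overflow file
      // (handled by OverflowProcessorV2 in the real code)
      System.out.println("unsequence write: " + deviceId + "@" + time);
    }
    return true;
  }

  /** Stand-in for a flush completing: remember the highest flushed time per device. */
  public void markFlushed(String deviceId, long time) {
    latestFlushTimeMap.put(deviceId, time);
  }

  public static void main(String[] args) {
    InsertRoutingSketch sketch = new InsertRoutingSketch();
    sketch.insert("root.sg.d1", 100);      // sequence (nothing flushed yet)
    sketch.markFlushed("root.sg.d1", 100); // pretend 100 was flushed to disk
    sketch.insert("root.sg.d1", 50);       // unsequence: 50 <= last flushed time 100
    sketch.insert("root.sg.d1", 200);      // sequence:   200 > last flushed time 100
  }
}
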
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index c1cad5b..e5902c1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.engine.AbstractUnsealedDataFileProcessorV2;
 import org.apache.iotdb.db.engine.bufferwriteV2.BufferWriteProcessorV2;
 import org.apache.iotdb.db.engine.filenode.CopyOnWriteLinkedList;
 import org.apache.iotdb.db.engine.filenode.FileNodeProcessorStatus;
+import org.apache.iotdb.db.engine.overflowV2.OverflowProcessorV2;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.FileNodeProcessorException;
@@ -73,7 +74,7 @@ public class FileNodeProcessorV2 {
 
   // for overflow
   private List<TsFileResourceV2> unsequenceFileList;
-  private BufferWriteProcessorV2 workOverflowProcessor = null;
+  private OverflowProcessorV2 workOverflowProcessor = null;
   private CopyOnWriteLinkedList<BufferWriteProcessorV2> closingOverflowProcessor = new CopyOnWriteLinkedList<>();
 
   /**
@@ -123,8 +124,7 @@ public class FileNodeProcessorV2 {
     try {
       fileNodeProcessorStore = readStoreFromDisk();
     } catch (FileNodeProcessorException e) {
-      LOGGER.error(
-          "The fileNode processor {} encountered an error when recoverying restore " +
+      LOGGER.error("The fileNode processor {} encountered an error when recoverying restore " +
               "information.", storageGroup);
       throw new FileNodeProcessorException(e);
     }
@@ -176,24 +176,51 @@ public class FileNodeProcessorV2 {
     }
   }
 
-  private void writeUnsealedDataFile(AbstractUnsealedDataFileProcessorV2 udfProcessor, TSRecord tsRecord, boolean sequence) {
+  public boolean insert(TSRecord tsRecord) {
+    lock.writeLock().lock();
+    boolean result = true;
+
+    try {
+      // init map
+      latestTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
+      latestFlushTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
+
+      // write to sequence or unsequence file
+      if (tsRecord.time > latestFlushTimeMap.get(tsRecord.deviceId)) {
+        writeUnsealedDataFile(workBufferWriteProcessor, tsRecord, true);
+      } else {
+        writeUnsealedDataFile(workOverflowProcessor, tsRecord, false);
+      }
+    } catch (Exception e) {
+      LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
+      result = false;
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    return result;
+  }
+
+
+  private void writeUnsealedDataFile(AbstractUnsealedDataFileProcessorV2 udfProcessor,
+      TSRecord tsRecord, boolean sequence) throws IOException {
     boolean result;
     // create a new BufferWriteProcessor
     if (udfProcessor == null) {
       if (sequence) {
         String baseDir = directories.getNextFolderForTsfile();
         String filePath = Paths.get(baseDir, storageGroup, tsRecord.time + "").toString();
-        workBufferWriteProcessor = new BufferWriteProcessorV2(storageGroup, new File(filePath),
+        udfProcessor = new BufferWriteProcessorV2(storageGroup, new File(filePath),
             fileSchema, versionController, this::closeBufferWriteProcessorCallBack);
         sequenceFileList.add(udfProcessor.getTsFileResource());
       } else {
+        // TODO check if the disk is full
         String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
         String filePath = Paths.get(baseDir, storageGroup, tsRecord.time + "").toString();
-        workBufferWriteProcessor = new BufferWriteProcessorV2(storageGroup, new File(filePath),
+        udfProcessor = new OverflowProcessorV2(storageGroup, new File(filePath),
             fileSchema, versionController, this::closeBufferWriteProcessorCallBack);
         unsequenceFileList.add(udfProcessor.getTsFileResource());
       }
-      // TODO check if the disk is full
     }
 
     // write BufferWrite
@@ -210,31 +237,6 @@ public class FileNodeProcessorV2 {
     }
   }
 
-  public boolean insert(TSRecord tsRecord) {
-    lock.writeLock().lock();
-    boolean result = true;
-
-    try {
-
-      // init map
-      latestTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
-      latestFlushTimeMap.putIfAbsent(tsRecord.deviceId, Long.MIN_VALUE);
-
-      if (tsRecord.time > latestFlushTimeMap.get(tsRecord.deviceId)) {
-
-
-
-      } else {
-        // TODO write to overflow
-      }
-    } catch (Exception e) {
-
-    } finally {
-      lock.writeLock().unlock();
-    }
-
-    return result;
-  }
 
   /**
    * ensure there must be a flush thread submitted after close() is called,