You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/06/21 02:32:15 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add filenode test

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new c8d8e24  add filenode test
     new c90ad2e  Merge remote-tracking branch 'origin/feature_async_close_tsfile' into feature_async_close_tsfile
c8d8e24 is described below

commit c8d8e24de9fa728a293f2d9e21b72b70ac8e2550
Author: qiaojialin <64...@qq.com>
AuthorDate: Fri Jun 21 10:31:29 2019 +0800

    add filenode test
---
 .../db/engine/filenodeV2/FileNodeManagerV2.java    |  6 +--
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 49 +++++++++-------------
 .../filenodeV2/UnsealedTsFileProcessorV2.java      |  8 ++--
 .../engine/filenodeV2/FileNodeProcessorV2Test.java | 26 +++---------
 .../filenodeV2/UnsealedTsFileProcessorV2Test.java  |  1 +
 5 files changed, 32 insertions(+), 58 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index ea1fdd3..595481a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -152,11 +152,7 @@ public class FileNodeManagerV2 implements IStatistic, IService {
         if (processor == null) {
           LOGGER.debug("construct a processor instance, the storage group is {}, Thread is {}",
               filenodeName, Thread.currentThread().getId());
-          try {
-            processor = new FileNodeProcessorV2(baseDir, filenodeName);
-          } catch (FileNodeProcessorException e) {
-            throw new FileNodeManagerException(e);
-          }
+          processor = new FileNodeProcessorV2(filenodeName);
           synchronized (processorMap) {
             processorMap.put(filenodeName, processor);
           }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 02cea35..665691f 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
+import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -56,8 +57,6 @@ public class FileNodeProcessorV2 {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeProcessorV2.class);
 
-  private static final String RESTORE_FILE_SUFFIX = ".restore";
-
   private static final MManager mManager = MManager.getInstance();
   private static final Directories directories = Directories.getInstance();
 
@@ -96,29 +95,17 @@ public class FileNodeProcessorV2 {
 
   private VersionController versionController;
 
-  public FileNodeProcessorV2(String absoluteBaseDir, String storageGroupName)
-      throws FileNodeProcessorException {
+  public FileNodeProcessorV2(String storageGroupName) {
     this.storageGroupName = storageGroupName;
     lock = new ReentrantReadWriteLock();
     closeFileNodeCondition = lock.writeLock().newCondition();
 
-    File storageGroupDir = new File(absoluteBaseDir, storageGroupName);
-    if (!storageGroupDir.exists()) {
-      storageGroupDir.mkdir();
-      LOGGER.info("The directory of the storage group {} doesn't exist. Create a new " +
-          "directory {}", storageGroupName, storageGroupDir.getAbsolutePath());
-    }
-
     recovery();
 
     /**
      * version controller
      */
-    try {
-      versionController = new SimpleFileVersionController(storageGroupDir.getAbsolutePath());
-    } catch (IOException e) {
-      throw new FileNodeProcessorException(e);
-    }
+    versionController = SysTimeVersionController.INSTANCE;
 
     // construct the file schema
     this.fileSchema = constructFileSchema(storageGroupName);
@@ -175,10 +162,10 @@ public class FileNodeProcessorV2 {
       boolean result;
       // write to sequence or unsequence file
       if (tsRecord.time > latestFlushedTimeForEachDevice.get(tsRecord.deviceId)) {
-        result = writeUnsealedDataFile(workSequenceTsFileProcessor, tsRecord, true);
+        result = writeUnsealedDataFile(tsRecord, true);
         insertResult = result ? 1 : -1;
       } else {
-        result = writeUnsealedDataFile(workUnSequenceTsFileProcessor, tsRecord, false);
+        result = writeUnsealedDataFile(tsRecord, false);
         insertResult = result ? 2 : -1;
       }
     } catch (Exception e) {
@@ -191,31 +178,35 @@ public class FileNodeProcessorV2 {
     return insertResult;
   }
 
-  private boolean writeUnsealedDataFile(UnsealedTsFileProcessorV2 unsealedTsFileProcessor,
-      TSRecord tsRecord, boolean sequence) throws IOException {
+  private boolean writeUnsealedDataFile(TSRecord tsRecord, boolean sequence) throws IOException {
     lock.writeLock().lock();
+    UnsealedTsFileProcessorV2 unsealedTsFileProcessor;
     try {
       boolean result;
       // create a new BufferWriteProcessor
-      if (unsealedTsFileProcessor == null) {
-        if (sequence) {
+      if (sequence) {
+        if (workSequenceTsFileProcessor == null) {
           String baseDir = directories.getNextFolderForTsfile();
-          String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "")
+          String filePath = Paths.get(baseDir, System.currentTimeMillis() + "")
               .toString();
-          unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
+          workSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
               new File(filePath),
               fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback);
-          sequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
-        } else {
+          sequenceFileList.add(workSequenceTsFileProcessor.getTsFileResource());
+        }
+        unsealedTsFileProcessor = workSequenceTsFileProcessor;
+      } else {
+        if (workUnSequenceTsFileProcessor == null) {
           // TODO check if the disk is full
           String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
-          String filePath = Paths.get(baseDir, storageGroupName, System.currentTimeMillis() + "")
+          String filePath = Paths.get(baseDir, System.currentTimeMillis() + "")
               .toString();
-          unsealedTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
+          workUnSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
               new File(filePath),
               fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback);
-          unSequenceFileList.add(unsealedTsFileProcessor.getTsFileResource());
+          unSequenceFileList.add(workUnSequenceTsFileProcessor.getTsFileResource());
         }
+        unsealedTsFileProcessor = workUnSequenceTsFileProcessor;
       }
 
       // write BufferWrite
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index f00b935..3185a2e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -71,7 +71,7 @@ public class UnsealedTsFileProcessorV2 {
 
   private VersionController versionController;
 
-  private Consumer<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor;
+  private Consumer<UnsealedTsFileProcessorV2> closeUnsealedFileCallback;
 
   /**
    * sync this object in query() and asyncFlush()
@@ -80,14 +80,14 @@ public class UnsealedTsFileProcessorV2 {
 
   public UnsealedTsFileProcessorV2(String storageGroupName, File tsfile, FileSchema fileSchema,
       VersionController versionController,
-      Consumer<UnsealedTsFileProcessorV2> closeUnsealedTsFileProcessor)
+      Consumer<UnsealedTsFileProcessorV2> closeUnsealedFileCallback)
       throws IOException {
     this.storageGroupName = storageGroupName;
     this.fileSchema = fileSchema;
     this.tsFileResource = new TsFileResourceV2(tsfile);
     this.versionController = versionController;
     this.writer = new NativeRestorableIOWriter(tsfile);
-    this.closeUnsealedTsFileProcessor = closeUnsealedTsFileProcessor;
+    this.closeUnsealedFileCallback = closeUnsealedFileCallback;
   }
 
   /**
@@ -254,7 +254,7 @@ public class UnsealedTsFileProcessorV2 {
     writer = null;
 
     // remove this processor from Closing list in FileNodeProcessor
-    closeUnsealedTsFileProcessor.accept(this);
+    closeUnsealedFileCallback.accept(this);
 
     // delete the restore for this bufferwrite processor
     if (LOGGER.isInfoEnabled()) {
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index 385b86e..5503547 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -18,16 +18,8 @@
  */
 package org.apache.iotdb.db.engine.filenodeV2;
 
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
-import org.apache.iotdb.db.engine.bufferwrite.BufferWriteProcessor;
-import org.apache.iotdb.db.engine.filenode.FileNodeProcessor;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
-import org.apache.iotdb.db.exception.BufferWriteProcessorException;
-import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
@@ -43,15 +35,13 @@ public class FileNodeProcessorV2Test {
   private String baseDir = "data";
   private String deviceId = "root.vehicle.d0";
   private String measurementId = "s0";
-  private TSDataType dataType = TSDataType.INT32;
-  private Map<String, String> props = Collections.emptyMap();
   private FileNodeProcessorV2 processor;
 
   @Before
   public void setUp() throws Exception {
     MetadataManagerHelper.initMetadata();
     EnvironmentUtils.envSetUp();
-    processor = new FileNodeProcessorV2(baseDir, storageGroup);
+    processor = new FileNodeProcessorV2(storageGroup);
   }
 
   @After
@@ -62,23 +52,19 @@ public class FileNodeProcessorV2Test {
 
 
   @Test
-  public void testAsyncClose()
-      throws FileNodeProcessorException, BufferWriteProcessorException, ExecutionException, InterruptedException {
-
-
+  public void testAsyncClose() {
     for (int j = 1; j <= 10; j++) {
       TSRecord record = new TSRecord(j, deviceId);
-      for (int i = 0; i < 10; i++) {
-        record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(i)));
-      }
+      record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(record);
       processor.asyncForceClose();
     }
 
     QueryDataSourceV2 queryDataSource = processor.query(deviceId, measurementId);
-
     Assert.assertEquals(queryDataSource.getSeqDataSource().getQueryTsFiles().size(), 10);
-
+    for (TsFileResourceV2 resource: queryDataSource.getSeqDataSource().getQueryTsFiles()) {
+      Assert.assertEquals(resource.isClosed(), true);
+    }
   }
 
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 3b36cc4..8900479 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -151,6 +151,7 @@ public class UnsealedTsFileProcessorV2Test {
         for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
           String deviceId = startTime.getKey();
           resource.getEndTimeMap().put(deviceId, resource.getStartTimeMap().get(deviceId));
+          resource.setClosed(true);
         }
       }
     });