You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/21 04:04:06 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add test for filenode processor

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 8e35e67  add test for filenode processor
     new 6bd40b1  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
8e35e67 is described below

commit 8e35e67d6d64aebdb710ee79dcd62b302c52c290
Author: lta <li...@163.com>
AuthorDate: Fri Jun 21 12:03:18 2019 +0800

    add test for filenode processor
---
 .../db/engine/filenode/CopyOnReadLinkedList.java   |  5 ++-
 .../db/engine/filenodeV2/FileNodeManagerV2.java    | 39 ++++++++++++----------
 .../db/engine/filenodeV2/FileNodeProcessorV2.java  | 34 +++++++++++--------
 .../engine/filenodeV2/FileNodeProcessorV2Test.java | 12 ++++---
 .../filenodeV2/UnsealedTsFileProcessorV2Test.java  | 22 ++++++------
 5 files changed, 65 insertions(+), 47 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
index d3b87fa..3353a6c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/CopyOnReadLinkedList.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * this class can just guarantee some behavior in a concurrent thread safety mode:
@@ -59,6 +58,10 @@ public class CopyOnReadLinkedList<T> {
     return readCopy;
   }
 
+  public boolean isEmpty() {
+    return size() == 0;
+  }
+
   public int size() {
     return data.size();
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
index 595481a..e0e6830 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeManagerV2.java
@@ -135,31 +135,34 @@ public class FileNodeManagerV2 implements IStatistic, IService {
 
   private FileNodeProcessorV2 getProcessor(String devicePath)
       throws FileNodeManagerException {
-    String filenodeName;
+    String filenodeName = "";
     try {
       // return the storage group name
       filenodeName = MManager.getInstance().getFileNameByPath(devicePath);
-    } catch (PathErrorException e) {
-      LOGGER.error("MManager get storage group name error, seriesPath is {}", devicePath);
-      throw new FileNodeManagerException(e);
-    }
-    FileNodeProcessorV2 processor;
-    processor = processorMap.get(filenodeName);
-    if (processor == null) {
-      filenodeName = filenodeName.intern();
-      synchronized (filenodeName) {
-        processor = processorMap.get(filenodeName);
-        if (processor == null) {
-          LOGGER.debug("construct a processor instance, the storage group is {}, Thread is {}",
-              filenodeName, Thread.currentThread().getId());
-          processor = new FileNodeProcessorV2(filenodeName);
-          synchronized (processorMap) {
-            processorMap.put(filenodeName, processor);
+      FileNodeProcessorV2 processor;
+      processor = processorMap.get(filenodeName);
+      if (processor == null) {
+        filenodeName = filenodeName.intern();
+        synchronized (filenodeName) {
+          processor = processorMap.get(filenodeName);
+          if (processor == null) {
+            LOGGER.debug("construct a processor instance, the storage group is {}, Thread is {}",
+                filenodeName, Thread.currentThread().getId());
+            processor = new FileNodeProcessorV2(baseDir, filenodeName);
+            synchronized (processorMap) {
+              processorMap.put(filenodeName, processor);
+            }
           }
         }
       }
+      return processor;
+    } catch (PathErrorException e) {
+      LOGGER.error("MManager get storage group name error, seriesPath is {}", devicePath);
+      throw new FileNodeManagerException(e);
+    } catch (FileNodeProcessorException e) {
+      LOGGER.error("Fail to init simple file version controller of file node {}", filenodeName,  e);
+      throw new FileNodeManagerException(e);
     }
-    return processor;
   }
 
 
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 665691f..b86d9fb 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -37,10 +37,10 @@ import org.apache.iotdb.db.engine.querycontext.GlobalSortedSeriesDataSourceV2;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSourceV2;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.FileNodeProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -95,7 +95,7 @@ public class FileNodeProcessorV2 {
 
   private VersionController versionController;
 
-  public FileNodeProcessorV2(String storageGroupName) {
+  public FileNodeProcessorV2(String baseDir, String storageGroupName) throws FileNodeProcessorException {
     this.storageGroupName = storageGroupName;
     lock = new ReentrantReadWriteLock();
     closeFileNodeCondition = lock.writeLock().newCondition();
@@ -105,7 +105,14 @@ public class FileNodeProcessorV2 {
     /**
      * version controller
      */
-    versionController = SysTimeVersionController.INSTANCE;
+    try {
+      File storageGroupInfoDir = new File(baseDir, storageGroupName);
+      storageGroupInfoDir.mkdirs();
+      versionController = new SimpleFileVersionController(
+          storageGroupInfoDir.getPath());
+    } catch (IOException e) {
+      throw new FileNodeProcessorException(e);
+    }
 
     // construct the file schema
     this.fileSchema = constructFileSchema(storageGroupName);
@@ -187,8 +194,9 @@ public class FileNodeProcessorV2 {
       if (sequence) {
         if (workSequenceTsFileProcessor == null) {
           String baseDir = directories.getNextFolderForTsfile();
-          String filePath = Paths.get(baseDir, System.currentTimeMillis() + "")
+          String filePath = Paths.get(baseDir, System.currentTimeMillis() + "-" + versionController.nextVersion())
               .toString();
+          System.out.println(filePath);
           workSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
               new File(filePath),
               fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback);
@@ -199,7 +207,7 @@ public class FileNodeProcessorV2 {
         if (workUnSequenceTsFileProcessor == null) {
           // TODO check if the disk is full
           String baseDir = IoTDBDescriptor.getInstance().getConfig().getOverflowDataDir();
-          String filePath = Paths.get(baseDir, System.currentTimeMillis() + "")
+          String filePath = Paths.get(baseDir, System.currentTimeMillis() + "" + + versionController.nextVersion())
               .toString();
           workUnSequenceTsFileProcessor = new UnsealedTsFileProcessorV2(storageGroupName,
               new File(filePath),
@@ -263,7 +271,7 @@ public class FileNodeProcessorV2 {
     for (TsFileResourceV2 tsFileResource : tsFileResources) {
       synchronized (tsFileResource) {
         if (!tsFileResource.getStartTimeMap().isEmpty()) {
-          if (!tsFileResource.isClosed()) {
+          if (tsFileResource.isClosed()) {
             tsfileResourcesForQuery.add(tsFileResource);
           } else {
             Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = tsFileResource
@@ -312,12 +320,12 @@ public class FileNodeProcessorV2 {
    * put the memtable back to the MemTablePool and make the metadata in writer visible
    */
   // TODO please consider concurrency with query and write method.
-  private void closeUnsealedTsFileProcessorCallback(UnsealedTsFileProcessorV2 bufferWriteProcessor) {
-    lock.writeLock().unlock();
+  public void closeUnsealedTsFileProcessorCallback(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
+    lock.writeLock().lock();
     try {
-      closingSequenceTsFileProcessor.remove(bufferWriteProcessor);
+      closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
       // end time with one start time
-      TsFileResourceV2 resource = workSequenceTsFileProcessor.getTsFileResource();
+      TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
       synchronized (resource) {
         for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
           String deviceId = startTime.getKey();
@@ -352,15 +360,15 @@ public class FileNodeProcessorV2 {
   /**
    * Block this method until this file node can be closed.
    */
-  public void syncCloseFileNode(Supplier<Boolean> removeProcessorFromManager){
+  public void syncCloseFileNode(Supplier<Boolean> removeProcessorFromManagerCallback){
     lock.writeLock().lock();
     try {
       asyncForceClose();
       toBeClosed = true;
       while (true) {
-        if (unSequenceFileList.isEmpty() && sequenceFileList.isEmpty()
+        if (closingSequenceTsFileProcessor.isEmpty() && closingUnSequenceTsFileProcessor.isEmpty()
             && workSequenceTsFileProcessor == null && workUnSequenceTsFileProcessor == null) {
-          removeProcessorFromManager.get();
+          removeProcessorFromManagerCallback.get();
           break;
         }
         closeFileNodeCondition.await();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
index 5503547..103e803 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java
@@ -33,6 +33,7 @@ public class FileNodeProcessorV2Test {
 
   private String storageGroup = "storage_group1";
   private String baseDir = "data";
+  private String systemDir = "data/info";
   private String deviceId = "root.vehicle.d0";
   private String measurementId = "s0";
   private FileNodeProcessorV2 processor;
@@ -41,7 +42,7 @@ public class FileNodeProcessorV2Test {
   public void setUp() throws Exception {
     MetadataManagerHelper.initMetadata();
     EnvironmentUtils.envSetUp();
-    processor = new FileNodeProcessorV2(storageGroup);
+    processor = new FileNodeProcessorV2(systemDir, storageGroup);
   }
 
   @After
@@ -53,17 +54,18 @@ public class FileNodeProcessorV2Test {
 
   @Test
   public void testAsyncClose() {
-    for (int j = 1; j <= 10; j++) {
+    for (int j = 1; j <= 100; j++) {
       TSRecord record = new TSRecord(j, deviceId);
       record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
       processor.insert(record);
       processor.asyncForceClose();
     }
 
+    processor.syncCloseFileNode(() -> null);
     QueryDataSourceV2 queryDataSource = processor.query(deviceId, measurementId);
-    Assert.assertEquals(queryDataSource.getSeqDataSource().getQueryTsFiles().size(), 10);
-    for (TsFileResourceV2 resource: queryDataSource.getSeqDataSource().getQueryTsFiles()) {
-      Assert.assertEquals(resource.isClosed(), true);
+    Assert.assertEquals(queryDataSource.getSeqDataSource().getQueryTsFiles().size(), 100);
+    for (TsFileResourceV2 resource : queryDataSource.getSeqDataSource().getQueryTsFiles()) {
+      Assert.assertTrue(resource.isClosed());
     }
   }
 
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
index 8900479..a51df9e 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.function.Consumer;
 import org.apache.iotdb.db.engine.MetadataManagerHelper;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SysTimeVersionController;
@@ -145,16 +146,17 @@ public class UnsealedTsFileProcessorV2Test {
   @Test
   public void testWriteAndClose() throws WriteProcessException, IOException {
     processor = new UnsealedTsFileProcessorV2(storageGroup, new File(filePath),
-        FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE, x->{
-      TsFileResourceV2 resource = processor.getTsFileResource();
-      synchronized (resource) {
-        for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
-          String deviceId = startTime.getKey();
-          resource.getEndTimeMap().put(deviceId, resource.getStartTimeMap().get(deviceId));
-          resource.setClosed(true);
-        }
-      }
-    });
+        FileSchemaUtils.constructFileSchema(deviceId), SysTimeVersionController.INSTANCE,
+        unsealedTsFileProcessorV2 -> {
+          TsFileResourceV2 resource = unsealedTsFileProcessorV2.getTsFileResource();
+          synchronized (resource) {
+            for (Entry<String, Long> startTime : resource.getStartTimeMap().entrySet()) {
+              String deviceId = startTime.getKey();
+              resource.getEndTimeMap().put(deviceId, resource.getStartTimeMap().get(deviceId));
+            }
+            resource.setClosed(true);
+          }
+        });
 
     Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = processor
         .query(deviceId, measurementId, dataType, props);