You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2020/06/22 02:17:10 UTC

[GitHub] [incubator-iotdb] zhanglingzhe0820 opened a new pull request #1401: [IOTDB-706]Introduce virtual memtable for larger Chunk

zhanglingzhe0820 opened a new pull request #1401:
URL: https://github.com/apache/incubator-iotdb/pull/1401


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-iotdb] SilverNarcissus commented on a change in pull request #1401: [IOTDB-706]Introduce virtual memtable for larger Chunk

Posted by GitBox <gi...@apache.org>.
SilverNarcissus commented on a change in pull request #1401:
URL: https://github.com/apache/incubator-iotdb/pull/1401#discussion_r443349363



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -497,6 +509,34 @@ else if (upgradeFolder.exists()) {
     return new Pair<>(ret, upgradeRet);
   }
 
+  private Map<String, List<TsFileResource>> getAllVms(List<String> folders) throws IOException {
+    List<File> vmFiles = new ArrayList<>();
+    for (String baseDir : folders) {
+      File fileFolder = fsFactory.getFile(baseDir, storageGroupName);
+      if (!fileFolder.exists()) {
+        continue;
+      }
+      Collections
+          .addAll(vmFiles, fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), VM_SUFFIX));
+    }
+
+    Map<String, List<TsFileResource>> vmTsFileResourceMap = new HashMap<>();
+    for (File f : vmFiles) {
+      TsFileResource fileResource = new TsFileResource(f);
+      fileResource.setClosed(false);
+      // make sure the flush command is called before IoTDB is down.
+      fileResource.deserialize();
+      String tsfilePrefix = f.getName().split(TSFILE_SEPARATOR)[0];
+      List<TsFileResource> vmTsFileResource = new ArrayList<>();
+      if (vmTsFileResourceMap.containsKey(tsfilePrefix)) {
+        vmTsFileResource = vmTsFileResourceMap.get(tsfilePrefix);

Review comment:
       You can use getOrDefault method to save time

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -549,15 +615,86 @@ public void flushOneMemTable() {
     memTableToFlush = flushingMemTables.getFirst();
     if (logger.isInfoEnabled()) {
       logger.info("{}: {} starts to flush a memtable in a flush thread", storageGroupName,
-              tsFileResource.getFile().getName());
+          tsFileResource.getFile().getName());
     }
     // signal memtable only may appear when calling asyncClose()
     if (!memTableToFlush.isSignalMemTable()) {
-      MemTableFlushTask flushTask = new MemTableFlushTask(memTableToFlush, writer,
-          storageGroupName);
       try {
+        boolean isVm = false;
+        boolean isFull = false;
+        MemTableFlushTask flushTask;
+        if (config.isEnableVm()) {
+          long vmPointNum = 0;
+          for (RestorableTsFileIOWriter vmWriter : vmWriters) {
+            Map<String, Map<String, List<ChunkMetadata>>> metadatasForQuery = vmWriter
+                .getMetadatasForQuery();
+            for (String device : metadatasForQuery.keySet()) {
+              Map<String, List<ChunkMetadata>> chunkMetadataListMap = metadatasForQuery.get(device);
+              for (String sensor : chunkMetadataListMap.keySet()) {
+                for (ChunkMetadata chunkMetadata : chunkMetadataListMap.get(sensor)) {
+                  vmPointNum += chunkMetadata.getNumOfPoints();
+                }
+              }
+            }
+          }
+          // all flush to target file
+          if ((
+              (vmPointNum + memTableToFlush.getTotalPointsNum()) / memTableToFlush.getSeriesNumber()
+                  > config
+                  .getMemtablePointThreshold()) || (shouldClose && flushingMemTables.size() == 1)) {
+            isVm = false;
+            isFull = false;
+            flushTask = new MemTableFlushTask(memTableToFlush, writer, vmWriters, false,
+                false,
+                storageGroupName);
+          } else {
+            // merge vm files
+            if (config.getMaxVmNum() <= vmTsFileResources.size()) {
+              isVm = true;
+              isFull = true;
+              flushTask = new MemTableFlushTask(memTableToFlush, writer, vmWriters,
+                  true, true,

Review comment:
       Maybe you should use isVm and isFull to replace true, true

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -549,15 +615,86 @@ public void flushOneMemTable() {
     memTableToFlush = flushingMemTables.getFirst();
     if (logger.isInfoEnabled()) {
       logger.info("{}: {} starts to flush a memtable in a flush thread", storageGroupName,
-              tsFileResource.getFile().getName());
+          tsFileResource.getFile().getName());
     }
     // signal memtable only may appear when calling asyncClose()
     if (!memTableToFlush.isSignalMemTable()) {
-      MemTableFlushTask flushTask = new MemTableFlushTask(memTableToFlush, writer,
-          storageGroupName);
       try {
+        boolean isVm = false;
+        boolean isFull = false;
+        MemTableFlushTask flushTask;
+        if (config.isEnableVm()) {
+          long vmPointNum = 0;
+          for (RestorableTsFileIOWriter vmWriter : vmWriters) {
+            Map<String, Map<String, List<ChunkMetadata>>> metadatasForQuery = vmWriter
+                .getMetadatasForQuery();
+            for (String device : metadatasForQuery.keySet()) {
+              Map<String, List<ChunkMetadata>> chunkMetadataListMap = metadatasForQuery.get(device);
+              for (String sensor : chunkMetadataListMap.keySet()) {
+                for (ChunkMetadata chunkMetadata : chunkMetadataListMap.get(sensor)) {
+                  vmPointNum += chunkMetadata.getNumOfPoints();
+                }
+              }
+            }
+          }
+          // all flush to target file
+          if ((
+              (vmPointNum + memTableToFlush.getTotalPointsNum()) / memTableToFlush.getSeriesNumber()
+                  > config
+                  .getMemtablePointThreshold()) || (shouldClose && flushingMemTables.size() == 1)) {
+            isVm = false;
+            isFull = false;
+            flushTask = new MemTableFlushTask(memTableToFlush, writer, vmWriters, false,
+                false,
+                storageGroupName);
+          } else {
+            // merge vm files
+            if (config.getMaxVmNum() <= vmTsFileResources.size()) {
+              isVm = true;
+              isFull = true;
+              flushTask = new MemTableFlushTask(memTableToFlush, writer, vmWriters,
+                  true, true,
+                  storageGroupName);
+            } else {
+              isVm = true;
+              isFull = false;
+              File newVmFile = createNewVMFile();
+              vmTsFileResources.add(new TsFileResource(newVmFile));
+              vmWriters.add(new RestorableTsFileIOWriter(newVmFile));
+              flushTask = new MemTableFlushTask(memTableToFlush, writer, vmWriters,
+                  true, false,
+                  storageGroupName);

Review comment:
       Same here: pass isVm and isFull instead of the literals true, false.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -549,15 +615,86 @@ public void flushOneMemTable() {
     memTableToFlush = flushingMemTables.getFirst();
     if (logger.isInfoEnabled()) {
       logger.info("{}: {} starts to flush a memtable in a flush thread", storageGroupName,
-              tsFileResource.getFile().getName());
+          tsFileResource.getFile().getName());
     }
     // signal memtable only may appear when calling asyncClose()
     if (!memTableToFlush.isSignalMemTable()) {
-      MemTableFlushTask flushTask = new MemTableFlushTask(memTableToFlush, writer,
-          storageGroupName);
       try {
+        boolean isVm = false;
+        boolean isFull = false;
+        MemTableFlushTask flushTask;
+        if (config.isEnableVm()) {
+          long vmPointNum = 0;
+          for (RestorableTsFileIOWriter vmWriter : vmWriters) {
+            Map<String, Map<String, List<ChunkMetadata>>> metadatasForQuery = vmWriter
+                .getMetadatasForQuery();
+            for (String device : metadatasForQuery.keySet()) {
+              Map<String, List<ChunkMetadata>> chunkMetadataListMap = metadatasForQuery.get(device);
+              for (String sensor : chunkMetadataListMap.keySet()) {
+                for (ChunkMetadata chunkMetadata : chunkMetadataListMap.get(sensor)) {
+                  vmPointNum += chunkMetadata.getNumOfPoints();
+                }
+              }
+            }
+          }
+          // all flush to target file
+          if ((
+              (vmPointNum + memTableToFlush.getTotalPointsNum()) / memTableToFlush.getSeriesNumber()
+                  > config
+                  .getMemtablePointThreshold()) || (shouldClose && flushingMemTables.size() == 1)) {
+            isVm = false;
+            isFull = false;
+            flushTask = new MemTableFlushTask(memTableToFlush, writer, vmWriters, false,
+                false,

Review comment:
       Same here: pass isVm and isFull instead of the literals false, false.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-iotdb] JackieTien97 merged pull request #1401: [Tmp IOTDB-706] Introduce virtual memtable for larger Chunk

Posted by GitBox <gi...@apache.org>.
JackieTien97 merged pull request #1401:
URL: https://github.com/apache/incubator-iotdb/pull/1401


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-iotdb] JackieTien97 commented on a change in pull request #1401: [IOTDB-706]Introduce virtual memtable for larger Chunk

Posted by GitBox <gi...@apache.org>.
JackieTien97 commented on a change in pull request #1401:
URL: https://github.com/apache/incubator-iotdb/pull/1401#discussion_r443965046



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -1944,11 +1991,9 @@ private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
   }
 
   /**
-   * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to reduce
-   * unnecessary merge. Only used when the file sender and the receiver share the same file
-   * close policy.
-   * Warning: DO NOT REMOVE
-   * @param resource
+   * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to
+   * reduce unnecessary merge. Only used when the file sender and the receiver share the same file
+   * close policy. Warning: DO NOT REMOVE

Review comment:
       change back

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -1858,12 +1907,12 @@ private long computeMaxVersion(Long oldVersion, Long newVersion) {
   }
 
   /**
-   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
-   * @param newTsFileResource
-   * @param newFilePartitionId
-   * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted
-   *         POS_OVERLAP(-3) if some file overlaps the new file
-   *         an insertion position i >= -1 if the new file can be inserted between [i, i+1]
+   * Find the position of "newTsFileResource" in the sequence files if it can be inserted into
+   * them.
+   *
+   * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted
+   * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
+   * file can be inserted between [i, i+1]

Review comment:
       change back

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -400,8 +419,8 @@ void asyncClose() {
   }
 
   /**
-   * TODO if the flushing thread is too fast, the tmpMemTable.wait() may never wakeup
-   * Tips: I am trying to solve this issue by checking whether the table exist before wait()
+   * TODO if the flushing thread is too fast, the tmpMemTable.wait() may never wakeup Tips: I am
+   * trying to solve this issue by checking whether the table exist before wait()

Review comment:
       change back

##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
##########
@@ -556,9 +572,9 @@
   private int defaultFillInterval = -1;
 
   /**
-   * default TTL for storage groups that are not set TTL by statements, in ms
-   * Notice: if this property is changed, previous created storage group which are not set TTL will
-   * also be affected.
+   * default TTL for storage groups that are not set TTL by statements, in ms Notice: if this
+   * property is changed, previous created storage group which are not set TTL will also be

Review comment:
       Change back; the Notice statement should start on a new line.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
##########
@@ -230,12 +294,56 @@ public void run() {
         long starTime = System.currentTimeMillis();
         try {
           if (ioMessage instanceof StartFlushGroupIOTask) {
-            writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
+            if (isVm) {
+              this.vmWriters
+                  .get(MemTableFlushTask.this.vmWriters.size() - 1)
+                  .startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
+            } else {
+              writer.startChunkGroup(((StartFlushGroupIOTask) ioMessage).deviceId);
+            }
+          } else if (ioMessage instanceof MergeVmIoTask) {
+            RestorableTsFileIOWriter mergeWriter = ((MergeVmIoTask) ioMessage).mergeWriter;
+            for (String deviceId : memTable.getMemTableMap().keySet()) {
+              mergeWriter.startChunkGroup(deviceId);
+              for (String measurementId : memTable.getMemTableMap().get(deviceId).keySet()) {
+                ChunkMetadata newChunkMetadata = null;
+                Chunk newChunk = null;
+                for (RestorableTsFileIOWriter vmWriter : vmWriters) {
+                  TsFileSequenceReader reader = new TsFileSequenceReader(
+                      vmWriter.getFile().getAbsolutePath());
+                  List<ChunkMetadata> chunkMetadataList = vmWriter.getMetadatasForQuery()
+                      .get(deviceId).get(measurementId);
+                  for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+                    Chunk chunk = reader.readMemChunk(chunkMetadata);
+                    if (newChunkMetadata == null) {
+                      newChunkMetadata = chunkMetadata;
+                      newChunk = chunk;
+                    } else {
+                      newChunkMetadata.mergeChunkMetadata(chunkMetadata);
+                      newChunk.mergeChunk(chunk);
+                    }
+                  }
+                }
+                if (newChunkMetadata != null && newChunk != null) {
+                  mergeWriter.writeChunk(newChunk, newChunkMetadata);
+                }
+              }
+              mergeWriter.endChunkGroup();
+            }

Review comment:
       When isVm=false and isFull=false, it seems that the current memTable is first written into writer, and then all the vm files are merged and written into writer. However, we should first merge the current memTable with all the vm files and then write all of them into writer together, instead of writing them separately.

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -228,22 +230,20 @@
   private TsFileFlushPolicy fileFlushPolicy;
 
   /**
-   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
-   * not including the files generated by merge) of each partition.
-   * As data file close is managed by the leader in the distributed version, the files with the
-   * same version(s) have the same data, despite that the inner structure (the size and
-   * organization of chunks) may be different, so we can easily find what remote files we do not
-   * have locally.
-   * partition number -> version number set
+   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, not
+   * including the files generated by merge) of each partition. As data file close is managed by the
+   * leader in the distributed version, the files with the same version(s) have the same data,
+   * despite that the inner structure (the size and organization of chunks) may be different, so we
+   * can easily find what remote files we do not have locally. partition number -> version number
+   * set
    */
   private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
 
   /**
-   * The max file versions in each partition. By recording this, if several IoTDB instances have
-   * the same policy of closing file and their ingestion is identical, then files of the same
-   * version in different IoTDB instance will have identical data, providing convenience for data
-   * comparison across different instances.
-   * partition number -> max version number
+   * The max file versions in each partition. By recording this, if several IoTDB instances have the
+   * same policy of closing file and their ingestion is identical, then files of the same version in
+   * different IoTDB instance will have identical data, providing convenience for data comparison
+   * across different instances. partition number -> max version number

Review comment:
       same as above

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -228,22 +230,20 @@
   private TsFileFlushPolicy fileFlushPolicy;
 
   /**
-   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
-   * not including the files generated by merge) of each partition.
-   * As data file close is managed by the leader in the distributed version, the files with the
-   * same version(s) have the same data, despite that the inner structure (the size and
-   * organization of chunks) may be different, so we can easily find what remote files we do not
-   * have locally.
-   * partition number -> version number set
+   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, not
+   * including the files generated by merge) of each partition. As data file close is managed by the
+   * leader in the distributed version, the files with the same version(s) have the same data,
+   * despite that the inner structure (the size and organization of chunks) may be different, so we
+   * can easily find what remote files we do not have locally. partition number -> version number
+   * set
    */
   private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
 
   /**
-   * The max file versions in each partition. By recording this, if several IoTDB instances have
-   * the same policy of closing file and their ingestion is identical, then files of the same
-   * version in different IoTDB instance will have identical data, providing convenience for data
-   * comparison across different instances.
-   * partition number -> max version number
+   * The max file versions in each partition. By recording this, if several IoTDB instances have the
+   * same policy of closing file and their ingestion is identical, then files of the same version in
+   * different IoTDB instance will have identical data, providing convenience for data comparison
+   * across different instances. partition number -> max version number

Review comment:
       same as above

##########
File path: server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
##########
@@ -440,6 +441,21 @@
    */
   private int mergeThreadNum = 1;
 
+  /**
+   * Is vm merge enable
+   */
+  private boolean enableVm = true;
+
+  /**
+   * When a memTable's point num exceeds this, the vm memtable is flushed to disk
+   */
+  private int memtablePointThreshold = 1024;
+
+  /**
+   * The max vm num of each memtable. When vm num exceeds this, the vm files will merge to one.
+   */
+  private int maxVmNum = 100000;

Review comment:
       The default value is set very high; is that for the convenience of testing when starting from IoTDB.java?

##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -245,6 +245,14 @@ default_fill_interval=-1
 ####################
 ### Merge Configurations
 ####################
+# Is vm merge enable.
+enable_vm=false
+
+# When a memTable's point num exceeds this, the vm memtable is flushed to disk. The default threshold is 1024.

Review comment:
       ```suggestion
   # When the average number of points per chunk in a memTable exceeds this, the vm memtable is flushed to disk. The default threshold is 1024.
   ```

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -228,22 +230,20 @@
   private TsFileFlushPolicy fileFlushPolicy;
 
   /**
-   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
-   * not including the files generated by merge) of each partition.
-   * As data file close is managed by the leader in the distributed version, the files with the
-   * same version(s) have the same data, despite that the inner structure (the size and
-   * organization of chunks) may be different, so we can easily find what remote files we do not
-   * have locally.
-   * partition number -> version number set
+   * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, not
+   * including the files generated by merge) of each partition. As data file close is managed by the
+   * leader in the distributed version, the files with the same version(s) have the same data,
+   * despite that the inner structure (the size and organization of chunks) may be different, so we
+   * can easily find what remote files we do not have locally. partition number -> version number
+   * set

Review comment:
       change back

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -1904,11 +1953,9 @@ private int findInsertionPosition(TsFileResource newTsFileResource, long newFile
 
   /**
    * Compare each device in the two files to find the time relation of them.
-   * @param fileA
-   * @param fileB
-   * @return -1 if fileA is totally older than fileB (A < B)
-   *          0 if fileA is partially older than fileB and partially newer than fileB (A X B)
-   *          1 if fileA is totally newer than fileB (B < A)
+   *
+   * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than
+   * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)

Review comment:
       change back

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -710,8 +758,8 @@ private boolean isAlive(long time) {
   }
 
   /**
-   * insert batch to tsfile processor thread-safety that the caller need to guarantee
-   * The rows to be inserted are in the range [start, end)
+   * insert batch to tsfile processor thread-safety that the caller need to guarantee The rows to be
+   * inserted are in the range [start, end)

Review comment:
       change back

##########
File path: server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
##########
@@ -549,15 +615,86 @@ public void flushOneMemTable() {
     memTableToFlush = flushingMemTables.getFirst();
     if (logger.isInfoEnabled()) {
       logger.info("{}: {} starts to flush a memtable in a flush thread", storageGroupName,
-              tsFileResource.getFile().getName());
+          tsFileResource.getFile().getName());
     }
     // signal memtable only may appear when calling asyncClose()
     if (!memTableToFlush.isSignalMemTable()) {
-      MemTableFlushTask flushTask = new MemTableFlushTask(memTableToFlush, writer,
-          storageGroupName);
       try {
+        boolean isVm = false;
+        boolean isFull = false;
+        MemTableFlushTask flushTask;
+        if (config.isEnableVm()) {
+          long vmPointNum = 0;
+          for (RestorableTsFileIOWriter vmWriter : vmWriters) {
+            Map<String, Map<String, List<ChunkMetadata>>> metadatasForQuery = vmWriter
+                .getMetadatasForQuery();
+            for (String device : metadatasForQuery.keySet()) {
+              Map<String, List<ChunkMetadata>> chunkMetadataListMap = metadatasForQuery.get(device);
+              for (String sensor : chunkMetadataListMap.keySet()) {
+                for (ChunkMetadata chunkMetadata : chunkMetadataListMap.get(sensor)) {
+                  vmPointNum += chunkMetadata.getNumOfPoints();
+                }
+              }
+            }
+          }
+          // all flush to target file
+          if ((
+              (vmPointNum + memTableToFlush.getTotalPointsNum()) / memTableToFlush.getSeriesNumber()

Review comment:
       The denominator should also include the number of series that exist in vmWriters but not in memTableToFlush.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org