You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/07/20 06:24:58 UTC

[incubator-iotdb] branch master updated: add vm level test and fix bug (#1522)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d4492c  add vm level test and fix bug (#1522)
0d4492c is described below

commit 0d4492c066e7b806d9bd0ec59660f08f6687edce
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Mon Jul 20 14:24:42 2020 +0800

    add vm level test and fix bug (#1522)
    
    * fix vm recover bug
    Co-authored-by: 张凌哲 <zh...@bytedance.com>
---
 .../engine/storagegroup/StorageGroupProcessor.java |  2 +-
 .../db/engine/storagegroup/TsFileProcessor.java    | 26 ++++++++++++++++++----
 .../writelog/recover/TsFileRecoverPerformer.java   |  3 ++-
 .../storagegroup/TsFileProcessorEnableVmTest.java  |  7 ++++++
 4 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 6a30c5f..db890bc 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -585,7 +585,7 @@ public class StorageGroupProcessor {
   public static int getVmLevel(File file) {
     String vmLevelStr = file.getPath()
         .substring(file.getPath().lastIndexOf(TSFILE_SUFFIX)).replaceAll(TSFILE_SUFFIX, "")
-        .split(IoTDBConstant.FILE_NAME_SEPARATOR)[0];
+        .split(IoTDBConstant.FILE_NAME_SEPARATOR)[1];
     return Integer.parseInt(vmLevelStr);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 9031941..c5b687c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -605,7 +605,7 @@ public class TsFileProcessor {
     }
   }
 
-  public File createNewVMFile(TsFileResource tsFileResource, int level) {
+  public File createNewVMFileWithLock(TsFileResource tsFileResource, int level) {
     vmFileCreateLock.writeLock().lock();
     try {
       TimeUnit.MILLISECONDS.sleep(1);
@@ -624,6 +624,22 @@ public class TsFileProcessor {
     }
   }
 
+  public static File createNewVMFile(TsFileResource tsFileResource, int level) {
+    try {
+      TimeUnit.MILLISECONDS.sleep(1);
+      File parent = tsFileResource.getTsFile().getParentFile();
+      return FSFactoryProducer.getFSFactory().getFile(parent,
+          tsFileResource.getTsFile().getName() + IoTDBConstant.FILE_NAME_SEPARATOR + level
+              + IoTDBConstant.FILE_NAME_SEPARATOR + System
+              .currentTimeMillis() + VM_SUFFIX);
+    } catch (InterruptedException e) {
+      logger.error("{}: {}, closing task is interrupted.",
+          tsFileResource.getTsFile().getParent(), tsFileResource.getTsFile().getName(), e);
+      Thread.currentThread().interrupt();
+      return null;
+    }
+  }
+
   private void deleteVmFiles(List<TsFileResource> vmMergeTsFiles,
       List<RestorableTsFileIOWriter> vmMergeWriters) throws IOException {
     logger.debug("{}: {} vm merge starts to delete file", storageGroupName,
@@ -693,7 +709,7 @@ public class TsFileProcessor {
           }
           int level = getVmLevel(sourceFileList.get(0));
           if (isMergeFinished) {
-            File newVmFile = createNewVMFile(tsFileResource, level + 1);
+            File newVmFile = createNewVMFileWithLock(tsFileResource, level + 1);
             if (!targetFile.renameTo(newVmFile)) {
               logger.error("Failed to rename {} to {}", targetFile, newVmFile);
             } else {
@@ -760,7 +776,7 @@ public class TsFileProcessor {
         if (config.isEnableVm()) {
           logger.info("{}: {} [Flush] start to flush a memtable to a vm", storageGroupName,
               tsFileResource.getTsFile().getName());
-          File newVmFile = createNewVMFile(tsFileResource, 0);
+          File newVmFile = createNewVMFileWithLock(tsFileResource, 0);
           if (vmWriters.isEmpty()) {
             vmWriters.add(new ArrayList<>());
             vmTsFileResources.add(new ArrayList<>());
@@ -1111,9 +1127,11 @@ public class TsFileProcessor {
       List<List<TsFileResource>> currMergeVmFiles, VmLogger vmLogger) throws IOException {
     VmMergeUtils.merge(writer, packVmWritersToSequenceList(currMergeVmWriters),
         storageGroupName, vmLogger, new HashSet<>(), sequence);
+    vmMergeLock.writeLock().lock();
     for (int i = 0; i < currMergeVmFiles.size(); i++) {
       deleteVmFiles(currMergeVmFiles.get(i), currMergeVmWriters.get(i));
     }
+    vmMergeLock.writeLock().unlock();
   }
 
   private List<RestorableTsFileIOWriter> packVmWritersToSequenceList(
@@ -1187,7 +1205,7 @@ public class TsFileProcessor {
               for (RestorableTsFileIOWriter vmWriter : vmMergeWriters.get(i)) {
                 vmLogger.logFile(SOURCE_NAME, vmWriter.getFile());
               }
-              File newVmFile = createNewVMFile(tsFileResource, i + 1);
+              File newVmFile = createNewVMFileWithLock(tsFileResource, i + 1);
               vmLogger.logFile(TARGET_NAME, newVmFile);
               logger.info("{}: {} [Hot Compaction] merge level-{}'s {} vms to next level vm",
                   storageGroupName, tsFileResource.getTsFile().getName(), i,
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index 1a146d6..d75de25 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.writelog.recover;
 
 import static org.apache.iotdb.db.engine.flush.MemTableFlushTask.getFlushLogFile;
 import static org.apache.iotdb.db.engine.flush.VmLogger.isVMLoggerFileExist;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileProcessor.createNewVMFile;
 import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.RESOURCE_SUFFIX;
 
 import java.io.File;
@@ -176,7 +177,7 @@ public class TsFileRecoverPerformer {
             if (tsFileNotCrashed) {
 
               // if wal exists, we should open a new vmfile to replay it
-              File newVmFile = resource.getProcessor().createNewVMFile(resource, 0);
+              File newVmFile = createNewVMFile(resource, 0);
               TsFileResource newVmTsFileResource = new TsFileResource(newVmFile);
               RestorableTsFileIOWriter newVMWriter = new RestorableTsFileIOWriter(newVmFile);
               if (redoLogs(newVMWriter, newVmTsFileResource)) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java
index 1af2794..7b8044d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorEnableVmTest.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.storagegroup;
 
 import static junit.framework.TestCase.assertTrue;
+import static org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.getVmLevel;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
@@ -127,6 +128,12 @@ public class TsFileProcessorEnableVmTest {
     tsfileResourcesForQuery.clear();
     processor.query(deviceId, measurementId, dataType, encoding, props, context,
         tsfileResourcesForQuery);
+    List<List<TsFileResource>> tsfileResources = processor.getVmTsFileResources();
+    for (List<TsFileResource> levelResources : tsfileResources) {
+      for (TsFileResource resource : levelResources) {
+        assertEquals(0, getVmLevel(resource.getTsFile()));
+      }
+    }
 
     assertEquals(1, tsfileResourcesForQuery.size());
     assertEquals(1, tsfileResourcesForQuery.get(0).getChunkMetadataList().size());