You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/03/23 07:49:17 UTC

[GitHub] [iotdb] choubenson commented on a change in pull request #5316: [IOTDB-2561]Compaction code refactoring for restart recovery and exception handling

choubenson commented on a change in pull request #5316:
URL: https://github.com/apache/iotdb/pull/5316#discussion_r832950998



##########
File path: server/src/main/java/org/apache/iotdb/db/engine/compaction/task/CompactionRecoverTask.java
##########
@@ -20,102 +20,481 @@
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.compaction.CompactionUtils;
+import org.apache.iotdb.db.engine.compaction.TsFileIdentifier;
+import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.log.CompactionLogger;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * CompactionRecoverTask execute the recover process for all compaction task sequentially, including
- * InnerCompactionTask in sequence/unsequence space, CrossSpaceCompaction.
- */
-public class CompactionRecoverTask {
-  private static final Logger logger =
-      LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
-  private TsFileManager tsFileManager;
-  private String logicalStorageGroupName;
-  private String virtualStorageGroupId;
+/** CompactionRecoverTask executes the recover process for all compaction tasks. */
+public class CompactionRecoverTask extends AbstractCompactionTask {
+  private final Logger LOGGER = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
+  private final File compactionLogFile;
+  private final boolean isInnerSpace;
 
   public CompactionRecoverTask(
-      TsFileManager tsFileManager, String logicalStorageGroupName, String virtualStorageGroupId) {
-    this.tsFileManager = tsFileManager;
-    this.logicalStorageGroupName = logicalStorageGroupName;
-    this.virtualStorageGroupId = virtualStorageGroupId;
+      String logicalStorageGroupName,
+      String virtualStorageGroupName,
+      long timePartition,
+      TsFileManager tsFileManager,
+      AtomicInteger currentTaskNum,
+      File logFile,
+      boolean isInnerSpace) {
+    super(
+        logicalStorageGroupName + "-" + virtualStorageGroupName,
+        timePartition,
+        tsFileManager,
+        currentTaskNum);
+    this.compactionLogFile = logFile;
+    this.isInnerSpace = isInnerSpace;
   }
 
-  public void recoverCrossSpaceCompaction() throws Exception {
-    logger.info("recovering cross compaction");
-    recoverCrossCompactionFromOldVersion();
-    recoverCrossCompaction();
-    logger.info("try to synchronize CompactionScheduler");
+  @Override
+  public void doCompaction() {
+    boolean handleSuccess = true;
+    LOGGER.info(
+        "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, compactionLogFile);
+    try {
+      if (compactionLogFile.exists()) {
+        LOGGER.info(
+            "{} [Compaction][Recover] compaction log file {} exists, start to recover it",
+            fullStorageGroupName,
+            compactionLogFile);
+        CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(compactionLogFile);
+        CompactionRecoverFromOld compactionRecoverFromOld = new CompactionRecoverFromOld();
+        if (isInnerSpace && compactionRecoverFromOld.isOldInnerCompactionLog()) {
+          // inner compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldInnerCompactionLog();
+        } else if (!isInnerSpace && compactionRecoverFromOld.isOldCrossCompactionLog()) {
+          // cross compaction log from previous version (<0.13)
+          logAnalyzer.analyzeOldCrossCompactionLog();
+        } else {
+          logAnalyzer.analyze();
+        }
+        List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos();
+        List<TsFileIdentifier> targetFileIdentifiers = logAnalyzer.getTargetFileInfos();
+
+        // compaction log file is incomplete
+        if (targetFileIdentifiers.isEmpty() || sourceFileIdentifiers.isEmpty()) {
+          LOGGER.info(
+              "{} [Compaction][Recover] incomplete log file, abort recover", fullStorageGroupName);
+          return;
+        }
+
+        // check is all source files existed
+        boolean isAllSourcesFileExisted = true;
+        for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+          File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+          if (sourceFile == null) {
+            isAllSourcesFileExisted = false;
+            break;
+          }
+        }
+
+        if (isAllSourcesFileExisted) {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                compactionRecoverFromOld.handleCrossCompactionWithAllSourceFilesExistFromOld(
+                    targetFileIdentifiers);
+          } else {
+            handleSuccess =
+                handleWithAllSourceFilesExist(
+                    targetFileIdentifiers, sourceFileIdentifiers, isInnerSpace);
+          }
+        } else {
+          if (!isInnerSpace && logAnalyzer.isLogFromOld()) {
+            handleSuccess =
+                compactionRecoverFromOld.handleCrossCompactionWithSomeSourceFilesLostFromOld(
+                    targetFileIdentifiers, sourceFileIdentifiers);
+          } else {
+            handleSuccess = handleWithSomeSourceFilesLost(sourceFileIdentifiers);
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOGGER.error("Recover compaction error", e);
+    } finally {
+      if (!handleSuccess) {
+        LOGGER.error(
+            "{} [Compaction][Recover] Failed to recover compaction, set allowCompaction to false",
+            fullStorageGroupName);
+        tsFileManager.setAllowCompaction(false);
+      } else {
+        if (compactionLogFile.exists()) {
+          try {
+            LOGGER.info(
+                "{} [Compaction][Recover] Recover compaction successfully, delete log file {}",
+                fullStorageGroupName,
+                compactionLogFile);
+            FileUtils.delete(compactionLogFile);
+          } catch (IOException e) {
+            LOGGER.error(
+                "{} [Compaction][Recover] Exception occurs while deleting log file {}, set allowCompaction to false",
+                fullStorageGroupName,
+                compactionLogFile,
+                e);
+            tsFileManager.setAllowCompaction(false);
+          }
+        }
+      }
+    }
   }
 
-  private void recoverCrossCompaction() throws Exception {
-    List<String> sequenceDirs = DirectoryManager.getInstance().getAllSequenceFileFolders();
-    for (String dir : sequenceDirs) {
-      File storageGroupDir =
-          new File(
-              dir
-                  + File.separator
-                  + logicalStorageGroupName
-                  + File.separator
-                  + virtualStorageGroupId);
-      if (!storageGroupDir.exists()) {
-        return;
+  /**
+   * All source files exist: (1) delete all the target files and tmp target files (2) delete
+   * compaction mods files.
+   */
+  private boolean handleWithAllSourceFilesExist(
+      List<TsFileIdentifier> targetFileIdentifiers,
+      List<TsFileIdentifier> sourceFileIdentifiers,
+      boolean isInnerSpace) {
+    LOGGER.info(
+        "{} [Compaction][Recover] all source files exists, delete all target files.",
+        fullStorageGroupName);
+
+    // remove tmp target files and target files
+    for (TsFileIdentifier targetFileIdentifier : targetFileIdentifiers) {
+      // xxx.inner or xxx.cross
+      File tmpTargetFile = targetFileIdentifier.getFileFromDataDirs();
+      // xxx.tsfile
+      File targetFile =
+          getFileFromDataDirs(
+              targetFileIdentifier
+                  .getFilePath()
+                  .replace(
+                      isInnerSpace
+                          ? IoTDBConstant.INNER_COMPACTION_TMP_FILE_SUFFIX
+                          : IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX,
+                      TsFileConstant.TSFILE_SUFFIX));
+      TsFileResource targetResource = null;
+      if (tmpTargetFile != null) {
+        targetResource = new TsFileResource(tmpTargetFile);
+      } else if (targetFile != null) {
+        targetResource = new TsFileResource(targetFile);
       }
-      File[] timePartitionDirs = storageGroupDir.listFiles();
-      if (timePartitionDirs == null) {
-        return;
+
+      if (targetResource != null && !targetResource.remove()) {
+        // failed to remove tmp target tsfile
+        // system should not carry out the subsequent compaction in case of data redundant
+        LOGGER.warn(
+            "{} [Compaction][Recover] failed to remove target file {}",
+            fullStorageGroupName,
+            targetResource);
+        return false;
       }
-      for (File timePartitionDir : timePartitionDirs) {
-        if (!timePartitionDir.isDirectory()
-            || !Pattern.compile("[0-9]*").matcher(timePartitionDir.getName()).matches()) {
-          continue;
+    }
+
+    // delete compaction mods files
+    List<TsFileResource> sourceTsFileResourceList = new ArrayList<>();
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      sourceTsFileResourceList.add(new TsFileResource(sourceFileIdentifier.getFileFromDataDirs()));
+    }
+    try {
+      CompactionUtils.deleteCompactionModsFile(sourceTsFileResourceList, Collections.emptyList());
+    } catch (Throwable e) {
+      LOGGER.error(
+          "{} [Compaction][Recover] Exception occurs while deleting compaction mods file, set allowCompaction to false",
+          fullStorageGroupName,
+          e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Some source files lost: delete remaining source files, including: tsfile, resource file, mods
+   * file and compaction mods file.
+   */
+  private boolean handleWithSomeSourceFilesLost(List<TsFileIdentifier> sourceFileIdentifiers) {
+    // some source files have been deleted, while target file must exist.
+    boolean handleSuccess = true;
+    for (TsFileIdentifier sourceFileIdentifier : sourceFileIdentifiers) {
+      File sourceFile = sourceFileIdentifier.getFileFromDataDirs();
+      if (sourceFile != null) {
+        // delete source tsfile, resource file and mods file
+        if (!new TsFileResource(sourceFile).remove()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete remaining source file {}.",
+              fullStorageGroupName,
+              sourceFile);
+          handleSuccess = false;
         }
-        File[] compactionLogs =
-            CompactionLogger.findCrossSpaceCompactionLogs(timePartitionDir.getPath());
-        for (File compactionLog : compactionLogs) {
-          logger.info("calling cross compaction task");
-          IoTDBDescriptor.getInstance()
-              .getConfig()
-              .getCrossCompactionStrategy()
-              .getCompactionRecoverTask(
-                  logicalStorageGroupName,
-                  virtualStorageGroupId,
-                  Long.parseLong(timePartitionDir.getName()),
-                  compactionLog,
-                  tsFileManager)
-              .call();
+      } else {
+        // if source file does not exist, its resource file may still exist, so delete it.
+        File resourceFile =
+            getFileFromDataDirs(
+                sourceFileIdentifier.getFilePath() + TsFileResource.RESOURCE_SUFFIX);
+        if (resourceFile != null && !resourceFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
+              fullStorageGroupName,
+              resourceFile);
+          handleSuccess = false;
+        }
+
+        // delete .mods file of source tsfile
+        File modFile =
+            getFileFromDataDirs(sourceFileIdentifier.getFilePath() + ModificationFile.FILE_SUFFIX);
+        if (modFile != null && !modFile.delete()) {
+          LOGGER.error(
+              "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
+              fullStorageGroupName,
+              modFile);
+          handleSuccess = false;
         }
       }
+      // delete .compaction.mods file of all source files
+      File compactionModFile =
+          getFileFromDataDirs(
+              sourceFileIdentifier.getFilePath() + ModificationFile.COMPACTION_FILE_SUFFIX);
+      if (compactionModFile != null && !compactionModFile.delete()) {
+        LOGGER.error(
+            "{} [Compaction][Recover] fail to delete target file {}, this may cause data incorrectness",
+            fullStorageGroupName,
+            compactionModFile);
+        handleSuccess = false;
+      }
     }
+    return handleSuccess;
+  }
+
+  /**
+   * This method find the File object of given filePath by searching it in every data directory. If
+   * the file is not found, it will return null.
+   */
+  private File getFileFromDataDirs(String filePath) {
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    for (String dataDir : dataDirs) {
+      File f = new File(dataDir, filePath);
+      if (f.exists()) {
+        return f;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
+    if (otherTask instanceof CompactionRecoverTask) {
+      return compactionLogFile.equals(((CompactionRecoverTask) otherTask).compactionLogFile);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean checkValidAndSetMerging() {
+    return compactionLogFile.exists();
   }
 
-  private void recoverCrossCompactionFromOldVersion() throws Exception {
-    // check whether there is old compaction log from previous version (<0.13)
-    File mergeLogFromOldVersion =
-        new File(
-            tsFileManager.getStorageGroupDir()
-                + File.separator
-                + CompactionLogger.CROSS_COMPACTION_LOG_NAME_FROM_OLD);
-    if (mergeLogFromOldVersion.exists()) {
-      logger.info("calling cross compaction task to recover from previous version.");
-      IoTDBDescriptor.getInstance()
-          .getConfig()
-          .getCrossCompactionStrategy()
-          .getCompactionRecoverTask(
-              logicalStorageGroupName,
-              virtualStorageGroupId,
-              0L,
-              mergeLogFromOldVersion,
-              tsFileManager)
-          .call();
+  @Override
+  public void setSourceFilesToCompactionCandidate() {
+    // do nothing
+  }
+
+  @Override
+  public void resetCompactionCandidateStatusForAllSourceFiles() {
+    // do nothing
+  }
+
+  /**
+   * Used to check whether it is recovered from last version (<0.13) and perform corresponding
+   * process.
+   */
+  private class CompactionRecoverFromOld {

Review comment:
       Here, the nested class needs to call methods of the outer class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org