You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/10/19 01:57:51 UTC

[iotdb] 03/06: complete cross refactor

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch compaction_recover_logger_1017
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a83a61097b0e70937dcd49e34c69850408cc6c38
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Oct 17 18:25:30 2023 +0800

    complete cross refactor
---
 .../execute/task/AbstractCompactionTask.java       |   1 +
 .../execute/task/CrossSpaceCompactionTask.java     | 114 +++++++++++++++++++++
 .../execute/task/InnerSpaceCompactionTask.java     |   2 -
 3 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index 8e378b884c6..18661335876 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -66,6 +66,7 @@ public abstract class AbstractCompactionTask {
 
   protected long memoryCost = 0L;
 
+  protected boolean recoverMemoryStatus;
   protected CompactionTaskType compactionTaskType;
 
   protected AbstractCompactionTask(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 021e61389db..83af4af0c5a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -23,23 +23,28 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -50,10 +55,13 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
   protected List<TsFileResource> selectedUnsequenceFiles;
   private File logFile;
   protected List<TsFileResource> targetTsfileResourceList;
+  private List<TsFileResource> emptyTargetTsFileResourceList;
   protected List<TsFileResource> holdWriteLockList = new ArrayList<>();
   protected double selectedSeqFileSize = 0;
   protected double selectedUnseqFileSize = 0;
 
+  protected boolean needRecoverTaskInfoFromLogFile;
+
   @SuppressWarnings("squid:S107")
   public CrossSpaceCompactionTask(
       long timePartition,
@@ -71,6 +79,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
         serialId);
     this.selectedSequenceFiles = selectedSequenceFiles;
     this.selectedUnsequenceFiles = selectedUnsequenceFiles;
+    this.emptyTargetTsFileResourceList = new ArrayList<>();
     this.performer = performer;
     this.hashCode = this.toString().hashCode();
     this.memoryCost = memoryCost;
@@ -79,9 +88,43 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
     createSummary();
   }
 
+  /**
+   * Creates a task whose inputs are not known yet and are meant to be restored from a compaction
+   * log file.
+   *
+   * <p>Only {@code logFile} and the {@code needRecoverTaskInfoFromLogFile} flag are recorded here;
+   * {@link #recover()} reads the log file first when that flag is set. The file lists are created
+   * empty so that none of them is {@code null} on this construction path.
+   *
+   * @param databaseName forwarded to the superclass constructor
+   * @param dataRegionId forwarded to the superclass constructor
+   * @param tsFileManager forwarded to the superclass constructor
+   * @param logFile compaction log file analyzed during recovery
+   */
+  public CrossSpaceCompactionTask(
+      String databaseName, String dataRegionId, TsFileManager tsFileManager, File logFile) {
+    super(databaseName, dataRegionId, 0L, tsFileManager, 0L, CompactionTaskType.NORMAL);
+    this.logFile = logFile;
+    this.needRecoverTaskInfoFromLogFile = true;
+    // These fields have no initializer at their declaration, and the recover path calls add()
+    // and contains() on them, so they must not stay null.
+    this.selectedSequenceFiles = new ArrayList<>();
+    this.selectedUnsequenceFiles = new ArrayList<>();
+    this.targetTsfileResourceList = new ArrayList<>();
+    this.emptyTargetTsFileResourceList = new ArrayList<>();
+  }
+
+  /**
+   * Rebuilds the source and target file lists of this task from {@code logFile}.
+   *
+   * <p>Source files recorded in the log are split into the sequence and unsequence lists. Every
+   * target recorded in the log is added to the target list; targets that the log also records as
+   * deleted are additionally remembered as empty targets.
+   *
+   * @throws IOException propagated from the log analysis and file lookups performed here
+   */
+  private void recoverTaskInfoFromLogFile() throws IOException {
+    CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(this.logFile);
+    logAnalyzer.analyze();
+    List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos();
+    List<TsFileIdentifier> targetFileIdentifiers = logAnalyzer.getTargetFileInfos();
+    List<TsFileIdentifier> deletedTargetFileIdentifiers = logAnalyzer.getDeletedTargetFileInfos();
+
+    // The log-file constructor does not create any of these lists, so all four are created here
+    // before anything is added to them.
+    this.selectedSequenceFiles = new ArrayList<>();
+    this.selectedUnsequenceFiles = new ArrayList<>();
+    this.targetTsfileResourceList = new ArrayList<>();
+    this.emptyTargetTsFileResourceList = new ArrayList<>();
+
+    for (TsFileIdentifier source : sourceFileIdentifiers) {
+      TsFileResource sourceTsFile = new TsFileResource(source.getFileFromDataDirs());
+      if (source.isSequence()) {
+        this.selectedSequenceFiles.add(sourceTsFile);
+      } else {
+        this.selectedUnsequenceFiles.add(sourceTsFile);
+      }
+    }
+
+    for (TsFileIdentifier f : targetFileIdentifiers) {
+      File targetFileOnDisk = getRealTargetFile(f, IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX);
+      // The targetFileOnDisk may be null, but it won't impact the task recover stage
+      TsFileResource targetTsFile = new TsFileResource(targetFileOnDisk);
+      this.targetTsfileResourceList.add(targetTsFile);
+      if (deletedTargetFileIdentifiers.contains(f)) {
+        this.emptyTargetTsFileResourceList.add(targetTsFile);
+      }
+    }
+  }
+
   @Override
   @SuppressWarnings({"squid:S6541", "squid:S3776", "squid:S2142"})
   public boolean doCompaction() {
+    recoverMemoryStatus = true;
     boolean isSuccess = true;
     try {
       if (!tsFileManager.isAllowCompaction()) {
@@ -163,6 +206,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
         // find empty target files and add log
         for (TsFileResource targetResource : targetTsfileResourceList) {
           if (targetResource.isDeleted()) {
+            emptyTargetTsFileResourceList.add(targetResource);
             compactionLogger.logFile(targetResource, CompactionLogger.STR_DELETED_TARGET_FILES);
             compactionLogger.force();
           }
@@ -273,6 +317,76 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
     return isSuccess;
   }
 
+  /**
+   * Recovers this task by either undoing it or completing it.
+   *
+   * <p>When {@code needRecoverTaskInfoFromLogFile} is set, the task info is loaded from the log
+   * file first. The task is then rolled back if {@code shouldRollback()} says so and finished
+   * otherwise. Any exception raised along the way is handed to {@code handleRecoverException}.
+   */
+  public void recover() {
+    try {
+      if (needRecoverTaskInfoFromLogFile) {
+        recoverTaskInfoFromLogFile();
+      }
+      if (!shouldRollback()) {
+        // Not rolling back: the source-file existence check in shouldRollback() did not pass for
+        // both lists, so the task is completed instead of undone.
+        finishTask();
+        return;
+      }
+      rollback();
+    } catch (Exception e) {
+      handleRecoverException(e);
+    }
+  }
+
+  /**
+   * Decides between rolling the task back and finishing it.
+   *
+   * @return {@code true} only when {@code checkAllSourceFileExists} passes for the selected
+   *     sequence files and then for the selected unsequence files
+   */
+  private boolean shouldRollback() {
+    if (!checkAllSourceFileExists(selectedSequenceFiles)) {
+      // The unsequence files are not checked once the sequence check has failed.
+      return false;
+    }
+    return checkAllSourceFileExists(selectedUnsequenceFiles);
+  }
+
+  /**
+   * Undoes this task: restores the in-memory file lists if needed, removes the compaction mods
+   * files of the sources and deletes the target files from disk.
+   *
+   * @throws Exception if the target files cannot be deleted, or propagated from the helpers
+   *     called here
+   */
+  private void rollback() throws Exception {
+    // The target list may be null; check once and use the result for every access below.
+    boolean hasTargetFiles = targetTsfileResourceList != null;
+    // recoverMemoryStatus is switched on at the start of doCompaction(), so when it is set the
+    // in-memory file lists may already have been changed and are put back here.
+    if (recoverMemoryStatus) {
+      if (hasTargetFiles) {
+        removeTsFileInMemory(targetTsfileResourceList);
+      }
+      insertFilesToTsFileManager(selectedSequenceFiles);
+      insertFilesToTsFileManager(selectedUnsequenceFiles);
+    }
+    deleteCompactionModsFile(selectedSequenceFiles);
+    deleteCompactionModsFile(selectedUnsequenceFiles);
+    // delete target file
+    if (hasTargetFiles && !deleteTsFilesOnDisk(targetTsfileResourceList)) {
+      throw new CompactionRecoverException(
+          String.format("failed to delete target file %s", targetTsfileResourceList));
+    }
+  }
+
+  /**
+   * Completes this task during recovery.
+   *
+   * <p>Each target that is deleted or recorded as empty is removed; every other target must have
+   * a complete TsFile and, when {@code recoverMemoryStatus} is set, is marked {@code NORMAL}.
+   * Afterwards the source files are deleted from disk, the file metrics are updated when {@code
+   * recoverMemoryStatus} is set, and the compaction mods files of the sources are deleted.
+   *
+   * @throws IOException propagated from the file checks and deletions performed here
+   */
+  private void finishTask() throws IOException {
+    for (TsFileResource target : targetTsfileResourceList) {
+      if (target.isDeleted() || emptyTargetTsFileResourceList.contains(target)) {
+        // it means the target file is empty after compaction
+        // NOTE(review): assumes remove() returns true on success — confirm in TsFileResource
+        if (!target.remove()) {
+          throw new CompactionRecoverException(
+              String.format("failed to delete empty target file %s", target));
+        }
+      } else {
+        File targetFile = target.getTsFile();
+        if (targetFile == null || !TsFileUtils.isTsFileComplete(targetFile)) {
+          throw new CompactionRecoverException(
+              String.format("Target file is not completed. %s", targetFile));
+        }
+        if (recoverMemoryStatus) {
+          target.setStatus(TsFileResourceStatus.NORMAL);
+        }
+      }
+    }
+    // The unsequence files are only attempted when the sequence files were deleted successfully.
+    if (!deleteTsFilesOnDisk(selectedSequenceFiles)
+        || !deleteTsFilesOnDisk(selectedUnsequenceFiles)) {
+      throw new CompactionRecoverException("source files cannot be deleted successfully");
+    }
+    if (recoverMemoryStatus) {
+      // The statistics recorded here may be inaccurate
+      // NOTE(review): the first argument is presumably the "is sequence" flag — confirm
+      FileMetrics.getInstance().deleteTsFile(true, selectedSequenceFiles);
+      FileMetrics.getInstance().deleteTsFile(false, selectedUnsequenceFiles);
+    }
+    deleteCompactionModsFile(selectedSequenceFiles);
+    deleteCompactionModsFile(selectedUnsequenceFiles);
+  }
+
   @Override
   public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
     if (!(otherTask instanceof CrossSpaceCompactionTask)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 47575d8ea11..8f48cbbdf88 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -72,8 +72,6 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
   protected boolean[] isHoldingWriteLock;
   protected long maxModsFileSize;
   protected AbstractInnerSpaceEstimator innerSpaceEstimator;
-
-  protected boolean recoverMemoryStatus;
   protected boolean needRecoverTaskInfoFromLogFile;
 
   public InnerSpaceCompactionTask(