You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/10/19 01:57:51 UTC
[iotdb] 03/06: complete cross refactor
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch compaction_recover_logger_1017
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a83a61097b0e70937dcd49e34c69850408cc6c38
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue Oct 17 18:25:30 2023 +0800
complete cross refactor
---
.../execute/task/AbstractCompactionTask.java | 1 +
.../execute/task/CrossSpaceCompactionTask.java | 114 +++++++++++++++++++++
.../execute/task/InnerSpaceCompactionTask.java | 2 -
3 files changed, 115 insertions(+), 2 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index 8e378b884c6..18661335876 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -66,6 +66,7 @@ public abstract class AbstractCompactionTask {
protected long memoryCost = 0L;
+ protected boolean recoverMemoryStatus;
protected CompactionTaskType compactionTaskType;
protected AbstractCompactionTask(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 021e61389db..83af4af0c5a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -23,23 +23,28 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.validator.CompactionValidator;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import org.apache.iotdb.tsfile.utils.TsFileUtils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -50,10 +55,13 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
protected List<TsFileResource> selectedUnsequenceFiles;
private File logFile;
protected List<TsFileResource> targetTsfileResourceList;
+ private List<TsFileResource> emptyTargetTsFileResourceList;
protected List<TsFileResource> holdWriteLockList = new ArrayList<>();
protected double selectedSeqFileSize = 0;
protected double selectedUnseqFileSize = 0;
+ protected boolean needRecoverTaskInfoFromLogFile;
+
@SuppressWarnings("squid:S107")
public CrossSpaceCompactionTask(
long timePartition,
@@ -71,6 +79,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
serialId);
this.selectedSequenceFiles = selectedSequenceFiles;
this.selectedUnsequenceFiles = selectedUnsequenceFiles;
+ this.emptyTargetTsFileResourceList = new ArrayList<>();
this.performer = performer;
this.hashCode = this.toString().hashCode();
this.memoryCost = memoryCost;
@@ -79,9 +88,43 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
createSummary();
}
+ /**
+  * Creates a task that carries no file lists of its own; only the compaction log file is kept,
+  * and the task is flagged so that {@link #recover()} first reads the task information from that
+  * log.
+  *
+  * @param databaseName passed through to the super constructor
+  * @param dataRegionId passed through to the super constructor
+  * @param tsFileManager passed through to the super constructor
+  * @param logFile compaction log file this task is recovered from
+  */
+ public CrossSpaceCompactionTask(
+     String databaseName,
+     String dataRegionId,
+     TsFileManager tsFileManager,
+     File logFile) {
+   super(databaseName, dataRegionId, 0L, tsFileManager, 0L, CompactionTaskType.NORMAL);
+   // Source and target lists are unknown here; recover() must load them from the log.
+   this.needRecoverTaskInfoFromLogFile = true;
+   this.logFile = logFile;
+ }
+
+ /**
+  * Rebuilds the source and target file lists of this task from the compaction log file.
+  *
+  * <p>Every list filled here is created first: the constructor that takes a log file does not
+  * initialize any of them, so adding to them directly would fail with a NullPointerException.
+  *
+  * @throws IOException declared for failures while analyzing the log file
+  */
+ private void recoverTaskInfoFromLogFile() throws IOException {
+   CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(this.logFile);
+   logAnalyzer.analyze();
+   List<TsFileIdentifier> sourceFileIdentifiers = logAnalyzer.getSourceFileInfos();
+   List<TsFileIdentifier> targetFileIdentifiers = logAnalyzer.getTargetFileInfos();
+   List<TsFileIdentifier> deletedTargetFileIdentifiers = logAnalyzer.getDeletedTargetFileInfos();
+
+   this.selectedSequenceFiles = new ArrayList<>();
+   this.selectedUnsequenceFiles = new ArrayList<>();
+   this.targetTsfileResourceList = new ArrayList<>();
+   this.emptyTargetTsFileResourceList = new ArrayList<>();
+
+   // Split the logged source files into the sequence and unsequence lists.
+   for (TsFileIdentifier source : sourceFileIdentifiers) {
+     TsFileResource sourceResource = new TsFileResource(source.getFileFromDataDirs());
+     if (source.isSequence()) {
+       this.selectedSequenceFiles.add(sourceResource);
+     } else {
+       this.selectedUnsequenceFiles.add(sourceResource);
+     }
+   }
+
+   for (TsFileIdentifier f : targetFileIdentifiers) {
+     File targetFileOnDisk = getRealTargetFile(f, IoTDBConstant.CROSS_COMPACTION_TMP_FILE_SUFFIX);
+     // The targetFileOnDisk may be null, but it won't impact the task recover stage
+     TsFileResource targetTsFile = new TsFileResource(targetFileOnDisk);
+     this.targetTsfileResourceList.add(targetTsFile);
+     // Targets that the log also lists as deleted are remembered as empty targets.
+     if (deletedTargetFileIdentifiers.contains(f)) {
+       this.emptyTargetTsFileResourceList.add(targetTsFile);
+     }
+   }
+ }
+
@Override
@SuppressWarnings({"squid:S6541", "squid:S3776", "squid:S2142"})
public boolean doCompaction() {
+ recoverMemoryStatus = true;
boolean isSuccess = true;
try {
if (!tsFileManager.isAllowCompaction()) {
@@ -163,6 +206,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
// find empty target files and add log
for (TsFileResource targetResource : targetTsfileResourceList) {
if (targetResource.isDeleted()) {
+ emptyTargetTsFileResourceList.add(targetResource);
compactionLogger.logFile(targetResource, CompactionLogger.STR_DELETED_TARGET_FILES);
compactionLogger.force();
}
@@ -273,6 +317,76 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask {
return isSuccess;
}
+ /**
+  * Recovers this task: optionally reloads the task information from the log file, then either
+  * rolls the compaction back or finishes it, depending on {@link #shouldRollback()}. Any
+  * exception raised on the way is passed to {@code handleRecoverException}.
+  */
+ public void recover() {
+   try {
+     if (needRecoverTaskInfoFromLogFile) {
+       recoverTaskInfoFromLogFile();
+     }
+     if (!shouldRollback()) {
+       // Rolling back is not possible according to shouldRollback(), so complete the task.
+       finishTask();
+     } else {
+       rollback();
+     }
+   } catch (Exception e) {
+     handleRecoverException(e);
+   }
+ }
+
+ /**
+  * Decides between rollback and finish during recovery.
+  *
+  * @return {@code true} only when {@code checkAllSourceFileExists} succeeds for both the selected
+  *     sequence files and the selected unsequence files
+  */
+ private boolean shouldRollback() {
+   if (!checkAllSourceFileExists(selectedSequenceFiles)) {
+     return false;
+   }
+   return checkAllSourceFileExists(selectedUnsequenceFiles);
+ }
+
+ /**
+  * Rolls the compaction back: restores the in-memory file lists when needed, removes the
+  * compaction mods files of the source files and deletes the target files from disk.
+  *
+  * @throws CompactionRecoverException if the target files cannot be deleted
+  * @throws Exception declared for failures of the helper calls
+  */
+ private void rollback() throws Exception {
+   // In-memory state is only touched when recoverMemoryStatus is set; doCompaction() sets it on
+   // entry.
+   if (recoverMemoryStatus) {
+     removeTsFileInMemory(targetTsfileResourceList);
+     insertFilesToTsFileManager(selectedSequenceFiles);
+     insertFilesToTsFileManager(selectedUnsequenceFiles);
+   }
+   deleteCompactionModsFile(selectedSequenceFiles);
+   deleteCompactionModsFile(selectedUnsequenceFiles);
+   // delete target files
+   if (targetTsfileResourceList != null && !deleteTsFilesOnDisk(targetTsfileResourceList)) {
+     // The message used to carry an unformatted "%s"; fill it with the affected files.
+     throw new CompactionRecoverException(
+         String.format("failed to delete target file %s", targetTsfileResourceList));
+   }
+ }
+
+ /**
+  * Completes the compaction during recovery: removes empty target files, checks that every other
+  * target file is complete, deletes the source files and their compaction mods files.
+  *
+  * @throws CompactionRecoverException if an empty target file cannot be removed, a target file is
+  *     missing or incomplete, or the source files cannot be deleted
+  * @throws IOException declared for failures of the helper calls
+  */
+ private void finishTask() throws IOException {
+   for (TsFileResource target : targetTsfileResourceList) {
+     if (target.isDeleted() || emptyTargetTsFileResourceList.contains(target)) {
+       // it means the target file is empty after compaction
+       // NOTE(review): assumes remove() returns true on success, so a false result is the failure.
+       if (!target.remove()) {
+         throw new CompactionRecoverException(
+             String.format("failed to delete empty target file %s", target));
+       }
+     } else {
+       File targetFile = target.getTsFile();
+       if (targetFile == null || !TsFileUtils.isTsFileComplete(targetFile)) {
+         throw new CompactionRecoverException(
+             String.format("Target file is not completed. %s", targetFile));
+       }
+       if (recoverMemoryStatus) {
+         target.setStatus(TsFileResourceStatus.NORMAL);
+       }
+     }
+   }
+   if (!deleteTsFilesOnDisk(selectedSequenceFiles)
+       || !deleteTsFilesOnDisk(selectedUnsequenceFiles)) {
+     throw new CompactionRecoverException("source files cannot be deleted successfully");
+   }
+   if (recoverMemoryStatus) {
+     // The statistics recorded here may be inaccurate.
+     FileMetrics.getInstance().deleteTsFile(true, selectedSequenceFiles);
+     // Unsequence files are reported with the sequence flag set to false.
+     FileMetrics.getInstance().deleteTsFile(false, selectedUnsequenceFiles);
+   }
+   deleteCompactionModsFile(selectedSequenceFiles);
+   deleteCompactionModsFile(selectedUnsequenceFiles);
+ }
+
@Override
public boolean equalsOtherTask(AbstractCompactionTask otherTask) {
if (!(otherTask instanceof CrossSpaceCompactionTask)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 47575d8ea11..8f48cbbdf88 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -72,8 +72,6 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
protected boolean[] isHoldingWriteLock;
protected long maxModsFileSize;
protected AbstractInnerSpaceEstimator innerSpaceEstimator;
-
- protected boolean recoverMemoryStatus;
protected boolean needRecoverTaskInfoFromLogFile;
public InnerSpaceCompactionTask(