You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/11/03 07:14:33 UTC
[iotdb] branch master updated: [IOTDB-1604] Fix data increase when
shut down iotdb in 0.12 and restart iotdb-0.13 (#4240)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0eeb0ce [IOTDB-1604] Fix data increase when shut down iotdb in 0.12 and restart iotdb-0.13 (#4240)
0eeb0ce is described below
commit 0eeb0ce4aed2f203825689e0ce5abd1e1122c716
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Wed Nov 3 15:14:02 2021 +0800
[IOTDB-1604] Fix data increase when shut down iotdb in 0.12 and restart iotdb-0.13 (#4240)
---
.../SizeTieredCompactionRecoverTask.java | 58 +++++++++++++++++++---
.../inner/utils/InnerSpaceCompactionUtils.java | 2 +-
.../compaction/task/AbstractCompactionTask.java | 7 ++-
.../engine/storagegroup/StorageGroupProcessor.java | 30 ++++++++++-
4 files changed, 87 insertions(+), 10 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
index 42e357f..bb85289 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
@@ -74,7 +74,10 @@ public class SizeTieredCompactionRecoverTask extends SizeTieredCompactionTask {
public void doCompaction() {
// read log -> Set<Device> -> doCompaction -> clear
try {
+ LOGGER.info(
+ "{} [Compaction][Recover] compaction log is {}", fullStorageGroupName, compactionLogFile);
if (compactionLogFile.exists()) {
+ LOGGER.info("{} [Compaction][Recover] log exists, start recover", fullStorageGroupName);
SizeTieredCompactionLogAnalyzer logAnalyzer =
new SizeTieredCompactionLogAnalyzer(compactionLogFile);
logAnalyzer.analyze();
@@ -85,27 +88,64 @@ public class SizeTieredCompactionRecoverTask extends SizeTieredCompactionTask {
}
File targetFile = new File(targetFileName);
File resourceFile = new File(targetFileName + ".resource");
+ LOGGER.info("{} [Compaction][Recover] target file is {}", fullStorageGroupName, targetFile);
if (!targetFile.exists()) {
if (resourceFile.exists()) {
if (!resourceFile.delete()) {
- LOGGER.warn("Fail to delete tsfile resource {}", resourceFile);
+ LOGGER.warn(
+ "{} [Compaction][Recover] Fail to delete tsfile resource {}",
+ fullStorageGroupName,
+ resourceFile);
+ } else {
+ LOGGER.info(
+ "{} [Compaction][Recover] Deleted target file resource {}",
+ fullStorageGroupName,
+ resourceFile);
}
}
+ LOGGER.info(
+ "{} [Compaction][Recover] Target file {} not exists, return",
+ fullStorageGroupName,
+ targetFile);
return;
}
RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetFile, false);
if (writer.hasCrashed()) {
+ LOGGER.info(
+ "{} [Compaction][Recover] target file {} crash, start to delete it",
+ fullStorageGroupName,
+ targetFile);
// the target tsfile is crashed, it is not completed
writer.close();
if (!targetFile.delete()) {
- LOGGER.warn("Fail to delete uncompleted file {}", targetFile);
+ LOGGER.warn(
+ "{} [Compaction][Recover] Fail to delete uncompleted file {}",
+ fullStorageGroupName,
+ targetFile);
+ } else {
+ LOGGER.info(
+ "{} [Compaction][Recover] remove target file {}", fullStorageGroupName, targetFile);
}
- if (!resourceFile.delete()) {
- LOGGER.warn("Fail to delete tsfile resource {}", resourceFile);
+ if (resourceFile.exists()) {
+ if (!resourceFile.delete()) {
+ LOGGER.warn(
+ "{} [Compaction][Recover] Fail to delete tsfile resource {}",
+ fullStorageGroupName,
+ resourceFile);
+ } else {
+ LOGGER.info(
+ "{} [Compaction][Recover] delete target file resource {}",
+ fullStorageGroupName,
+ resourceFile);
+ }
}
} else {
// the target tsfile is completed
+ LOGGER.info(
+ "{} [Compaction][Recover] target file {} is completed, remove source files",
+ fullStorageGroupName,
+ targetFile);
TsFileResource targetResource = new TsFileResource(targetFile);
List<TsFileResource> sourceTsFileResources = new ArrayList<>();
for (String sourceFileName : sourceFileList) {
@@ -123,9 +163,15 @@ public class SizeTieredCompactionRecoverTask extends SizeTieredCompactionTask {
} finally {
if (compactionLogFile.exists()) {
if (!compactionLogFile.delete()) {
- LOGGER.warn("fail to delete {}", compactionLogFile);
+ LOGGER.warn(
+ "{} [Compaction][Recover] fail to delete {}",
+ fullStorageGroupName,
+ compactionLogFile);
} else {
- LOGGER.info("delete compaction log {}", compactionLogFile);
+ LOGGER.info(
+ "{} [Compaction][Recover] delete compaction log {}",
+ fullStorageGroupName,
+ compactionLogFile);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index 8d17795..e2dd85e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -551,7 +551,7 @@ public class InnerSpaceCompactionUtils {
public static void deleteTsFilesInDisk(
Collection<TsFileResource> mergeTsFiles, String storageGroupName) {
- logger.debug("{} [compaction] merge starts to delete real file", storageGroupName);
+ logger.info("{} [compaction] merge starts to delete real file ", storageGroupName);
for (TsFileResource mergeTsFile : mergeTsFiles) {
deleteTsFile(mergeTsFile);
logger.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index 1555971..70eccf9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.engine.compaction.task;
import org.apache.iotdb.db.engine.compaction.CompactionScheduler;
+import org.apache.iotdb.db.engine.compaction.cross.inplace.InplaceCompactionRecoverTask;
+import org.apache.iotdb.db.engine.compaction.inner.sizetiered.SizeTieredCompactionRecoverTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +58,10 @@ public abstract class AbstractCompactionTask implements Callable<Void> {
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
- CompactionScheduler.decPartitionCompaction(fullStorageGroupName, timePartition);
+ if (!(this instanceof InplaceCompactionRecoverTask)
+ && !(this instanceof SizeTieredCompactionRecoverTask)) {
+ CompactionScheduler.decPartitionCompaction(fullStorageGroupName, timePartition);
+ }
this.currentTaskNum.decrementAndGet();
}
return null;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a48d308..f35a7d2 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -118,6 +118,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
@@ -533,6 +534,7 @@ public class StorageGroupProcessor {
}
private void recoverInnerSpaceCompaction(boolean isSequence) throws Exception {
+ // search compaction log for SizeTieredCompaction
List<String> dirs;
if (isSequence) {
dirs = DirectoryManager.getInstance().getAllSequenceFileFolders();
@@ -575,6 +577,26 @@ public class StorageGroupProcessor {
}
}
}
+
+ // search compaction log for old LevelCompaction
+ File logFile =
+ FSFactoryProducer.getFSFactory()
+ .getFile(
+ storageGroupSysDir.getAbsolutePath(),
+ logicalStorageGroupName + COMPACTION_LOG_NAME);
+ if (logFile.exists()) {
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getInnerCompactionStrategy()
+ .getCompactionRecoverTask(
+ tsFileManager.getStorageGroupName(),
+ tsFileManager.getVirtualStorageGroup(),
+ -1,
+ logFile,
+ logFile.getParent(),
+ isSequence)
+ .call();
+ }
}
private void submitTimedCompactionTask() {
@@ -744,7 +766,7 @@ public class StorageGroupProcessor {
}
}
- private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq) {
+ private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq) throws IOException {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
long timePartitionId = tsFileResource.getTimePartition();
@@ -759,7 +781,7 @@ public class StorageGroupProcessor {
isSeq,
i == tsFiles.size() - 1);
- RestorableTsFileIOWriter writer;
+ RestorableTsFileIOWriter writer = null;
try {
// this tsfile is not zero level, no need to perform redo wal
if (TsFileResource.getInnerCompactionCount(tsFileResource.getTsFile().getName()) > 0) {
@@ -841,6 +863,10 @@ public class StorageGroupProcessor {
logger.warn(
"Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(), e);
continue;
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
}
}
}