You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/03/12 01:51:45 UTC
[iotdb] branch master updated: Fix compaction with cluster snapshot
deletion (#2811)
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ff3a0f9 Fix compaction with cluster snapshot deletion (#2811)
ff3a0f9 is described below
commit ff3a0f9fdebe19d2d97d31657d2e8f4d86dbcd0a
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Fri Mar 12 09:51:19 2021 +0800
Fix compaction with cluster snapshot deletion (#2811)
* add enable unseq compaction
* add compaction exit catchup when cluster snapshot delete local file
Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
.../db/engine/compaction/TsFileManagement.java | 8 ++++++
.../level/LevelCompactionTsFileManagement.java | 31 ++++++++++++++++++++++
.../engine/compaction/utils/CompactionUtils.java | 3 +++
3 files changed, 42 insertions(+)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 1b18011..b176365 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseCompactionMergeCallBack;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
import static org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.MERGING_MODIFICATION_FILE_NAME;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
@@ -383,6 +385,12 @@ public abstract class TsFileManagement {
doubleWriteLock(seqFile);
try {
+ // if the merge task hit an error (e.g. file not found), the .merge file may not have been deleted
+ File mergedFile =
+ FSFactoryProducer.getFSFactory().getFile(seqFile.getTsFilePath() + MERGE_SUFFIX);
+ if (mergedFile.exists()) {
+ mergedFile.delete();
+ }
updateMergeModification(seqFile);
if (i == seqFiles.size() - 1) {
// FIXME if there is an exception, the modification file will not be closed.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 91d572e..8dfbd2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -664,6 +664,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
}
} catch (Exception e) {
+ restoreCompaction();
logger.error("Error occurred in Compaction Merge thread", e);
} finally {
isSeqMerging = false;
@@ -771,6 +772,36 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
throw new IOException();
}
+ /**
+  * Restores the files to the status they had before the compaction task was submitted.
+  *
+  * <p>If the compaction log file exists, it is analyzed and the target file it records (if any) is
+  * deleted. The compaction log file itself is then deleted in all cases, even when the analysis
+  * fails. I/O failures are logged rather than rethrown.
+  */
+ private void restoreCompaction() {
+   File logFile =
+       FSFactoryProducer.getFSFactory()
+           .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
+   try {
+     if (logFile.exists()) {
+       CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile);
+       logAnalyzer.analyze();
+       String targetFilePath = logAnalyzer.getTargetFile();
+       if (targetFilePath != null) {
+         // resolve the target through the FS factory, the same way the log file is resolved
+         File targetFile = FSFactoryProducer.getFSFactory().getFile(targetFilePath);
+         // File.delete() reports failure only through its return value, so check it
+         if (targetFile.exists() && !targetFile.delete()) {
+           logger.warn("failed to delete compaction target file {}", targetFilePath);
+         }
+       }
+     }
+   } catch (IOException e) {
+     logger.error("restore compaction failed", e);
+   } finally {
+     // always remove the log so a later compaction does not see a stale one
+     if (logFile.exists()) {
+       try {
+         Files.delete(logFile.toPath());
+       } catch (IOException e) {
+         logger.error("delete compaction log file error ", e);
+       }
+     }
+   }
+ }
+
@TestOnly
public Map<Long, List<SortedSet<TsFileResource>>> getSequenceTsFileResources() {
return sequenceTsFileResources;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index bfdd52a..059b07b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -271,6 +271,9 @@ public class CompactionUtils {
for (TsFileResource tsFileResource : tsFileResources) {
TsFileSequenceReader reader =
buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
+ if (reader == null) {
+ throw new IOException();
+ }
Iterator<Map<String, List<ChunkMetadata>>> iterator =
reader.getMeasurementChunkMetadataListMapIterator(device);
chunkMetadataListIteratorCache.put(reader, iterator);