You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/03/12 01:51:45 UTC

[iotdb] branch master updated: Fix compaction with cluster snapshot deletion (#2811)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ff3a0f9  Fix compaction with cluster snapshot deletion (#2811)
ff3a0f9 is described below

commit ff3a0f9fdebe19d2d97d31657d2e8f4d86dbcd0a
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Fri Mar 12 09:51:19 2021 +0800

    Fix compaction with cluster snapshot deletion (#2811)
    
    * add enable unseq compaction
    
    * add compaction exit catchup when cluster snapshot delete local file
    
    Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
 .../db/engine/compaction/TsFileManagement.java     |  8 ++++++
 .../level/LevelCompactionTsFileManagement.java     | 31 ++++++++++++++++++++++
 .../engine/compaction/utils/CompactionUtils.java   |  3 +++
 3 files changed, 42 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 1b18011..b176365 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseCompactionMergeCallBack;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
 import static org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.MERGING_MODIFICATION_FILE_NAME;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
 
@@ -383,6 +385,12 @@ public abstract class TsFileManagement {
       doubleWriteLock(seqFile);
 
       try {
+        // if the merge task hit an error (e.g. file not found), a stale .merge file may remain
+        File mergedFile =
+            FSFactoryProducer.getFSFactory().getFile(seqFile.getTsFilePath() + MERGE_SUFFIX);
+        if (mergedFile.exists()) {
+          mergedFile.delete();
+        }
         updateMergeModification(seqFile);
         if (i == seqFiles.size() - 1) {
           // FIXME if there is an exception, the the modification file will be not closed.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 91d572e..8dfbd2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -664,6 +664,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         }
       }
     } catch (Exception e) {
+      restoreCompaction();
       logger.error("Error occurred in Compaction Merge thread", e);
     } finally {
       isSeqMerging = false;
@@ -771,6 +772,36 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     throw new IOException();
   }
 
+  /** Roll back a failed compaction: delete the target file named in the log, then the log. */
+  private void restoreCompaction() {
+    File logFile =
+        FSFactoryProducer.getFSFactory()
+            .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
+    try {
+      if (logFile.exists()) {
+        CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile);
+        logAnalyzer.analyze();
+        String targetFilePath = logAnalyzer.getTargetFile();
+        if (targetFilePath != null) {
+          File targetFile = new File(targetFilePath);
+          // File.delete() only returns false on failure, which was silently ignored here;
+          // deleteIfExists throws IOException instead, so the failure is logged below
+          Files.deleteIfExists(targetFile.toPath());
+        }
+      }
+    } catch (IOException e) {
+      logger.error("restore compaction failed", e);
+    } finally {
+      if (logFile.exists()) {
+        try {
+          Files.delete(logFile.toPath());
+        } catch (IOException e) {
+          logger.error("delete compaction log file error ", e);
+        }
+      }
+    }
+  }
+
   @TestOnly
   public Map<Long, List<SortedSet<TsFileResource>>> getSequenceTsFileResources() {
     return sequenceTsFileResources;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index bfdd52a..059b07b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -271,6 +271,9 @@ public class CompactionUtils {
       for (TsFileResource tsFileResource : tsFileResources) {
         TsFileSequenceReader reader =
             buildReaderFromTsFileResource(tsFileResource, tsFileSequenceReaderMap, storageGroup);
+        if (reader == null) {
+          throw new IOException();
+        }
         Iterator<Map<String, List<ChunkMetadata>>> iterator =
             reader.getMeasurementChunkMetadataListMapIterator(device);
         chunkMetadataListIteratorCache.put(reader, iterator);