You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/08 03:27:39 UTC

[incubator-iotdb] 01/01: fix meet incompatible file error

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch fix_meet_incompatible_file
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 32bac07d52d2c9d063a7e21ae08b0efc7d2a5831
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Apr 8 11:27:28 2020 +0800

    fix meet incompatible file error
---
 .../engine/storagegroup/StorageGroupProcessor.java | 52 ++++++++++++----------
 .../write/writer/RestorableTsFileIOWriter.java     | 11 ++---
 2 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 0f2a9ef..dbd131d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -245,28 +245,20 @@ public class StorageGroupProcessor {
     logger.info("recover Storage Group  {}", storageGroupName);
 
     try {
-      // collect TsFiles from sequential and unsequential data directory
-      List<TsFileResource> seqTsFiles = getAllFiles(
+      // collect candidate TsFiles from sequential and unsequential data directory
+      List<TsFileResource> tmpSeqTsFiles = getAllFiles(
           DirectoryManager.getInstance().getAllSequenceFileFolders());
-      List<TsFileResource> unseqTsFiles =
+      List<TsFileResource> tmpUnseqTsFiles =
           getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
 
-      recoverSeqFiles(seqTsFiles);
-      recoverUnseqFiles(unseqTsFiles);
+      recoverSeqFiles(tmpSeqTsFiles);
+      recoverUnseqFiles(tmpUnseqTsFiles);
 
-      for (TsFileResource resource : seqTsFiles) {
-        //After recover, case the TsFile's length is equal to 0, delete both the TsFileResource and the file itself
-        if (resource.getFile().length() == 0) {
-          deleteTsfile(resource.getFile());
-        }
+      for (TsFileResource resource : sequenceFileTreeSet) {
         long partitionNum = resource.getTimePartition();
         partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
       }
-      for (TsFileResource resource : unseqTsFiles) {
-        //After recover, case the TsFile's length is equal to 0, delete both the TsFileResource and the file itself
-        if (resource.getFile().length() == 0) {
-          deleteTsfile(resource.getFile());
-        }
+      for (TsFileResource resource : unSequenceFileList) {
         long partitionNum = resource.getTimePartition();
         partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
       }
@@ -277,8 +269,8 @@ public class StorageGroupProcessor {
       if (mergingMods.exists()) {
         mergingModification = new ModificationFile(mergingMods.getPath());
       }
-      RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles,
-          storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
+      RecoverMergeTask recoverMergeTask = new RecoverMergeTask(new ArrayList<>(sequenceFileTreeSet),
+          unSequenceFileList, storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
           IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
       logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
       recoverMergeTask
@@ -370,16 +362,22 @@ public class StorageGroupProcessor {
     }
   }
 
-  private void recoverSeqFiles(List<TsFileResource> tsFiles) throws StorageGroupProcessorException {
+  private void recoverSeqFiles(List<TsFileResource> tsFiles) {
     for (int i = 0; i < tsFiles.size(); i++) {
       TsFileResource tsFileResource = tsFiles.get(i);
-      sequenceFileTreeSet.add(tsFileResource);
       long timePartitionId = tsFileResource.getTimePartition();
 
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
           getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, false,
           i == tsFiles.size() - 1);
-      RestorableTsFileIOWriter writer = recoverPerformer.recover();
+
+      RestorableTsFileIOWriter writer;
+      try {
+        writer = recoverPerformer.recover();
+      } catch (StorageGroupProcessorException e) {
+        logger.warn("skip InCompatible TsFile: {}", tsFileResource.getPath());
+        continue;
+      }
       if (i != tsFiles.size() - 1 || !writer.canWrite()) {
         // not the last file or cannot write, just close it
         tsFileResource.setClosed(true);
@@ -395,20 +393,25 @@ public class StorageGroupProcessor {
         tsFileProcessor.setTimeRangeId(timePartitionId);
         writer.makeMetadataVisible();
       }
+      sequenceFileTreeSet.add(tsFileResource);
     }
   }
 
-  private void recoverUnseqFiles(List<TsFileResource> tsFiles)
-      throws StorageGroupProcessorException {
+  private void recoverUnseqFiles(List<TsFileResource> tsFiles) {
     for (int i = 0; i < tsFiles.size(); i++) {
       TsFileResource tsFileResource = tsFiles.get(i);
-      unSequenceFileList.add(tsFileResource);
       long timePartitionId = tsFileResource.getTimePartition();
 
       TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
           getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
           i == tsFiles.size() - 1);
-      RestorableTsFileIOWriter writer = recoverPerformer.recover();
+      RestorableTsFileIOWriter writer;
+      try {
+        writer = recoverPerformer.recover();
+      } catch (StorageGroupProcessorException e) {
+        logger.warn("skip InCompatible TsFile: {}", tsFileResource.getPath());
+        continue;
+      }
       if (i != tsFiles.size() - 1 || !writer.canWrite()) {
         // not the last file or cannot write, just close it
         tsFileResource.setClosed(true);
@@ -424,6 +427,7 @@ public class StorageGroupProcessor {
         tsFileProcessor.setTimeRangeId(timePartitionId);
         writer.makeMetadataVisible();
       }
+      unSequenceFileList.add(tsFileResource);
     }
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index ee94550..9f88c05 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
  */
 public class RestorableTsFileIOWriter extends TsFileIOWriter {
 
-  private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
+  private static final Logger logger = LoggerFactory.getLogger("FileMonitor");
   private long truncatedPosition = -1;
   private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>();
 
@@ -65,8 +65,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
    * @throws IOException if write failed, or the file is broken but autoRepair==false.
    */
   public RestorableTsFileIOWriter(File file) throws IOException {
-    if (resourceLogger.isDebugEnabled()) {
-      resourceLogger.debug("{} is opened.", file.getName());
+    if (logger.isDebugEnabled()) {
+      logger.debug("{} is opened.", file.getName());
     }
     this.file = file;
     this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
@@ -93,8 +93,9 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
         totalChunkNum = reader.getTotalChunkNum();
         if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
           out.close();
-          throw new IOException(
-              String.format("%s is not in TsFile format.", file.getAbsolutePath()));
+          boolean result = file.delete();
+          logger.warn("TsFile {} is incompatible. Delete it successfully {}", file.getAbsolutePath(), result);
+          throw new IOException(String.format("%s is not in TsFile format.", file.getAbsolutePath()));
         } else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) {
           crashed = true;
           out.truncate(