You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/08 03:27:39 UTC
[incubator-iotdb] 01/01: fix error when recovery meets an incompatible TsFile
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_meet_incompatible_file
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 32bac07d52d2c9d063a7e21ae08b0efc7d2a5831
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Apr 8 11:27:28 2020 +0800
fix error when recovery meets an incompatible TsFile
---
.../engine/storagegroup/StorageGroupProcessor.java | 52 ++++++++++++----------
.../write/writer/RestorableTsFileIOWriter.java | 11 ++---
2 files changed, 34 insertions(+), 29 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 0f2a9ef..dbd131d 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -245,28 +245,20 @@ public class StorageGroupProcessor {
logger.info("recover Storage Group {}", storageGroupName);
try {
- // collect TsFiles from sequential and unsequential data directory
- List<TsFileResource> seqTsFiles = getAllFiles(
+ // collect candidate TsFiles from sequential and unsequential data directory
+ List<TsFileResource> tmpSeqTsFiles = getAllFiles(
DirectoryManager.getInstance().getAllSequenceFileFolders());
- List<TsFileResource> unseqTsFiles =
+ List<TsFileResource> tmpUnseqTsFiles =
getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
- recoverSeqFiles(seqTsFiles);
- recoverUnseqFiles(unseqTsFiles);
+ recoverSeqFiles(tmpSeqTsFiles);
+ recoverUnseqFiles(tmpUnseqTsFiles);
- for (TsFileResource resource : seqTsFiles) {
- //After recover, case the TsFile's length is equal to 0, delete both the TsFileResource and the file itself
- if (resource.getFile().length() == 0) {
- deleteTsfile(resource.getFile());
- }
+ for (TsFileResource resource : sequenceFileTreeSet) {
long partitionNum = resource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
}
- for (TsFileResource resource : unseqTsFiles) {
- //After recover, case the TsFile's length is equal to 0, delete both the TsFileResource and the file itself
- if (resource.getFile().length() == 0) {
- deleteTsfile(resource.getFile());
- }
+ for (TsFileResource resource : unSequenceFileList) {
long partitionNum = resource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
}
@@ -277,8 +269,8 @@ public class StorageGroupProcessor {
if (mergingMods.exists()) {
mergingModification = new ModificationFile(mergingMods.getPath());
}
- RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles,
- storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
+ RecoverMergeTask recoverMergeTask = new RecoverMergeTask(new ArrayList<>(sequenceFileTreeSet),
+ unSequenceFileList, storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
recoverMergeTask
@@ -370,16 +362,22 @@ public class StorageGroupProcessor {
}
}
- private void recoverSeqFiles(List<TsFileResource> tsFiles) throws StorageGroupProcessorException {
+ private void recoverSeqFiles(List<TsFileResource> tsFiles) {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
- sequenceFileTreeSet.add(tsFileResource);
long timePartitionId = tsFileResource.getTimePartition();
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, false,
i == tsFiles.size() - 1);
- RestorableTsFileIOWriter writer = recoverPerformer.recover();
+
+ RestorableTsFileIOWriter writer;
+ try {
+ writer = recoverPerformer.recover();
+ } catch (StorageGroupProcessorException e) {
+ logger.warn("skip InCompatible TsFile: {}", tsFileResource.getPath());
+ continue;
+ }
if (i != tsFiles.size() - 1 || !writer.canWrite()) {
// not the last file or cannot write, just close it
tsFileResource.setClosed(true);
@@ -395,20 +393,25 @@ public class StorageGroupProcessor {
tsFileProcessor.setTimeRangeId(timePartitionId);
writer.makeMetadataVisible();
}
+ sequenceFileTreeSet.add(tsFileResource);
}
}
- private void recoverUnseqFiles(List<TsFileResource> tsFiles)
- throws StorageGroupProcessorException {
+ private void recoverUnseqFiles(List<TsFileResource> tsFiles) {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
- unSequenceFileList.add(tsFileResource);
long timePartitionId = tsFileResource.getTimePartition();
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
i == tsFiles.size() - 1);
- RestorableTsFileIOWriter writer = recoverPerformer.recover();
+ RestorableTsFileIOWriter writer;
+ try {
+ writer = recoverPerformer.recover();
+ } catch (StorageGroupProcessorException e) {
+ logger.warn("skip InCompatible TsFile: {}", tsFileResource.getPath());
+ continue;
+ }
if (i != tsFiles.size() - 1 || !writer.canWrite()) {
// not the last file or cannot write, just close it
tsFileResource.setClosed(true);
@@ -424,6 +427,7 @@ public class StorageGroupProcessor {
tsFileProcessor.setTimeRangeId(timePartitionId);
writer.makeMetadataVisible();
}
+ unSequenceFileList.add(tsFileResource);
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index ee94550..9f88c05 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
*/
public class RestorableTsFileIOWriter extends TsFileIOWriter {
- private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
+ private static final Logger logger = LoggerFactory.getLogger("FileMonitor");
private long truncatedPosition = -1;
private Map<Path, MeasurementSchema> knownSchemas = new HashMap<>();
@@ -65,8 +65,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
* @throws IOException if write failed, or the file is broken but autoRepair==false.
*/
public RestorableTsFileIOWriter(File file) throws IOException {
- if (resourceLogger.isDebugEnabled()) {
- resourceLogger.debug("{} is opened.", file.getName());
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} is opened.", file.getName());
}
this.file = file;
this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
@@ -93,8 +93,9 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
totalChunkNum = reader.getTotalChunkNum();
if (truncatedPosition == TsFileCheckStatus.INCOMPATIBLE_FILE) {
out.close();
- throw new IOException(
- String.format("%s is not in TsFile format.", file.getAbsolutePath()));
+ boolean result = file.delete();
+ logger.warn("TsFile {} is incompatible. Delete it successfully {}", file.getAbsolutePath(), result);
+ throw new IOException(String.format("%s is not in TsFile format.", file.getAbsolutePath()));
} else if (truncatedPosition == TsFileCheckStatus.ONLY_MAGIC_HEAD) {
crashed = true;
out.truncate(