You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/12 12:49:03 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] Fix compaction recover loses data (#3733)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 62377da [To rel/0.12] Fix compaction recover loses data (#3733)
62377da is described below
commit 62377da7b3f00351e8796ddad6f29418004d1fa7
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Thu Aug 12 20:48:37 2021 +0800
[To rel/0.12] Fix compaction recover loses data (#3733)
---
.../level/LevelCompactionTsFileManagement.java | 19 +++++-----
.../engine/compaction/utils/CompactionUtils.java | 7 ++--
.../engine/storagegroup/StorageGroupProcessor.java | 15 +++++---
.../writelog/recover/TsFileRecoverPerformer.java | 2 +-
.../compaction/LevelCompactionRecoverTest.java | 15 +++++---
.../write/writer/RestorableTsFileIOWriter.java | 41 ++++++++++++++++++++++
.../iotdb/tsfile/write/writer/TsFileOutput.java | 4 ++-
7 files changed, 81 insertions(+), 22 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 52141cb..3a6363a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -452,13 +452,12 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
}
long timePartition = targetTsFileResource.getTimePartition();
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target, false);
// if not complete compaction, resume merge
if (writer.hasCrashed()) {
if (offset > 0) {
- writer.getIOWriterOut().truncate(offset - 1);
+ writer.getIOWriterOut().truncate(offset);
}
- writer.close();
CompactionLogger compactionLogger =
new CompactionLogger(storageGroupDir, storageGroupName);
List<Modification> modifications = new ArrayList<>();
@@ -469,7 +468,8 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
compactionLogger,
deviceSet,
isSeq,
- modifications);
+ modifications,
+ writer);
compactionLogger.close();
} else {
writer.close();
@@ -497,14 +497,13 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
sourceTsFileResources.add(sourceTsFileResource);
}
int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target);
+ RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(target, false);
List<Modification> modifications = new ArrayList<>();
// if not complete compaction, resume merge
if (writer.hasCrashed()) {
if (offset > 0) {
- writer.getIOWriterOut().truncate(offset - 1);
+ writer.getIOWriterOut().truncate(offset);
}
- writer.close();
CompactionLogger compactionLogger =
new CompactionLogger(storageGroupDir, storageGroupName);
CompactionUtils.merge(
@@ -514,7 +513,8 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
compactionLogger,
deviceSet,
isSeq,
- modifications);
+ modifications,
+ writer);
compactionLogger.close();
// complete compaction and add target tsfile
int targetLevel = TsFileResource.getMergeLevel(targetResource.getTsFile().getName());
@@ -718,7 +718,8 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
compactionLogger,
new HashSet<>(),
sequence,
- modifications);
+ modifications,
+ null);
logger.info(
"{} [Compaction] merged level-{}'s {} TsFiles to next level, and start to delete old files",
storageGroupName,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 4fafa9b..7eb7e46 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -258,10 +258,13 @@ public class CompactionUtils {
CompactionLogger compactionLogger,
Set<String> devices,
boolean sequence,
- List<Modification> modifications)
+ List<Modification> modifications,
+ RestorableTsFileIOWriter writer)
throws IOException, IllegalPathException {
Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
- RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
+ if (writer == null) {
+ writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
+ }
try {
Map<String, List<Modification>> modificationCache = new HashMap<>();
RateLimiter compactionWriteRateLimiter =
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1537952..631b0f9 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -268,6 +268,8 @@ public class StorageGroupProcessor {
private String insertWriteLockHolder = "";
+ private volatile boolean compacting = false;
+
/** get the direct byte buffer from pool, each fetch contains two ByteBuffer */
public ByteBuffer[] getWalDirectByteBuffer() {
ByteBuffer[] res = new ByteBuffer[2];
@@ -2074,12 +2076,17 @@ public class StorageGroupProcessor {
}
public void merge() {
- if (!tsFileManagement.recovered) {
- // doing recovered task
+ if (!tsFileManagement.recovered || compacting) {
+ // recovering or doing compaction
+ // stop running new compaction
return;
}
- if (config.getCompactionStrategy() == CompactionStrategy.LEVEL_COMPACTION) {
- new CompactionAllPartitionTask(logicalStorageGroupName).call();
+ try {
+ if (config.getCompactionStrategy() == CompactionStrategy.LEVEL_COMPACTION) {
+ new CompactionAllPartitionTask(logicalStorageGroupName).call();
+ }
+ } finally {
+ compacting = false;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index da53108..9c7fd7a 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -99,7 +99,7 @@ public class TsFileRecoverPerformer {
// remove corrupted part of the TsFile
RestorableTsFileIOWriter restorableTsFileIOWriter;
try {
- restorableTsFileIOWriter = new RestorableTsFileIOWriter(file);
+ restorableTsFileIOWriter = new RestorableTsFileIOWriter(file, false);
} catch (NotCompatibleTsFileException e) {
boolean result = file.delete();
logger.warn("TsFile {} is incompatible. Delete it successfully {}", filePath, result);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index cc2cb39..35ddc01 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -138,7 +138,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger,
new HashSet<>(),
true,
- new ArrayList<>());
+ new ArrayList<>(),
+ null);
compactionLogger.close();
levelCompactionTsFileManagement.add(targetTsFileResource, true);
levelCompactionTsFileManagement.recover();
@@ -229,7 +230,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger,
new HashSet<>(),
true,
- new ArrayList<>());
+ new ArrayList<>(),
+ null);
compactionLogger.close();
BufferedReader logReader =
@@ -344,7 +346,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger,
new HashSet<>(),
true,
- new ArrayList<>());
+ new ArrayList<>(),
+ null);
compactionLogger.close();
BufferedReader logReader =
@@ -468,7 +471,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger,
new HashSet<>(),
false,
- new ArrayList<>());
+ new ArrayList<>(),
+ null);
compactionLogger.close();
levelCompactionTsFileManagement.add(targetTsFileResource, false);
levelCompactionTsFileManagement.recover();
@@ -675,7 +679,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger,
new HashSet<>(),
true,
- new ArrayList<>());
+ new ArrayList<>(),
+ null);
levelCompactionTsFileManagement.add(targetTsFileResource, true);
compactionLogger.close();
levelCompactionTsFileManagement.recover();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 26f8494..9f14127 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -112,6 +112,47 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
}
}
+ public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} is opened.", file.getName());
+ }
+ this.file = file;
+ this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true);
+
+ // file is empty or doesn't exist (File.length() returns 0 in both cases)
+ if (file.length() == 0) {
+ startFile();
+ crashed = true;
+ canWrite = true;
+ return;
+ }
+
+ if (file.exists()) {
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
+
+ truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true);
+ minPlanIndex = reader.getMinPlanIndex();
+ maxPlanIndex = reader.getMaxPlanIndex();
+ if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
+ crashed = false;
+ canWrite = false;
+ out.close();
+ } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) {
+ out.close();
+ throw new NotCompatibleTsFileException(
+ String.format("%s is not in TsFile format.", file.getAbsolutePath()));
+ } else {
+ crashed = true;
+ canWrite = true;
+ // remove broken data
+ if (truncate) {
+ out.truncate(truncatedSize);
+ }
+ }
+ }
+ }
+ }
+
/**
* Given a TsFile, generate a writable RestorableTsFileIOWriter. That is, for a complete TsFile,
* the function erases all FileMetadata and supports writing new data; For a incomplete TsFile,
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
index 5e0b433..3173206 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileOutput.java
@@ -83,7 +83,9 @@ public interface TsFileOutput {
void flush() throws IOException;
/**
- * The same with {@link java.nio.channels.FileChannel#truncate(long)}.
+ * The same as {@link java.nio.channels.FileChannel#truncate(long)}. Everything from the given
+ * position (inclusive) to the end of the file is discarded. For example, calling truncate(60)
+ * on an 80-byte file removes bytes 60 through 79; the file keeps the data in range [0, 59].
*
* @param size size The new size, a non-negative byte count
*/