You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/11/01 13:42:43 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12] [IOTDB-1872 IOTDB-1902 IOTDB-1903] Fix data increases abnormally after IoTDB restarts (#4206)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new faa37c1 [To rel/0.12] [IOTDB-1872 IOTDB-1902 IOTDB-1903] Fix data increases abnormally after IoTDB restarts (#4206)
faa37c1 is described below
commit faa37c1f9b81e8d7487a5c17fd43fc5ea9dac0ee
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Mon Nov 1 21:42:14 2021 +0800
[To rel/0.12] [IOTDB-1872 IOTDB-1902 IOTDB-1903] Fix data increases abnormally after IoTDB restarts (#4206)
---
RELEASE_NOTES.md | 1 +
client-cpp/pom.xml | 4 +-
compile-tools/pom.xml | 6 +-
distribution/pom.xml | 2 +-
example/client-cpp-example/pom.xml | 2 +-
example/udf/pom.xml | 2 +-
grafana/pom.xml | 2 +-
jdbc/pom.xml | 2 +-
pom.xml | 8 +-
.../resources/conf/iotdb-engine.properties | 4 +
.../db/engine/compaction/CompactionStrategy.java | 9 +-
.../db/engine/compaction/TsFileManagement.java | 9 +-
.../level/LevelCompactionTsFileManagement.java | 103 ++++++--
.../no/NoCompactionTsFileManagement.java | 5 +-
.../compaction/utils/CompactionFileInfo.java | 107 ++++++++
.../compaction/utils/CompactionLogAnalyzer.java | 24 +-
.../engine/compaction/utils/CompactionLogger.java | 25 ++
.../db/engine/merge/manage/MergeResource.java | 35 +--
.../db/engine/merge/recover/MergeFileInfo.java | 116 +++++++++
.../{LogAnalyzer.java => MergeLogAnalyzer.java} | 50 +++-
.../iotdb/db/engine/merge/recover/MergeLogger.java | 9 +-
.../iotdb/db/engine/merge/task/MergeFileTask.java | 4 +-
.../db/engine/merge/task/MergeMultiChunkTask.java | 17 +-
.../iotdb/db/engine/merge/task/MergeTask.java | 12 +-
.../db/engine/merge/task/RecoverMergeTask.java | 24 +-
.../engine/storagegroup/StorageGroupProcessor.java | 24 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +-
.../apache/iotdb/db/tools/TsFileSketchTool.java | 2 +-
.../writelog/recover/TsFileRecoverPerformer.java | 2 +-
.../org/apache/iotdb/db/constant/TestConstant.java | 10 +
.../compaction/LevelCompactionCacheTest.java | 2 +-
.../engine/compaction/LevelCompactionLogTest.java | 2 +-
.../compaction/LevelCompactionMergeTest.java | 12 +-
.../engine/compaction/LevelCompactionModsTest.java | 4 +-
.../compaction/LevelCompactionMoreDataTest.java | 2 +-
.../compaction/LevelCompactionRecoverTest.java | 289 +++++++++++++++++----
.../compaction/LevelCompactionSelectorTest.java | 2 +-
.../LevelCompactionTsFileManagementTest.java | 4 +-
.../NoCompactionTsFileManagementTest.java | 4 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 14 +-
.../write/writer/RestorableTsFileIOWriter.java | 9 +-
41 files changed, 776 insertions(+), 190 deletions(-)
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 5818af8..efc072a 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -61,6 +61,7 @@
* [IOTDB-1850] Fix deserialize page merge rate limiter
* [IOTDB-1879] Fix some Unsequence files never be merged to higher level or Sequence folder
* [ISSUE-3945] Fix Fuzzy query not support multiDevices and alignByDevice Dataset
+* [IOTDB-1872] Fix data increases abnormally after IoTDB restarts
* fix merge ClassCastException: MeasurementMNode
* change sync version check to major version
* init dummyIndex after restart cluster
diff --git a/client-cpp/pom.xml b/client-cpp/pom.xml
index c24caa1..a44b847 100644
--- a/client-cpp/pom.xml
+++ b/client-cpp/pom.xml
@@ -98,8 +98,8 @@
<cmake.root.dir>${project.parent.basedir}/compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
<thrift.exec.absolute.path>${project.parent.basedir}/compile-tools/thrift/target/build/compiler/cpp/bin/${cmake.build.type}/thrift.exe</thrift.exec.absolute.path>
<iotdb.server.script>start-server.bat</iotdb.server.script>
- <boost.include.dir />
- <boost.library.dir />
+ <boost.include.dir/>
+ <boost.library.dir/>
</properties>
</profile>
<profile>
diff --git a/compile-tools/pom.xml b/compile-tools/pom.xml
index 188ac3a..c046288 100644
--- a/compile-tools/pom.xml
+++ b/compile-tools/pom.xml
@@ -35,7 +35,7 @@
<cmake-version>3.17.3</cmake-version>
<openssl.include.dir>-Dtrue1=true1</openssl.include.dir>
<bison.executable.dir>-Dtrue1=true1</bison.executable.dir>
- <cmake.build.type />
+ <cmake.build.type/>
</properties>
<modules>
<module>thrift</module>
@@ -114,8 +114,8 @@
<thrift.make.executable>make</thrift.make.executable>
<thrift.compiler.executable>thrift.exe</thrift.compiler.executable>
<gradlew.executable>gradlew.bat</gradlew.executable>
- <boost.include.dir />
- <boost.library.dir />
+ <boost.include.dir/>
+ <boost.library.dir/>
</properties>
</profile>
</profiles>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index ef366ac..0ff89bc 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -29,7 +29,7 @@
</parent>
<artifactId>iotdb-distribution</artifactId>
<name>IoTDB Distribution</name>
- <modules />
+ <modules/>
<build>
<plugins>
<plugin>
diff --git a/example/client-cpp-example/pom.xml b/example/client-cpp-example/pom.xml
index cb6069a..d7cd7ee 100644
--- a/example/client-cpp-example/pom.xml
+++ b/example/client-cpp-example/pom.xml
@@ -69,7 +69,7 @@
<properties>
<cmake.generator>Visual Studio 16 2019</cmake.generator>
<cmake.root.dir>${project.parent.basedir}/../compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
- <boost.include.dir />
+ <boost.include.dir/>
</properties>
</profile>
<profile>
diff --git a/example/udf/pom.xml b/example/udf/pom.xml
index c051c61..62c85c7 100644
--- a/example/udf/pom.xml
+++ b/example/udf/pom.xml
@@ -77,7 +77,7 @@
<importOrder>
<order>org.apache.iotdb,,javax,java,\#</order>
</importOrder>
- <removeUnusedImports />
+ <removeUnusedImports/>
</java>
</configuration>
<executions>
diff --git a/grafana/pom.xml b/grafana/pom.xml
index 8a0c7c2..0b39db1 100644
--- a/grafana/pom.xml
+++ b/grafana/pom.xml
@@ -170,7 +170,7 @@
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${start-class}</mainClass>
</transformer>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 6a5eb43..5cb73c0 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -198,7 +198,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
</pluginExecutions>
diff --git a/pom.xml b/pom.xml
index bbf0b10..0055d21 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,7 +151,7 @@
<sonar.junit.reportPaths>target/surefire-reports,target/failsafe-reports</sonar.junit.reportPaths>
<!-- By default, the argLine is empty-->
<gson.version>2.8.6</gson.version>
- <argLine />
+ <argLine/>
<!-- whether enable compiling the cpp client-->
<client-cpp>false</client-cpp>
<!-- disable enforcer by default-->
@@ -682,7 +682,7 @@
<importOrder>
<order>org.apache.iotdb,,javax,java,\#</order>
</importOrder>
- <removeUnusedImports />
+ <removeUnusedImports/>
</java>
<lineEndings>UNIX</lineEndings>
</configuration>
@@ -754,7 +754,7 @@
<phase>validate</phase>
<configuration>
<rules>
- <dependencyConvergence />
+ <dependencyConvergence/>
</rules>
</configuration>
<goals>
@@ -800,7 +800,7 @@
</requireJavaVersion>
<!-- Disabled for now as it breaks the ability to build single modules -->
<!--reactorModuleConvergence/-->
- <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
+ <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
</rules>
</configuration>
</execution>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 0fe904b..bf3026d 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -409,6 +409,10 @@ timestamp_precision=ms
# When point number of a page reaches this, use "append merge" instead of "deserialize merge".
# merge_page_point_number=100
+# How many threads will be used to perform merge tasks
+# Datatype: int
+# merge_thread_num=1
+
# How many threads will be set up to perform unseq merge chunk sub-tasks, 4 by default.
# Set to 1 when less than or equal to 0.
# merge_chunk_subthread_num=4
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
index 96ec9f9..2405502 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
@@ -26,13 +26,16 @@ public enum CompactionStrategy {
LEVEL_COMPACTION,
NO_COMPACTION;
- public TsFileManagement getTsFileManagement(String storageGroupName, String storageGroupDir) {
+ public TsFileManagement getTsFileManagement(
+ String storageGroupName, String virtualStrorageGroupId, String storageGroupDir) {
switch (this) {
case LEVEL_COMPACTION:
- return new LevelCompactionTsFileManagement(storageGroupName, storageGroupDir);
+ return new LevelCompactionTsFileManagement(
+ storageGroupName, virtualStrorageGroupId, storageGroupDir);
case NO_COMPACTION:
default:
- return new NoCompactionTsFileManagement(storageGroupName, storageGroupDir);
+ return new NoCompactionTsFileManagement(
+ storageGroupName, virtualStrorageGroupId, storageGroupDir);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 12bd1a3..a9d3a41 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -59,6 +59,7 @@ public abstract class TsFileManagement {
private static final Logger logger = LoggerFactory.getLogger(TsFileManagement.class);
protected String storageGroupName;
protected String storageGroupDir;
+ protected String virtualStorageGroupId;
/** Serialize queries, delete resource files, compaction cleanup files */
private final ReadWriteLock compactionMergeLock = new ReentrantReadWriteLock();
@@ -81,9 +82,11 @@ public abstract class TsFileManagement {
protected ReentrantLock compactionSelectionLock = new ReentrantLock();
- public TsFileManagement(String storageGroupName, String storageGroupDir) {
+ public TsFileManagement(
+ String storageGroupName, String virtualStorageGroupId, String storageGroupDir) {
this.storageGroupName = storageGroupName;
this.storageGroupDir = storageGroupDir;
+ this.virtualStorageGroupId = virtualStorageGroupId;
}
public void setForceFullMerge(boolean forceFullMerge) {
@@ -480,7 +483,7 @@ public abstract class TsFileManagement {
if (seqFiles != null) {
for (TsFileResource seqFile : seqFiles) {
if (seqFile.isMerging()) {
- logger.warn("return because {} is merging", seqFile.getTsFile());
+ logger.debug("return because {} is merging", seqFile.getTsFile());
return false;
}
}
@@ -488,7 +491,7 @@ public abstract class TsFileManagement {
if (unseqFiles != null) {
for (TsFileResource unseqFile : unseqFiles) {
if (unseqFile.isMerging()) {
- logger.warn("return because {} is merging", unseqFile.getTsFile());
+ logger.debug("return because {} is merging", unseqFile.getTsFile());
return false;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 644eec1..0698d7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionFileInfo;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
@@ -33,6 +34,7 @@ import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.h2.store.fs.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,8 +53,8 @@ import java.util.TreeMap;
import java.util.TreeSet;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
-import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
-import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_INFO;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_INFO;
/** The TsFileManagement for LEVEL_COMPACTION, use level struct to manage TsFile list */
public class LevelCompactionTsFileManagement extends TsFileManagement {
@@ -85,8 +87,9 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
private final List<TsFileResource> sequenceRecoverTsFileResources = new ArrayList<>();
private final List<TsFileResource> unSequenceRecoverTsFileResources = new ArrayList<>();
- public LevelCompactionTsFileManagement(String storageGroupName, String storageGroupDir) {
- super(storageGroupName, storageGroupDir);
+ public LevelCompactionTsFileManagement(
+ String storageGroupName, String virtualStorageGroupId, String storageGroupDir) {
+ super(storageGroupName, virtualStorageGroupId, storageGroupDir);
clear();
}
@@ -121,17 +124,17 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
private void deleteLevelFilesInDisk(Collection<TsFileResource> mergeTsFiles) {
- logger.debug("{} [compaction] merge starts to delete real file", storageGroupName);
+ logger.info("{} [compaction] merge starts to delete real file", storageGroupName);
for (TsFileResource mergeTsFile : mergeTsFiles) {
deleteLevelFile(mergeTsFile);
- logger.debug(
+ logger.info(
"{} [Compaction] delete TsFile {}", storageGroupName, mergeTsFile.getTsFilePath());
}
}
private void deleteLevelFilesInList(
long timePartitionId, Collection<TsFileResource> mergeTsFiles, int level, boolean sequence) {
- logger.debug("{} [compaction] merge starts to delete file list", storageGroupName);
+ logger.info("{} [compaction] merge starts to delete file list", storageGroupName);
if (sequence) {
if (sequenceTsFileResources.containsKey(timePartitionId)) {
if (sequenceTsFileResources.get(timePartitionId).size() > level) {
@@ -427,44 +430,60 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile);
logAnalyzer.analyze();
Set<String> deviceSet = logAnalyzer.getDeviceSet();
- List<String> sourceFileList = logAnalyzer.getSourceFiles();
- String targetFile = logAnalyzer.getTargetFile();
+ List<CompactionFileInfo> sourceFileInfo = logAnalyzer.getSourceFileInfo();
+ CompactionFileInfo targetFileInfo = logAnalyzer.getTargetFileInfo();
+ String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+ File targetFile = null;
boolean isSeq = logAnalyzer.isSeq();
- if (targetFile == null || sourceFileList.isEmpty()) {
+ if (targetFileInfo == null || sourceFileInfo.isEmpty()) {
return;
}
- File target = new File(targetFile);
- if (deviceSet.isEmpty()) {
+ if (deviceSet.isEmpty() && targetFileInfo != null) {
// if not in compaction, just delete the target file
- if (target.exists()) {
- Files.delete(target.toPath());
+ for (String dataDir : dataDirs) {
+ targetFile = targetFileInfo.getFile(dataDir);
+ if (targetFile.exists()) {
+ logger.info(
+ "[Compaction][Recover] Target file {} found, device set is null, delete it",
+ targetFile);
+ FileUtils.delete(targetFile.getPath());
+ return;
+ }
}
- return;
}
// get tsfile resource from list, as they have been recovered in StorageGroupProcessor
- TsFileResource targetResource = getRecoverTsFileResource(targetFile, isSeq);
+ TsFileResource targetResource =
+ getResourceFromDataDirs(true, dataDirs, isSeq, targetFileInfo);
if (targetResource != null) {
// target tsfile is not compeleted
+ logger.info(
+ "[Compaction][Recover] target file {} is not compeleted, remove it", targetResource);
targetResource.remove();
if (isSeq) {
sequenceRecoverTsFileResources.clear();
} else {
unSequenceRecoverTsFileResources.clear();
}
- } else if ((targetResource = getTsFileResource(targetFile, isSeq)) != null) {
+ } else if ((targetResource =
+ getResourceFromDataDirs(false, dataDirs, isSeq, targetFileInfo))
+ != null) {
// complete compaction, delete source files
+ logger.info(
+ "[Compaction][Recover] target file {} is compeleted, remove resource file",
+ targetResource);
long timePartition = targetResource.getTimePartition();
List<TsFileResource> sourceTsFileResources = new ArrayList<>();
- for (String file : sourceFileList) {
+ for (CompactionFileInfo sourceInfo : sourceFileInfo) {
// get tsfile resource from list, as they have been recovered in StorageGroupProcessor
- TsFileResource sourceTsFileResource = getTsFileResource(file, isSeq);
+ TsFileResource sourceTsFileResource =
+ getResourceFromDataDirs(false, dataDirs, isSeq, sourceInfo);
if (sourceTsFileResource == null) {
// if sourceTsFileResource is null, it has been deleted
continue;
}
sourceTsFileResources.add(sourceTsFileResource);
}
- if (sourceFileList.size() != 0) {
+ if (sourceFileInfo.size() != 0) {
List<Modification> modifications = new ArrayList<>();
// if not complete compaction, remove target file
writeLock();
@@ -473,7 +492,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
throw new InterruptedException(
String.format("%s [Compaction] abort", storageGroupName));
}
- int level = TsFileResource.getMergeLevel(new File(sourceFileList.get(0)).getName());
+ int level = TsFileResource.getMergeLevel(sourceFileInfo.get(0).getFilename());
deleteLevelFilesInList(timePartition, sourceTsFileResources, level, isSeq);
} finally {
writeUnlock();
@@ -503,6 +522,27 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
}
}
+ private TsFileResource getResourceFromDataDirs(
+ boolean recover, String[] dataDirs, boolean isSeq, CompactionFileInfo info)
+ throws IOException {
+ TsFileResource foundResource = null;
+ if (recover) {
+ for (String dataDir : dataDirs) {
+ if ((foundResource = getRecoverTsFileResource(info.getFile(dataDir).getPath(), isSeq))
+ != null) {
+ return foundResource;
+ }
+ }
+ } else {
+ for (String dataDir : dataDirs) {
+ if ((foundResource = getTsFileResource(info.getFile(dataDir).getPath(), isSeq)) != null) {
+ return foundResource;
+ }
+ }
+ }
+ return null;
+ }
+
private void deleteAllSubLevelFiles(boolean isSeq, long timePartition) {
if (isSeq) {
for (int level = 0; level < sequenceTsFileResources.get(timePartition).size(); level++) {
@@ -592,6 +632,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
int currMaxLevel,
int currMaxFileNumInEachLevel) {
// wait until unseq merge has finished
+ List<TsFileResource> mergingFiles = new ArrayList<>();
while (isUnseqMerging) {
try {
Thread.sleep(200);
@@ -632,18 +673,31 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
if (!checkAndSetFilesMergingIfNotSet(toMergeTsFiles, null)) {
return false;
}
+ mergingFiles.addAll(toMergeTsFiles);
} finally {
compactionSelectionLock.unlock();
}
compactionLogger = new CompactionLogger(storageGroupDir, storageGroupName);
// log source file list and target file for recover
for (TsFileResource mergeResource : toMergeTsFiles) {
- compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
+ compactionLogger.logFile(
+ SOURCE_INFO,
+ storageGroupName,
+ virtualStorageGroupId,
+ timePartition,
+ mergeResource.getTsFile(),
+ sequence);
}
File newLevelFile =
TsFileResource.modifyTsFileNameMergeCnt(mergeResources.get(i).get(0).getTsFile());
compactionLogger.logSequence(sequence);
- compactionLogger.logFile(TARGET_NAME, newLevelFile);
+ compactionLogger.logFile(
+ TARGET_INFO,
+ storageGroupName,
+ virtualStorageGroupId,
+ timePartition,
+ newLevelFile,
+ sequence);
logger.info(
"{} [Compaction] merge level-{}'s {} TsFiles to next level",
storageGroupName,
@@ -710,6 +764,9 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
logger.error("{} Compaction log close fail", storageGroupName + COMPACTION_LOG_NAME);
}
}
+ for (TsFileResource resource : mergingFiles) {
+ resource.setMerging(false);
+ }
isMergeExecutedInCurrentTask = false;
restoreCompaction();
logger.error("Error occurred in Compaction Merge thread", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index b62ce94..e5b8e47 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -39,8 +39,9 @@ public class NoCompactionTsFileManagement extends TsFileManagement {
// includes sealed and unsealed unSequence TsFiles
private final Map<Long, List<TsFileResource>> unSequenceFileListMap = new TreeMap<>();
- public NoCompactionTsFileManagement(String storageGroupName, String storageGroupDir) {
- super(storageGroupName, storageGroupDir);
+ public NoCompactionTsFileManagement(
+ String storageGroupName, String virtualStorageGroupId, String storageGroupDir) {
+ super(storageGroupName, virtualStorageGroupId, storageGroupDir);
}
@Deprecated
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileInfo.java
new file mode 100644
index 0000000..7b75cf8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionFileInfo.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.utils;
+
+import java.io.File;
+import java.io.IOException;
+
+public class CompactionFileInfo {
+ String logicalStorageGroup;
+ String virtualStorageGroupId;
+ long timePartition;
+ String filename;
+ boolean sequence;
+
+ private CompactionFileInfo(
+ String logicalStorageGroup,
+ String virtualStorageGroupId,
+ long timePartition,
+ String filename,
+ boolean sequence) {
+ this.logicalStorageGroup = logicalStorageGroup;
+ this.virtualStorageGroupId = virtualStorageGroupId;
+ this.timePartition = timePartition;
+ this.filename = filename;
+ this.sequence = sequence;
+ }
+
+ public static CompactionFileInfo parseCompactionFileInfo(String infoString) throws IOException {
+ String[] info = infoString.split(" ");
+ try {
+ return new CompactionFileInfo(
+ info[0], info[1], Long.parseLong(info[2]), info[3], info[4].equals("sequence"));
+ } catch (Exception e) {
+ throw new IOException("invalid compaction log line: " + infoString);
+ }
+ }
+
+ public static CompactionFileInfo parseCompactionFileInfoFromPath(String filePath)
+ throws IOException {
+ String separator = File.separator;
+ if (separator.equals("\\")) {
+ separator = "\\\\";
+ }
+ String[] splitFilePath = filePath.split(separator);
+ int pathLength = splitFilePath.length;
+ if (pathLength < 4) {
+ throw new IOException("invalid compaction file path: " + filePath);
+ }
+ try {
+ return new CompactionFileInfo(
+ splitFilePath[pathLength - 4],
+ splitFilePath[pathLength - 3],
+ Long.parseLong(splitFilePath[pathLength - 2]),
+ splitFilePath[pathLength - 1],
+ splitFilePath[pathLength - 5].equals("sequence"));
+ } catch (Exception e) {
+ throw new IOException("invalid compaction log line: " + filePath);
+ }
+ }
+
+ public File getFile(String dataDir) {
+ return new File(
+ dataDir
+ + File.separator
+ + (sequence ? "sequence" : "unsequence")
+ + File.separator
+ + logicalStorageGroup
+ + File.separator
+ + virtualStorageGroupId
+ + File.separator
+ + timePartition
+ + File.separator
+ + filename);
+ }
+
+ public String getFilename() {
+ return filename;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "%s %s %d %s %s",
+ logicalStorageGroup,
+ virtualStorageGroupId,
+ timePartition,
+ filename,
+ sequence ? "sequence" : "unsequence");
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogAnalyzer.java
index 6bf36f6..e84bf91 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogAnalyzer.java
@@ -30,7 +30,9 @@ import java.util.Set;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.FULL_MERGE;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SEQUENCE_NAME;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_INFO;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_INFO;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.UNSEQUENCE_NAME;
@@ -42,6 +44,8 @@ public class CompactionLogAnalyzer {
private List<String> deviceList = new ArrayList<>();
private List<Long> offsets = new ArrayList<>();
private List<String> sourceFiles = new ArrayList<>();
+ private List<CompactionFileInfo> sourceFileInfo = new ArrayList<>();
+ private CompactionFileInfo targetFileInfo = null;
private String targetFile = null;
private boolean isSeq = false;
private boolean fullMerge = false;
@@ -61,11 +65,19 @@ public class CompactionLogAnalyzer {
switch (currLine) {
case SOURCE_NAME:
currLine = bufferedReader.readLine();
- sourceFiles.add(currLine);
+ sourceFileInfo.add(CompactionFileInfo.parseCompactionFileInfoFromPath(currLine));
+ break;
+ case SOURCE_INFO:
+ currLine = bufferedReader.readLine();
+ sourceFileInfo.add(CompactionFileInfo.parseCompactionFileInfo(currLine));
break;
case TARGET_NAME:
currLine = bufferedReader.readLine();
- targetFile = currLine;
+ targetFileInfo = CompactionFileInfo.parseCompactionFileInfoFromPath(currLine);
+ break;
+ case TARGET_INFO:
+ currLine = bufferedReader.readLine();
+ targetFileInfo = CompactionFileInfo.parseCompactionFileInfo(currLine);
break;
case FULL_MERGE:
fullMerge = true;
@@ -117,4 +129,12 @@ public class CompactionLogAnalyzer {
public boolean isFullMerge() {
return fullMerge;
}
+
+ public List<CompactionFileInfo> getSourceFileInfo() {
+ return sourceFileInfo;
+ }
+
+ public CompactionFileInfo getTargetFileInfo() {
+ return targetFileInfo;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogger.java
index 4f57653..5bb9da6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogger.java
@@ -30,7 +30,9 @@ public class CompactionLogger {
public static final String COMPACTION_LOG_NAME = ".compaction.log";
public static final String SOURCE_NAME = "source";
+ public static final String SOURCE_INFO = "info-source";
public static final String TARGET_NAME = "target";
+ public static final String TARGET_INFO = "info-target";
public static final String SEQUENCE_NAME = "sequence";
public static final String UNSEQUENCE_NAME = "unsequence";
public static final String FULL_MERGE = "full merge";
@@ -65,6 +67,29 @@ public class CompactionLogger {
logStream.flush();
}
+ public void logFile(
+ String prefix,
+ String logicalStorageGroup,
+ String virtualStorageGroup,
+ long timePartition,
+ File file,
+ boolean sequence)
+ throws IOException {
+ logStream.write(prefix);
+ logStream.newLine();
+ logStream.write(logicalStorageGroup);
+ logStream.write(" ");
+ logStream.write(virtualStorageGroup);
+ logStream.write(" ");
+ logStream.write(String.valueOf(timePartition));
+ logStream.write(" ");
+ logStream.write(file.getName());
+ logStream.write(" ");
+ logStream.write(sequence ? "sequence" : "unsequence");
+ logStream.newLine();
+ logStream.flush();
+ }
+
public void logSequence(boolean isSeq) throws IOException {
if (isSeq) {
logStream.write(SEQUENCE_NAME);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index bb00989..42e3068 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.engine.merge.manage;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.reader.resource.CachedUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.MergeUtils;
@@ -36,6 +38,7 @@ import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -64,8 +67,6 @@ public class MergeResource {
private Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
private Map<TsFileResource, Map<String, Pair<Long, Long>>> startEndTimeCache =
new HashMap<>(); // pair<startTime, endTime>
- private Map<PartialPath, MeasurementSchema> measurementSchemaMap =
- new HashMap<>(); // is this too waste?
private Map<MeasurementSchema, IChunkWriter> chunkWriterCache = new ConcurrentHashMap<>();
private long timeLowerBound = Long.MIN_VALUE;
@@ -103,31 +104,38 @@ public class MergeResource {
fileReaderCache.clear();
fileWriterCache.clear();
modificationCache.clear();
- measurementSchemaMap.clear();
chunkWriterCache.clear();
}
- public MeasurementSchema getSchema(PartialPath path) {
- return measurementSchemaMap.get(path);
- }
-
/**
* Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a SeqFile.
* The path of the merge temp file will be the seqFile's + ".merge".
*
+ * @param resource the sequence file whose merge temp-file writer is requested; also the cache key
+ * @param recover only consulted when no writer is cached for the resource: the new writer is then
+ * constructed with the arguments (!recover, recover). RecoverMergeTask passes true, the
+ * regular merge tasks pass false.
* @return A RestorableTsFileIOWriter of a merge temp file for a SeqFile.
*/
- public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
+ public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource, boolean recover)
+ throws IOException {
RestorableTsFileIOWriter writer = fileWriterCache.get(resource);
if (writer == null) {
writer =
new RestorableTsFileIOWriter(
- FSFactoryProducer.getFSFactory().getFile(resource.getTsFilePath() + MERGE_SUFFIX));
+ FSFactoryProducer.getFSFactory().getFile(resource.getTsFilePath() + MERGE_SUFFIX),
+ !recover,
+ recover);
fileWriterCache.put(resource, writer);
}
return writer;
}
+ /**
+  * Closes and evicts every cached merge-file writer whose key resource's TsFile has the same
+  * absolute path as the given file.
+  *
+  * <p>Matching keys are collected first and removed afterwards: removing entries from the cache
+  * while iterating over its key set can throw a ConcurrentModificationException on the next
+  * iteration step.
+  *
+  * @param file the file whose absolute path is compared with each cached resource's TsFile
+  * @throws IOException if closing a writer fails
+  */
+ public void closeAndRemoveWriter(File file) throws IOException {
+   String targetPath = file.getAbsolutePath();
+   List<TsFileResource> matched = new ArrayList<>();
+   for (TsFileResource resource : fileWriterCache.keySet()) {
+     if (resource.getTsFile().getAbsolutePath().equals(targetPath)) {
+       matched.add(resource);
+     }
+   }
+   for (TsFileResource resource : matched) {
+     RestorableTsFileIOWriter writer = fileWriterCache.remove(resource);
+     if (writer != null) {
+       writer.close();
+     }
+   }
+ }
+
/**
* Query ChunkMetadata of a timeseries from the given TsFile (seq or unseq). The ChunkMetadata is
* not cached since it is usually huge.
@@ -161,11 +169,12 @@ public class MergeResource {
* @param paths names of the timeseries
* @return an array of UnseqResourceMergeReaders each corresponding to a timeseries in paths
*/
- public IPointReader[] getUnseqReaders(List<PartialPath> paths) throws IOException {
+ public IPointReader[] getUnseqReaders(List<PartialPath> paths)
+ throws IOException, MetadataException {
List<Chunk>[] pathChunks = MergeUtils.collectUnseqChunks(paths, unseqFiles, this);
IPointReader[] ret = new IPointReader[paths.size()];
for (int i = 0; i < paths.size(); i++) {
- TSDataType dataType = getSchema(paths.get(i)).getType();
+ TSDataType dataType = MManager.getInstance().getSeriesType(paths.get(i));
ret[i] = new CachedUnseqResourceMergeReader(pathChunks[i], dataType);
}
return ret;
@@ -260,10 +269,6 @@ public class MergeResource {
this.cacheDeviceMeta = cacheDeviceMeta;
}
- public void setMeasurementSchemaMap(Map<PartialPath, MeasurementSchema> measurementSchemaMap) {
- this.measurementSchemaMap = measurementSchemaMap;
- }
-
public void clearChunkWriterCache() {
this.chunkWriterCache.clear();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeFileInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeFileInfo.java
new file mode 100644
index 0000000..2e6863a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeFileInfo.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.merge.recover;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import java.io.File;
+
+ /**
+  * Location-independent description of a file taking part in a merge: the storage groups, time
+  * partition, file name and sequence/unsequence flag, without the data-directory prefix.
+  *
+  * <p>An instance can be rendered to a space-separated string with {@link #toString()} and parsed
+  * back with {@link #getFileInfoFromString(String)}.
+  */
+ public class MergeFileInfo {
+   String logicalStorageGroup;
+   String virtualStorageGroup;
+   long timePartition;
+   String filename;
+   boolean sequence;
+
+   private MergeFileInfo(
+       String logicalStorageGroup,
+       String virtualStorageGroup,
+       long timePartition,
+       String filename,
+       boolean sequence) {
+     this.logicalStorageGroup = logicalStorageGroup;
+     this.virtualStorageGroup = virtualStorageGroup;
+     this.timePartition = timePartition;
+     this.filename = filename;
+     this.sequence = sequence;
+   }
+
+   /**
+    * Builds the info from the last five components of the file's absolute path, read as
+    * {@code <sequence|unsequence>/<logical sg>/<virtual sg>/<time partition>/<file name>}.
+    *
+    * @param file the file to describe
+    * @return the info derived from the file's absolute path
+    * @throws ArrayIndexOutOfBoundsException if the absolute path has fewer than five components
+    * @throws NumberFormatException if the time partition component is not a long
+    */
+   public static MergeFileInfo getFileInfoFromFile(File file) {
+     String filePath = file.getAbsolutePath();
+     String splitSeparator = File.separator;
+     if (splitSeparator.equals("\\")) {
+       // String.split takes a regex, so a backslash separator has to be escaped
+       splitSeparator = "\\\\";
+     }
+
+     String[] paths = filePath.split(splitSeparator);
+     int pathLength = paths.length;
+     return new MergeFileInfo(
+         paths[pathLength - 4],
+         paths[pathLength - 3],
+         Long.parseLong(paths[pathLength - 2]),
+         paths[pathLength - 1],
+         paths[pathLength - 5].equals("sequence"));
+   }
+
+   /**
+    * Parses either of the two textual forms: the space-separated form produced by
+    * {@link #toString()}, or a file path. The string is treated as a path if it contains
+    * {@link File#separator}.
+    *
+    * @param infoString the text to parse
+    * @return the parsed info
+    * @throws ArrayIndexOutOfBoundsException if the string has too few fields or path components
+    * @throws NumberFormatException if the time partition field is not a long
+    */
+   public static MergeFileInfo getFileInfoFromString(String infoString) {
+     if (!infoString.contains(File.separator)) {
+       // the info string records info of merge files
+       String[] splits = infoString.split(" ");
+       return new MergeFileInfo(
+           splits[0],
+           splits[1],
+           Long.parseLong(splits[2]),
+           splits[3],
+           Boolean.parseBoolean(splits[4]));
+     } else {
+       // the info string records path of merge files
+       return getFileInfoFromFile(new File(infoString));
+     }
+   }
+
+   /**
+    * Looks for the described file under each configured data directory, in order.
+    *
+    * @return the first existing file, or {@code null} if none of the data directories contains it
+    */
+   public File getFileFromDataDirs() {
+     // the part of the path below the data directory is the same for every candidate
+     String relativePath =
+         (sequence ? "sequence" : "unsequence").concat(File.separator)
+             + logicalStorageGroup.concat(File.separator)
+             + virtualStorageGroup.concat(File.separator)
+             + String.valueOf(timePartition).concat(File.separator)
+             + filename;
+     String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+     for (String dataDir : dataDirs) {
+       File file = new File(dataDir.concat(File.separator) + relativePath);
+       if (file.exists()) {
+         return file;
+       }
+     }
+     return null;
+   }
+
+   /** Renders the space-separated form accepted by {@link #getFileInfoFromString(String)}. */
+   @Override
+   public String toString() {
+     return String.format(
+         "%s %s %d %s %s",
+         logicalStorageGroup, virtualStorageGroup, timePartition, filename, sequence);
+   }
+
+   @Override
+   public boolean equals(Object other) {
+     if (this == other) {
+       return true;
+     }
+     if (other instanceof MergeFileInfo) {
+       MergeFileInfo otherInfo = (MergeFileInfo) other;
+       return logicalStorageGroup.equals(otherInfo.logicalStorageGroup)
+           && virtualStorageGroup.equals(otherInfo.virtualStorageGroup)
+           && timePartition == otherInfo.timePartition
+           && sequence == otherInfo.sequence
+           && filename.equals(otherInfo.filename);
+     }
+     return false;
+   }
+
+   /** Hashes the same five fields that {@link #equals(Object)} compares. */
+   @Override
+   public int hashCode() {
+     return java.util.Objects.hash(
+         logicalStorageGroup, virtualStorageGroup, timePartition, filename, sequence);
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogAnalyzer.java
similarity index 84%
rename from server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
rename to server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogAnalyzer.java
index 9693d3c..0ffc740 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/LogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogAnalyzer.java
@@ -48,7 +48,6 @@ import static org.apache.iotdb.db.engine.merge.recover.MergeLogger.STR_MERGE_END
import static org.apache.iotdb.db.engine.merge.recover.MergeLogger.STR_MERGE_START;
import static org.apache.iotdb.db.engine.merge.recover.MergeLogger.STR_SEQ_FILES;
import static org.apache.iotdb.db.engine.merge.recover.MergeLogger.STR_START;
-import static org.apache.iotdb.db.engine.merge.recover.MergeLogger.STR_TIMESERIES;
import static org.apache.iotdb.db.engine.merge.recover.MergeLogger.STR_UNSEQ_FILES;
/**
@@ -59,9 +58,9 @@ import static org.apache.iotdb.db.engine.merge.recover.MergeLogger.STR_UNSEQ_FIL
* server/0seq.tsfile.merge 338 end start root.mergeTest.device0.sensor1 server/0seq.tsfile.merge
* 664 end all ts end server/0seq.tsfile 145462 end merge end
*/
-public class LogAnalyzer {
+public class MergeLogAnalyzer {
- private static final Logger logger = LoggerFactory.getLogger(LogAnalyzer.class);
+ private static final Logger logger = LoggerFactory.getLogger(MergeLogAnalyzer.class);
private MergeResource resource;
private String taskName;
@@ -70,6 +69,7 @@ public class LogAnalyzer {
private Map<File, Long> fileLastPositions = new HashMap<>();
private Map<File, Long> tempFileLastPositions = new HashMap<>();
+ private Map<File, Long> prevTempFileLastPositions = null;
private List<PartialPath> mergedPaths = new ArrayList<>();
private List<PartialPath> unmergedPaths;
@@ -78,7 +78,7 @@ public class LogAnalyzer {
private Status status;
- public LogAnalyzer(
+ public MergeLogAnalyzer(
MergeResource resource, String taskName, File logFile, String storageGroupName) {
this.resource = resource;
this.taskName = taskName;
@@ -126,9 +126,10 @@ public class LogAnalyzer {
break;
}
Iterator<TsFileResource> iterator = resource.getSeqFiles().iterator();
+ MergeFileInfo toMatchedInfo = MergeFileInfo.getFileInfoFromString(currLine);
while (iterator.hasNext()) {
TsFileResource seqFile = iterator.next();
- if (seqFile.getTsFile().getAbsolutePath().equals(currLine)) {
+ if (MergeFileInfo.getFileInfoFromFile(seqFile.getTsFile()).equals(toMatchedInfo)) {
mergeSeqFiles.add(seqFile);
// remove to speed-up next iteration
iterator.remove();
@@ -153,13 +154,14 @@ public class LogAnalyzer {
long startTime = System.currentTimeMillis();
List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
while ((currLine = bufferedReader.readLine()) != null) {
- if (currLine.equals(STR_TIMESERIES)) {
+ if (currLine.equals(STR_MERGE_START)) {
break;
}
Iterator<TsFileResource> iterator = resource.getUnseqFiles().iterator();
+ MergeFileInfo toMatchInfo = MergeFileInfo.getFileInfoFromString(currLine);
while (iterator.hasNext()) {
TsFileResource unseqFile = iterator.next();
- if (unseqFile.getTsFile().getAbsolutePath().equals(currLine)) {
+ if (MergeFileInfo.getFileInfoFromFile(unseqFile.getTsFile()).equals(toMatchInfo)) {
mergeUnseqFiles.add(unseqFile);
// remove to speed-up next iteration
iterator.remove();
@@ -192,6 +194,7 @@ public class LogAnalyzer {
}
List<PartialPath> currTSList = new ArrayList<>();
+ List<PartialPath> prevTSList = null;
long startTime = System.currentTimeMillis();
while ((currLine = bufferedReader.readLine()) != null) {
if (STR_ALL_TS_END.equals(currLine)) {
@@ -211,16 +214,33 @@ public class LogAnalyzer {
} else if (!currLine.contains(STR_END)) {
// file position
String[] splits = currLine.split(" ");
- File file = SystemFileFactory.INSTANCE.getFile(splits[0]);
- Long position = Long.parseLong(splits[1]);
+ Long position = Long.parseLong(splits[splits.length - 1]);
+ MergeFileInfo fileInfo =
+ MergeFileInfo.getFileInfoFromString(currLine.substring(0, currLine.lastIndexOf(' ')));
+ File file = fileInfo.getFileFromDataDirs();
tempFileLastPositions.put(file, position);
} else {
// a TS ends merging
- unmergedPaths.removeAll(currTSList);
- for (Entry<File, Long> entry : tempFileLastPositions.entrySet()) {
- fileLastPositions.put(entry.getKey(), entry.getValue());
+ if (prevTempFileLastPositions == null) {
+ prevTempFileLastPositions = tempFileLastPositions;
+ tempFileLastPositions = new HashMap<>();
+ prevTSList = currTSList;
+ currTSList = new ArrayList<>();
+ } else {
+ unmergedPaths.removeAll(prevTSList);
+ for (Entry<File, Long> entry : prevTempFileLastPositions.entrySet()) {
+ for (File file : fileLastPositions.keySet()) {
+ if (file.getName().equals(entry.getKey().getName())) {
+ fileLastPositions.put(file, entry.getValue());
+ }
+ }
+ }
+ prevTempFileLastPositions = tempFileLastPositions;
+ tempFileLastPositions = new HashMap<>();
+ prevTSList = currTSList;
+ currTSList = new ArrayList<>();
+ mergedPaths.addAll(prevTSList);
}
- mergedPaths.addAll(currTSList);
}
}
tempFileLastPositions = null;
@@ -252,8 +272,10 @@ public class LogAnalyzer {
}
if (!currLine.contains(STR_END)) {
String[] splits = currLine.split(" ");
- currFile = SystemFileFactory.INSTANCE.getFile(splits[0]);
Long lastPost = Long.parseLong(splits[1]);
+ MergeFileInfo fileInfo =
+ MergeFileInfo.getFileInfoFromString(currLine.substring(0, currLine.lastIndexOf(' ')));
+ currFile = fileInfo.getFileFromDataDirs();
fileLastPositions.put(currFile, lastPost);
} else {
if (currFile == null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java
index 7932c01..ee6c744 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/recover/MergeLogger.java
@@ -36,7 +36,6 @@ public class MergeLogger {
static final String STR_SEQ_FILES = "seqFiles";
static final String STR_UNSEQ_FILES = "unseqFiles";
- static final String STR_TIMESERIES = "timeseries";
static final String STR_START = "start";
static final String STR_END = "end";
static final String STR_ALL_TS_END = "all ts end";
@@ -63,7 +62,7 @@ public class MergeLogger {
}
public void logFilePosition(File file) throws IOException {
- logStream.write(String.format("%s %d", file.getAbsolutePath(), file.length()));
+ logStream.write(String.format("%s %d", MergeFileInfo.getFileInfoFromFile(file), file.length()));
logStream.newLine();
logStream.flush();
}
@@ -81,7 +80,7 @@ public class MergeLogger {
}
public void logFileMergeStart(File file, long position) throws IOException {
- logStream.write(String.format("%s %d", file.getAbsolutePath(), position));
+ logStream.write(String.format("%s %d", MergeFileInfo.getFileInfoFromFile(file), position));
logStream.newLine();
logStream.flush();
}
@@ -107,7 +106,7 @@ public class MergeLogger {
logStream.write(STR_SEQ_FILES);
logStream.newLine();
for (TsFileResource tsFileResource : seqFiles) {
- logStream.write(tsFileResource.getTsFile().getAbsolutePath());
+ logStream.write(MergeFileInfo.getFileInfoFromFile(tsFileResource.getTsFile()).toString());
logStream.newLine();
}
logStream.flush();
@@ -117,7 +116,7 @@ public class MergeLogger {
logStream.write(STR_UNSEQ_FILES);
logStream.newLine();
for (TsFileResource tsFileResource : unseqFiles) {
- logStream.write(tsFileResource.getTsFile().getAbsolutePath());
+ logStream.write(MergeFileInfo.getFileInfoFromFile(tsFileResource.getTsFile()).toString());
logStream.newLine();
}
logStream.flush();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 34735c4..36d5b5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -176,7 +176,7 @@ public class MergeFileTask {
// filter the chunks that have been merged
oldFileWriter.filterChunks(new HashMap<>(context.getUnmergedChunkStartTimes().get(seqFile)));
- RestorableTsFileIOWriter newFileWriter = resource.getMergeFileWriter(seqFile);
+ RestorableTsFileIOWriter newFileWriter = resource.getMergeFileWriter(seqFile, false);
newFileWriter.close();
try (TsFileSequenceReader newFileReader =
new TsFileSequenceReader(newFileWriter.getFile().getPath())) {
@@ -309,7 +309,7 @@ public class MergeFileTask {
private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
Map<PartialPath, List<Long>> fileUnmergedChunkStartTimes =
context.getUnmergedChunkStartTimes().get(seqFile);
- RestorableTsFileIOWriter fileWriter = resource.getMergeFileWriter(seqFile);
+ RestorableTsFileIOWriter fileWriter = resource.getMergeFileWriter(seqFile, false);
mergeLogger.logFileMergeStart(fileWriter.getFile(), fileWriter.getFile().length());
logger.debug("{} moving unmerged chunks of {} to the new file", taskName, seqFile);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 42e5fc1..9c73718 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.db.engine.merge.selector.IMergePathSelector;
import org.apache.iotdb.db.engine.merge.selector.NaivePathSelector;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.db.utils.MergeUtils.MetaListEntry;
@@ -125,7 +127,7 @@ public class MergeMultiChunkTask {
this.storageGroupName = storageGroupName;
}
- void mergeSeries() throws IOException {
+ void mergeSeries() throws IOException, MetadataException {
if (logger.isInfoEnabled()) {
logger.info("{} starts to merge {} series", taskName, unmergedSeries.size());
}
@@ -174,7 +176,7 @@ public class MergeMultiChunkTask {
return String.format("Processed %d/%d series", mergedSeriesCnt, unmergedSeries.size());
}
- private void mergePaths() throws IOException {
+ private void mergePaths() throws IOException, MetadataException {
mergeLogger.logTSStart(currMergingPaths);
IPointReader[] unseqReaders = resource.getUnseqReaders(currMergingPaths);
currTimeValuePairs = new TimeValuePair[currMergingPaths.size()];
@@ -205,7 +207,8 @@ public class MergeMultiChunkTask {
return maxSensor;
}
- private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders) throws IOException {
+ private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders)
+ throws IOException, MetadataException {
TsFileResource currTsFile = resource.getSeqFiles().get(seqFileIdx);
// all paths in one call are from the same device
String deviceId = currMergingPaths.get(0).getDevice();
@@ -306,9 +309,9 @@ public class MergeMultiChunkTask {
return;
}
- RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(currTsFile);
+ RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(currTsFile, false);
for (PartialPath path : currMergingPaths) {
- MeasurementSchema schema = resource.getSchema(path);
+ MeasurementSchema schema = MManager.getInstance().getSeriesSchema(path);
mergeFileWriter.addSchema(path, schema);
}
// merge unseq data with seq data in this file or small chunks in this file into a larger chunk
@@ -622,11 +625,11 @@ public class MergeMultiChunkTask {
}
@SuppressWarnings("java:S2445") // avoid reading the same reader concurrently
- private void mergeChunkHeap() throws IOException {
+ private void mergeChunkHeap() throws IOException, MetadataException {
while (!chunkIdxHeap.isEmpty()) {
int pathIdx = chunkIdxHeap.poll();
PartialPath path = currMergingPaths.get(pathIdx);
- MeasurementSchema measurementSchema = resource.getSchema(path);
+ MeasurementSchema measurementSchema = MManager.getInstance().getSeriesSchema(path);
IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema);
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 11bc8a5..1b6fe08 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.MergeUtils;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +37,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -117,6 +114,12 @@ public class MergeTask implements Callable<Void> {
private void abort() throws IOException {
states = States.ABORTED;
cleanUp(false);
+ for (TsFileResource resource : resource.getSeqFiles()) {
+ resource.setMerging(false);
+ }
+ for (TsFileResource resource : resource.getUnseqFiles()) {
+ resource.setMerging(false);
+ }
// call the callback to make sure the StorageGroup exit merging status, but passing 2
// empty file lists to avoid files being deleted.
callback.call(
@@ -146,7 +149,6 @@ public class MergeTask implements Callable<Void> {
mergeLogger.logFiles(resource);
Set<PartialPath> devices = IoTDB.metaManager.getDevices(new PartialPath(storageGroupName));
- Map<PartialPath, MeasurementSchema> measurementSchemaMap = new HashMap<>();
List<PartialPath> unmergedSeries = new ArrayList<>();
for (PartialPath device : devices) {
MNode deviceNode = IoTDB.metaManager.getNodeByPath(device);
@@ -154,12 +156,10 @@ public class MergeTask implements Callable<Void> {
if (entry.getValue() instanceof MeasurementMNode) {
// under some situation, the children of a device node may be another device node
PartialPath path = device.concatNode(entry.getKey());
- measurementSchemaMap.put(path, ((MeasurementMNode) entry.getValue()).getSchema());
unmergedSeries.add(path);
}
}
}
- resource.setMeasurementSchemaMap(measurementSchemaMap);
mergeLogger.logMergeStart();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index 976c374..b629645 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -20,8 +20,8 @@
package org.apache.iotdb.db.engine.merge.task;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.merge.recover.LogAnalyzer;
-import org.apache.iotdb.db.engine.merge.recover.LogAnalyzer.Status;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogAnalyzer;
+import org.apache.iotdb.db.engine.merge.recover.MergeLogAnalyzer.Status;
import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.merge.selector.MaxSeriesMergeFileSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -31,11 +31,12 @@ import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.h2.store.fs.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
@@ -51,7 +52,7 @@ public class RecoverMergeTask extends MergeTask {
private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
- private LogAnalyzer analyzer;
+ private MergeLogAnalyzer analyzer;
public RecoverMergeTask(
List<TsFileResource> seqFiles,
@@ -73,7 +74,7 @@ public class RecoverMergeTask extends MergeTask {
}
long startTime = System.currentTimeMillis();
- analyzer = new LogAnalyzer(resource, taskName, logFile, storageGroupName);
+ analyzer = new MergeLogAnalyzer(resource, taskName, logFile, storageGroupName);
Status status = analyzer.analyze();
if (logger.isInfoEnabled()) {
logger.info(
@@ -104,7 +105,7 @@ public class RecoverMergeTask extends MergeTask {
}
}
- private void resumeAfterFilesLogged(boolean continueMerge) throws IOException {
+ private void resumeAfterFilesLogged(boolean continueMerge) throws IOException, MetadataException {
if (continueMerge) {
resumeMergeProgress();
calculateConcurrentSeriesNum();
@@ -214,7 +215,7 @@ public class RecoverMergeTask extends MergeTask {
tsFileResource.getTsFile().getName(),
fileCnt,
resource.getSeqFiles().size());
- RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(tsFileResource);
+ RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(tsFileResource, true);
mergeFileWriter.makeMetadataVisible();
mergeContext.getUnmergedChunkStartTimes().put(tsFileResource, new HashMap<>());
List<PartialPath> pathsToRecover = analyzer.getMergedPaths();
@@ -287,12 +288,15 @@ public class RecoverMergeTask extends MergeTask {
for (Entry<File, Long> entry : analyzer.getFileLastPositions().entrySet()) {
File file = entry.getKey();
Long lastPosition = entry.getValue();
- if (file.exists() && file.length() != lastPosition) {
- try (FileInputStream fileInputStream = new FileInputStream(file)) {
- FileChannel channel = fileInputStream.getChannel();
+ if (file != null && file.exists() && file.length() != lastPosition && lastPosition != 0) {
+ try (FileOutputStream fileOutputStream = new FileOutputStream(file, true)) {
+ FileChannel channel = fileOutputStream.getChannel();
channel.truncate(lastPosition);
channel.close();
}
+ } else if (file != null && lastPosition == 0) {
+ FileUtils.delete(file.getPath());
+ resource.closeAndRemoveWriter(file);
}
}
analyzer.setFileLastPositions(null);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index a1812ed..8361382 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -110,7 +110,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
@@ -379,7 +378,10 @@ public class StorageGroupProcessor {
IoTDBDescriptor.getInstance()
.getConfig()
.getCompactionStrategy()
- .getTsFileManagement(logicalStorageGroupName, storageGroupSysDir.getAbsolutePath());
+ .getTsFileManagement(
+ logicalStorageGroupName,
+ virtualStorageGroupId,
+ storageGroupSysDir.getAbsolutePath());
ScheduledExecutorService executorService =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
@@ -598,10 +600,6 @@ public class StorageGroupProcessor {
// resources
continueFailedRenames(fileFolder, TEMP_SUFFIX);
- // some TsFiles were going to be replaced by the merged files when the system crashed and
- // the process was interrupted before the merged files could be named
- continueFailedRenames(fileFolder, MERGE_SUFFIX);
-
File[] subFiles = fileFolder.listFiles();
if (subFiles != null) {
for (File partitionFolder : subFiles) {
@@ -609,15 +607,9 @@ public class StorageGroupProcessor {
logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
} else if (!partitionFolder.getName().equals(IoTDBConstant.UPGRADE_FOLDER_NAME)) {
// some TsFileResource may be being persisted when the system crashed, try recovered
- // such
- // resources
+ // such resources
continueFailedRenames(partitionFolder, TEMP_SUFFIX);
- // some TsFiles were going to be replaced by the merged files when the system crashed
- // and
- // the process was interrupted before the merged files could be named
- continueFailedRenames(partitionFolder, MERGE_SUFFIX);
-
Collections.addAll(
tsFiles,
fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
@@ -674,7 +666,7 @@ public class StorageGroupProcessor {
isSeq,
i == tsFiles.size() - 1);
- RestorableTsFileIOWriter writer;
+ RestorableTsFileIOWriter writer = null;
try {
// this tsfile is not zero level, no need to perform redo wal
if (TsFileResource.getMergeLevel(tsFileResource.getTsFile().getName()) > 0) {
@@ -695,6 +687,10 @@ public class StorageGroupProcessor {
logger.warn(
"Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(), e);
continue;
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
}
if (i != tsFiles.size() - 1 || !writer.canWrite()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 7b6e5b1..cbd4f75 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1075,7 +1075,7 @@ public class PlanExecutor implements IPlanExecutor {
List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
- reader.selfCheck(schemaMap, chunkGroupMetadataList, false);
+ reader.selfCheck(schemaMap, chunkGroupMetadataList, false, false);
if (plan.getVerifyMetadata()) {
loadNewTsFileVerifyMetadata(reader);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
index d36f736..d0f4a03 100644
--- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
+++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java
@@ -61,7 +61,7 @@ public class TsFileSketchTool {
try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
TsFileMetadata tsFileMetaData = reader.readFileMetadata();
List<ChunkGroupMetadata> allChunkGroupMetadata = new ArrayList<>();
- reader.selfCheck(null, allChunkGroupMetadata, false);
+ reader.selfCheck(null, allChunkGroupMetadata, false, false);
// begin print
StringBuilder str1 = new StringBuilder();
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index fe3dd71..3b251e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -99,7 +99,7 @@ public class TsFileRecoverPerformer {
// remove corrupted part of the TsFile
RestorableTsFileIOWriter restorableTsFileIOWriter;
try {
- restorableTsFileIOWriter = new RestorableTsFileIOWriter(file, needRedoWal);
+ restorableTsFileIOWriter = new RestorableTsFileIOWriter(file, needRedoWal, false);
} catch (NotCompatibleTsFileException e) {
boolean result = file.delete();
logger.warn("TsFile {} is incompatible. Delete it successfully {}", filePath, result);
diff --git a/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java b/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java
index 906bed1..61db9f8 100644
--- a/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java
+++ b/server/src/test/java/org/apache/iotdb/db/constant/TestConstant.java
@@ -30,6 +30,16 @@ public class TestConstant {
public static final String BASE_OUTPUT_PATH = "target".concat(File.separator);
public static final String OUTPUT_DATA_DIR =
BASE_OUTPUT_PATH + "data".concat(File.separator) + TSFILE_PREFIX_PATH;
+ public static final String SEQUENCE_DATA_DIR =
+ BASE_OUTPUT_PATH
+ + "data".concat(File.separator)
+ + "sequence".concat(File.separator)
+ + TSFILE_PREFIX_PATH;
+ public static final String UNSEQUENCE_DATA_DIR =
+ BASE_OUTPUT_PATH
+ + "data".concat(File.separator)
+ + "unsequence".concat(File.separator)
+ + TSFILE_PREFIX_PATH;
public static final String d0 = "root.vehicle.d0";
public static final String s0 = "s0";
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
index 5a62736..94b3a59 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
@@ -73,7 +73,7 @@ public class LevelCompactionCacheTest extends LevelCompactionTest {
@Test
public void testCompactionChunkCache() throws IOException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
TsFileResource tsFileResource = seqResources.get(1);
TsFileSequenceReader reader = new TsFileSequenceReader(tsFileResource.getTsFilePath());
List<Path> paths = reader.getAllPaths();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
index 913dca6..d93a2e9 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
@@ -64,7 +64,7 @@ public class LevelCompactionLogTest extends LevelCompactionTest {
@Test
public void testCompactionLog() throws IOException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
levelCompactionTsFileManagement.forkCurrentFileList(0);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index d725734..36567ab 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -77,7 +77,7 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
@Test
public void testCompactionMergeOnce() throws IllegalPathException, IOException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
levelCompactionTsFileManagement.forkCurrentFileList(0);
@@ -121,7 +121,7 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(2);
IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(2);
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
levelCompactionTsFileManagement.forkCurrentFileList(0);
@@ -173,7 +173,7 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(1);
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
TsFileResource forthSeqTsFileResource = seqResources.get(3);
PartialPath path =
new PartialPath(
@@ -233,7 +233,7 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(1);
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
levelCompactionTsFileManagement.forkCurrentFileList(0);
@@ -270,7 +270,7 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(100000);
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
levelCompactionTsFileManagement.forkCurrentFileList(0);
@@ -314,7 +314,7 @@ public class LevelCompactionMergeTest extends LevelCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(2);
List<TsFileResource> compactionFiles = prepareTsFileResources();
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(compactionFiles, true);
QueryContext context = new QueryContext();
PartialPath path =
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
index d419983..1f05996 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
@@ -67,7 +67,7 @@ public class LevelCompactionModsTest extends LevelCompactionTest {
@Test
public void testCompactionMods() throws IllegalPathException, IOException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
TsFileResource sourceTsFileResource = seqResources.get(0);
TsFileResource targetTsFileResource = seqResources.get(1);
List<Modification> filterModifications = new ArrayList<>();
@@ -100,7 +100,7 @@ public class LevelCompactionModsTest extends LevelCompactionTest {
@Test
public void testCompactionModsByOffset() throws IllegalPathException, IOException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
TsFileResource sourceTsFileResource = seqResources.get(0);
TsFileResource targetTsFileResource = seqResources.get(1);
List<Modification> filterModifications = new ArrayList<>();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
index d2bd748..739e003 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
@@ -198,7 +198,7 @@ public class LevelCompactionMoreDataTest extends LevelCompactionTest {
@Test
public void testSensorWithTwoOrThreeNode() throws IllegalPathException, IOException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
levelCompactionTsFileManagement.forkCurrentFileList(0);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
index 17e2958..d26f1c2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionRecoverTest.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
@@ -32,16 +34,28 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
import org.apache.commons.io.FileUtils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -52,31 +66,185 @@ import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
-public class LevelCompactionRecoverTest extends LevelCompactionTest {
+public class LevelCompactionRecoverTest {
File tempSGDir;
+ static final String COMPACTION_TEST_SG = "root.compactionTest";
+
+ protected int seqFileNum = 6;
+ int unseqFileNum = 0;
+ protected int measurementNum = 10;
+ int deviceNum = 10;
+ long ptNum = 100;
+ long flushInterval = 20;
+ TSEncoding encoding = TSEncoding.PLAIN;
+
+ String[] deviceIds;
+ MeasurementSchema[] measurementSchemas;
+
+ List<TsFileResource> seqResources = new ArrayList<>();
+ List<TsFileResource> unseqResources = new ArrayList<>();
+
+ private int prevMergeChunkThreshold;
+
+ void prepareSeries() throws MetadataException { // Builds the test schemas and device ids, then registers them with IoTDB.metaManager.
+ measurementSchemas = new MeasurementSchema[measurementNum];
+ for (int i = 0; i < measurementNum; i++) { // one DOUBLE, uncompressed schema per measurement, named "sensor<i>"
+ measurementSchemas[i] =
+ new MeasurementSchema(
+ "sensor" + i, TSDataType.DOUBLE, encoding, CompressionType.UNCOMPRESSED);
+ }
+ deviceIds = new String[deviceNum];
+ for (int i = 0; i < deviceNum; i++) { // device ids are the storage group name plus "device<i>"
+ deviceIds[i] = COMPACTION_TEST_SG + PATH_SEPARATOR + "device" + i;
+ }
+ IoTDB.metaManager.setStorageGroup(new PartialPath(COMPACTION_TEST_SG)); // storage group is set before any timeseries is created
+ for (String device : deviceIds) { // create one timeseries per (device, measurement) pair
+ for (MeasurementSchema measurementSchema : measurementSchemas) {
+ PartialPath devicePath = new PartialPath(device);
+ IoTDB.metaManager.createTimeseries(
+ devicePath.concatNode(measurementSchema.getMeasurementId()),
+ measurementSchema.getType(),
+ measurementSchema.getEncodingType(),
+ measurementSchema.getCompressor(),
+ Collections.emptyMap()); // empty map passed as the last argument (presumably timeseries props; confirm against createTimeseries)
+ }
+ }
+ }
+
+ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException { // Creates and fills the seq and unseq TsFiles; the parameters shadow the fields of the same name.
+ for (int i = 0; i < seqFileNum; i++) { // sequence files live under TestConstant.SEQUENCE_DATA_DIR
+ File file =
+ new File(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
+ i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + i
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")); // file name is i, i, 0, 0 joined by FILE_NAME_SEPARATOR, plus ".tsfile"
+ if (!file.getParentFile().exists()) {
+ Assert.assertTrue(file.getParentFile().mkdirs()); // fail the test early if the directory cannot be created
+ }
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setClosed(true);
+ tsFileResource.updatePlanIndexes((long) i);
+ seqResources.add(tsFileResource);
+ prepareFile(tsFileResource, i * ptNum, ptNum, 0); // file i holds ptNum timestamps starting at i * ptNum, value offset 0
+ }
+ for (int i = 0; i < unseqFileNum; i++) { // unsequence files live under TestConstant.UNSEQUENCE_DATA_DIR
+ File file =
+ new File(
+ TestConstant.UNSEQUENCE_DATA_DIR.concat(
+ (10000 + i)
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + (10000 + i)
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")); // same naming scheme, with the first two components starting at 10000
+ if (!file.getParentFile().exists()) {
+ Assert.assertTrue(file.getParentFile().mkdirs());
+ }
+ TsFileResource tsFileResource = new TsFileResource(file);
+ tsFileResource.setClosed(true);
+ tsFileResource.updatePlanIndexes(i + seqFileNum); // plan indexes continue after those given to the sequence files
+ unseqResources.add(tsFileResource);
+ prepareFile(tsFileResource, i * ptNum, ptNum * (i + 1) / unseqFileNum, 10000); // division is only reached when unseqFileNum > 0; values are shifted by 10000
+ }
+ }
+
+ private void removeFiles() throws IOException { // Deletes the generated resources and leftover files under "target", then shuts down the file readers.
+ for (TsFileResource tsFileResource : seqResources) {
+ if (tsFileResource.getTsFile().exists()) { // only resources whose TsFile still exists are removed
+ tsFileResource.remove();
+ }
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+ if (tsFileResource.getTsFile().exists()) {
+ tsFileResource.remove();
+ }
+ }
+ File[] files = FSFactoryProducer.getFSFactory().listFilesBySuffix("target", ".tsfile"); // sweep remaining ".tsfile" files listed for "target"
+ for (File file : files) {
+ file.delete(); // result of delete() is ignored
+ }
+ File[] resourceFiles =
+ FSFactoryProducer.getFSFactory().listFilesBySuffix("target", ".resource"); // same sweep for ".resource" files
+ for (File resourceFile : resourceFiles) {
+ resourceFile.delete(); // result of delete() is ignored
+ }
+ FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
+ FileReaderManager.getInstance().stop();
+ }
+
+ void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset) // Writes ptNum timestamps of data for every device into the resource's TsFile; the ptNum parameter shadows the field.
+ throws IOException, WriteProcessException {
+ TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getTsFile()); // NOTE(review): not closed if a write below throws; no try/finally here
+ for (String deviceId : deviceIds) { // register every (device, measurement) pair before writing
+ for (MeasurementSchema measurementSchema : measurementSchemas) {
+ fileWriter.registerTimeseries(
+ new Path(deviceId, measurementSchema.getMeasurementId()), measurementSchema);
+ }
+ }
+ for (long i = timeOffset; i < timeOffset + ptNum; i++) { // timestamps cover [timeOffset, timeOffset + ptNum)
+ for (int j = 0; j < deviceNum; j++) {
+ TSRecord record = new TSRecord(i, deviceIds[j]);
+ for (int k = 0; k < measurementNum; k++) { // every measurement gets the same value: i + valueOffset
+ record.addTuple(
+ DataPoint.getDataPoint(
+ measurementSchemas[k].getType(),
+ measurementSchemas[k].getMeasurementId(),
+ String.valueOf(i + valueOffset)));
+ }
+ fileWriter.write(record);
+ tsFileResource.updateStartTime(deviceIds[j], i); // resource time bounds are updated with each written timestamp
+ tsFileResource.updateEndTime(deviceIds[j], i);
+ }
+ if ((i + 1) % flushInterval == 0) { // flush chunk groups whenever (i + 1) is a multiple of flushInterval
+ fileWriter.flushAllChunkGroups();
+ }
+ }
+ fileWriter.close();
+ }
- @Override
@Before
public void setUp() throws IOException, WriteProcessException, MetadataException {
- super.setUp();
- tempSGDir = new File(TestConstant.OUTPUT_DATA_DIR.concat("tempSG"));
+ IoTDB.metaManager.init();
+ prevMergeChunkThreshold =
+ IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
+ IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(-1);
+ prepareSeries();
+ prepareFiles(seqFileNum, unseqFileNum);
+ tempSGDir = new File(TestConstant.SEQUENCE_DATA_DIR.concat("tempSG"));
tempSGDir.mkdirs();
}
- @Override
@After
public void tearDown() throws IOException, StorageEngineException {
- super.tearDown();
+ removeFiles();
+ seqResources.clear();
+ unseqResources.clear();
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setMergeChunkPointNumberThreshold(prevMergeChunkThreshold);
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
+ IoTDB.metaManager.clear();
+ EnvironmentUtils.cleanAllDir();
FileUtils.deleteDirectory(tempSGDir);
}
@@ -86,7 +254,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
public void testCompactionRecoverWithUncompletedTargetFileAndLog()
throws IOException, IllegalPathException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
QueryContext context = new QueryContext();
@@ -95,6 +263,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
IBatchReader tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -114,7 +284,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
CompactionLogger compactionLogger =
new CompactionLogger(tempSGDir.getPath(), COMPACTION_TEST_SG);
@@ -124,7 +294,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logSequence(true);
deleteFileIfExists(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -136,7 +306,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -194,6 +364,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -213,7 +385,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
}
/** compaction recover merge finished */
@@ -221,7 +393,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
public void testRecoverCompleteTargetFileAndCompactionLog()
throws IOException, IllegalPathException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
QueryContext context = new QueryContext();
@@ -230,6 +402,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
IBatchReader tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -249,7 +423,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
CompactionLogger compactionLogger =
new CompactionLogger(tempSGDir.getPath(), COMPACTION_TEST_SG);
@@ -259,7 +433,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logSequence(true);
deleteFileIfExists(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -271,7 +445,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -299,6 +473,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -318,7 +494,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
}
/** compeleted target file, and not resource files, compaction log exists */
@@ -326,7 +502,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
public void testCompactionRecoverWithCompletedTargetFileAndLog()
throws IOException, IllegalPathException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
QueryContext context = new QueryContext();
@@ -335,6 +511,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
IBatchReader tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -354,7 +532,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
CompactionLogger compactionLogger =
new CompactionLogger(tempSGDir.getPath(), COMPACTION_TEST_SG);
@@ -364,7 +542,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logSequence(true);
deleteFileIfExists(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -376,7 +554,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -408,12 +586,15 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ List<TsFileResource> resources = levelCompactionTsFileManagement.getTsFileList(true);
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
tsFilesReader =
new SeriesRawDataBatchReader(
path,
measurementSchemas[0].getType(),
context,
- levelCompactionTsFileManagement.getTsFileList(true),
+ resources,
new ArrayList<>(),
null,
null,
@@ -427,7 +608,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
}
/** compeleted target file, and not resource files, compaction log exists */
@@ -435,7 +616,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
public void testCompactionRecoverWithCompletedTargetFile()
throws IOException, IllegalPathException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
QueryContext context = new QueryContext();
@@ -444,6 +625,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
IBatchReader tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -463,7 +646,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
CompactionLogger compactionLogger =
new CompactionLogger(tempSGDir.getPath(), COMPACTION_TEST_SG);
@@ -473,7 +656,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logSequence(true);
deleteFileIfExists(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -485,7 +668,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -535,6 +718,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -554,7 +739,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
}
/** compaction recover merge finished,unseq */
@@ -565,7 +750,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(2);
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(seqResources, false);
QueryContext context = new QueryContext();
@@ -574,6 +759,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
IBatchReader tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -593,7 +780,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
CompactionLogger compactionLogger =
new CompactionLogger(tempSGDir.getPath(), COMPACTION_TEST_SG);
@@ -603,7 +790,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logSequence(false);
deleteFileIfExists(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -615,7 +802,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -643,6 +830,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -662,7 +851,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
IoTDBDescriptor.getInstance().getConfig().setUnseqLevelNum(prevUnseqLevelNum);
}
@@ -672,7 +861,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
public void testCompactionMergeRecoverMergeStartSourceLog()
throws IOException, IllegalPathException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
CompactionLogger compactionLogger =
@@ -688,6 +877,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
IBatchReader tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -707,7 +898,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
}
/** compaction recover merge start just log source file and sequence flag */
@@ -715,7 +906,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
public void testCompactionMergeRecoverMergeStartSequenceLog()
throws IOException, IllegalPathException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
CompactionLogger compactionLogger =
@@ -743,6 +934,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
null,
true);
int count = 0;
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
while (tsFilesReader.hasNextBatch()) {
BatchData batchData = tsFilesReader.nextBatch();
for (int i = 0; i < batchData.length(); i++) {
@@ -751,14 +944,14 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
}
/** compaction recover merge start target file logged */
@Test
public void testCompactionMergeRecoverMergeStart() throws IOException, IllegalPathException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
CompactionLogger compactionLogger =
@@ -769,7 +962,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logSequence(true);
deleteFileIfExists(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -781,7 +974,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -800,6 +993,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
IBatchReader tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -819,7 +1014,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
}
/** compaction recover merge finished but no finish log */
@@ -827,7 +1022,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
public void testCompactionMergeRecoverMergeFinishedNoLog()
throws IOException, IllegalPathException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
CompactionLogger compactionLogger =
@@ -838,7 +1033,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
compactionLogger.logSequence(true);
deleteFileIfExists(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -850,7 +1045,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
TsFileResource targetTsFileResource =
new TsFileResource(
new File(
- TestConstant.OUTPUT_DATA_DIR.concat(
+ TestConstant.SEQUENCE_DATA_DIR.concat(
0
+ IoTDBConstant.FILE_NAME_SEPARATOR
+ 0
@@ -878,6 +1073,8 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
deviceIds[0]
+ TsFileConstant.PATH_SEPARATOR
+ measurementSchemas[0].getMeasurementId());
+ ChunkCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
IBatchReader tsFilesReader =
new SeriesRawDataBatchReader(
path,
@@ -897,7 +1094,7 @@ public class LevelCompactionRecoverTest extends LevelCompactionTest {
}
}
tsFilesReader.close();
- assertEquals(500, count);
+ assertEquals(600, count);
}
public void deleteFileIfExists(File file) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionSelectorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionSelectorTest.java
index f58ef9f..2e77f73 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionSelectorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionSelectorTest.java
@@ -62,7 +62,7 @@ public class LevelCompactionSelectorTest extends LevelCompactionTest {
public void testCompactionSelector()
throws NoSuchFieldException, IllegalAccessException, IOException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
levelCompactionTsFileManagement.addAll(seqResources, true);
levelCompactionTsFileManagement.addAll(unseqResources, false);
levelCompactionTsFileManagement.forkCurrentFileList(0);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java
index 7fd242a..853e133 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java
@@ -66,7 +66,7 @@ public class LevelCompactionTsFileManagementTest extends LevelCompactionTest {
@Test
public void testAddRemoveAndIterator() throws IOException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
for (TsFileResource tsFileResource : seqResources) {
levelCompactionTsFileManagement.add(tsFileResource, true);
}
@@ -163,7 +163,7 @@ public class LevelCompactionTsFileManagementTest extends LevelCompactionTest {
@Test
public void testIteratorRemove() throws IOException {
LevelCompactionTsFileManagement levelCompactionTsFileManagement =
- new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
for (TsFileResource tsFileResource : seqResources) {
levelCompactionTsFileManagement.add(tsFileResource, true);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
index 8106e45..eac9b88 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
@@ -67,7 +67,7 @@ public class NoCompactionTsFileManagementTest extends LevelCompactionTest {
@Test
public void testAddRemoveAndIterator() {
NoCompactionTsFileManagement noCompactionTsFileManagement =
- new NoCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new NoCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
for (TsFileResource tsFileResource : seqResources) {
noCompactionTsFileManagement.add(tsFileResource, true);
}
@@ -170,7 +170,7 @@ public class NoCompactionTsFileManagementTest extends LevelCompactionTest {
@Test
public void testIteratorRemove() {
NoCompactionTsFileManagement noCompactionTsFileManagement =
- new NoCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath());
+ new NoCompactionTsFileManagement(COMPACTION_TEST_SG, "0", tempSGDir.getPath());
for (TsFileResource tsFileResource : seqResources) {
noCompactionTsFileManagement.add(tsFileResource, true);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 3eb9d09..aee2d92 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -947,7 +947,9 @@ public class TsFileSequenceReader implements AutoCloseable {
* @param newSchema the schema on each time series in the file
* @param chunkGroupMetadataList ChunkGroupMetadata List
* @param fastFinish if true and the file is complete, then newSchema and chunkGroupMetadataList
- * parameter will be not modified.
+ * will not be loaded
+ * @param loadLastChunkMetadata if true, the ChunkMetadataList of the last ChunkGroup is built and
+ * appended to chunkGroupMetadataList. Only set this to true when the last ChunkGroup is complete.
* @return the position of the file that is fine. All data after the position in the file should
* be truncated.
*/
@@ -955,7 +957,8 @@ public class TsFileSequenceReader implements AutoCloseable {
public long selfCheck(
Map<Path, MeasurementSchema> newSchema,
List<ChunkGroupMetadata> chunkGroupMetadataList,
- boolean fastFinish)
+ boolean fastFinish,
+ boolean loadLastChunkMetadata)
throws IOException {
File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
long fileSize;
@@ -1142,6 +1145,13 @@ public class TsFileSequenceReader implements AutoCloseable {
this.position(),
e.getMessage());
}
+ if (loadLastChunkMetadata
+ && !chunkGroupMetadataList
+ .get(chunkGroupMetadataList.size() - 1)
+ .getDevice()
+ .equals(lastDeviceId)) {
+ chunkGroupMetadataList.add(new ChunkGroupMetadata(lastDeviceId, chunkMetadataList));
+ }
// Despite the completeness of the data section, we will discard current FileMetadata
// so that we can continue to write data into this tsfile.
return truncatedSize;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 8ee5283..54e00ed 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -91,7 +91,7 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
if (file.exists()) {
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
- truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true);
+ truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true, false);
minPlanIndex = reader.getMinPlanIndex();
maxPlanIndex = reader.getMaxPlanIndex();
if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {
@@ -112,7 +112,8 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
}
}
- public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException {
+ public RestorableTsFileIOWriter(File file, boolean truncate, boolean appendLastChunkGroupMetadata)
+ throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("{} is opened.", file.getName());
}
@@ -130,7 +131,9 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
if (file.exists()) {
try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) {
- truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true);
+ truncatedSize =
+ reader.selfCheck(
+ knownSchemas, chunkGroupMetadataList, true, appendLastChunkGroupMetadata);
minPlanIndex = reader.getMinPlanIndex();
maxPlanIndex = reader.getMaxPlanIndex();
if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) {