You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/11/07 13:22:38 UTC
[iotdb] branch rel/0.11 updated: Fix compaction unseq concurrent
bug (#1982)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new 4708939 Fix compaction unseq concurrent bug (#1982)
4708939 is described below
commit 4708939e882ffac959a435bd3cde66d7aae66ca8
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Sat Nov 7 21:17:19 2020 +0800
Fix compaction unseq concurrent bug (#1982)
(cherry picked from commit bf3a4c794907e3116ab7b75c162e827614148114)
---
.../level/LevelCompactionTsFileManagement.java | 13 +++++++++----
.../iotdb/db/engine/merge/manage/MergeResource.java | 21 ++++++++++++++-------
2 files changed, 23 insertions(+), 11 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 8dc1517..f046f6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -438,6 +438,15 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
@SuppressWarnings("squid:S3776")
private void merge(List<List<TsFileResource>> mergeResources, boolean sequence,
long timePartition, int currMaxLevel, int currMaxFileNumInEachLevel) {
+ // wait until unseq merge has finished
+ while (isUnseqMerging) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ logger.error("{} [Compaction] shutdown", storageGroupName, e);
+ Thread.currentThread().interrupt();
+ }
+ }
long startTimeMillis = System.currentTimeMillis();
try {
logger.info("{} start to filter compaction condition", storageGroupName);
@@ -450,10 +459,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
// do not merge current unseq file level to upper level and just merge all of them to seq file
merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE);
} else {
- // wait until unseq merge has finished
- while (isUnseqMerging) {
- Thread.sleep(200);
- }
for (TsFileResource mergeResource : mergeResources.get(i)) {
compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index 344007b..c6baf3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -49,8 +49,8 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
/**
- * MergeResource manages files and caches of readers, writers, MeasurementSchemas and
- * modifications to avoid unnecessary object creations and file openings.
+ * MergeResource manages files and caches of readers, writers, MeasurementSchemas and modifications
+ * to avoid unnecessary object creations and file openings.
*/
public class MergeResource {
@@ -75,7 +75,7 @@ public class MergeResource {
}
private boolean filterResource(TsFileResource res) {
- return !res.isDeleted() && res.stillLives(timeLowerBound);
+ return res.getTsFile().exists() && !res.isDeleted() && res.stillLives(timeLowerBound);
}
public MergeResource(Collection<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
@@ -107,8 +107,9 @@ public class MergeResource {
}
/**
- * Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a
- * SeqFile. The path of the merge temp file will be the seqFile's + ".merge".
+ * Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a SeqFile.
+ * The path of the merge temp file will be the seqFile's + ".merge".
+ *
* @return A RestorableTsFileIOWriter of a merge temp file for a SeqFile.
*/
public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
@@ -122,8 +123,9 @@ public class MergeResource {
}
/**
- * Query ChunkMetadata of a timeseries from the given TsFile (seq or unseq). The ChunkMetadata
- * is not cached since it is usually huge.
+ * Query ChunkMetadata of a timeseries from the given TsFile (seq or unseq). The ChunkMetadata is
+ * not cached since it is usually huge.
+ *
* @param path name of the time series
*/
public List<ChunkMetadata> queryChunkMetadata(PartialPath path, TsFileResource seqFile)
@@ -134,6 +136,7 @@ public class MergeResource {
/**
* Construct the a new or get an existing TsFileSequenceReader of a TsFile.
+ *
* @return a TsFileSequenceReader
*/
public TsFileSequenceReader getFileReader(TsFileResource tsFileResource) throws IOException {
@@ -148,6 +151,7 @@ public class MergeResource {
/**
* Construct UnseqResourceMergeReaders of for each timeseries over all seqFiles. The readers are
* not cached since the method is only called once for each timeseries.
+ *
* @param paths names of the timeseries
* @return an array of UnseqResourceMergeReaders each corresponding to a timeseries in paths
*/
@@ -171,6 +175,7 @@ public class MergeResource {
/**
* Get the modifications of a timeseries in the ModificationFile of a TsFile.
+ *
* @param path name of the time series
*/
public List<Modification> getModifications(TsFileResource tsFileResource, PartialPath path) {
@@ -191,6 +196,7 @@ public class MergeResource {
/**
* Remove and close the writer of the merge temp file of a SeqFile. The merge temp file is also
* deleted.
+ *
* @param tsFileResource the SeqFile
*/
public void removeFileAndWriter(TsFileResource tsFileResource) throws IOException {
@@ -203,6 +209,7 @@ public class MergeResource {
/**
* Remove and close the reader of the TsFile. The TsFile is NOT deleted.
+ *
* @param resource the SeqFile
*/
public void removeFileReader(TsFileResource resource) throws IOException {