You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/11/07 13:22:38 UTC

[iotdb] branch rel/0.11 updated: Fix compaction unseq concurrent bug (#1982)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 4708939  Fix compaction unseq concurrent bug (#1982)
4708939 is described below

commit 4708939e882ffac959a435bd3cde66d7aae66ca8
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Sat Nov 7 21:17:19 2020 +0800

    Fix compaction unseq concurrent bug (#1982)
    
    (cherry picked from commit bf3a4c794907e3116ab7b75c162e827614148114)
---
 .../level/LevelCompactionTsFileManagement.java      | 13 +++++++++----
 .../iotdb/db/engine/merge/manage/MergeResource.java | 21 ++++++++++++++-------
 2 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 8dc1517..f046f6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -438,6 +438,15 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   @SuppressWarnings("squid:S3776")
   private void merge(List<List<TsFileResource>> mergeResources, boolean sequence,
       long timePartition, int currMaxLevel, int currMaxFileNumInEachLevel) {
+    // wait until unseq merge has finished
+    while (isUnseqMerging) {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException e) {
+        logger.error("{} [Compaction] shutdown", storageGroupName, e);
+        Thread.currentThread().interrupt();
+      }
+    }
     long startTimeMillis = System.currentTimeMillis();
     try {
       logger.info("{} start to filter compaction condition", storageGroupName);
@@ -450,10 +459,6 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
             // do not merge current unseq file level to upper level and just merge all of them to seq file
             merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE);
           } else {
-            // wait until unseq merge has finished
-            while (isUnseqMerging) {
-              Thread.sleep(200);
-            }
             for (TsFileResource mergeResource : mergeResources.get(i)) {
               compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index 344007b..c6baf3c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -49,8 +49,8 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
 
 /**
- * MergeResource manages files and caches of readers, writers, MeasurementSchemas and
- * modifications to avoid unnecessary object creations and file openings.
+ * MergeResource manages files and caches of readers, writers, MeasurementSchemas and modifications
+ * to avoid unnecessary object creations and file openings.
  */
 public class MergeResource {
 
@@ -75,7 +75,7 @@ public class MergeResource {
   }
 
   private boolean filterResource(TsFileResource res) {
-    return !res.isDeleted() && res.stillLives(timeLowerBound);
+    return res.getTsFile().exists() && !res.isDeleted() && res.stillLives(timeLowerBound);
   }
 
   public MergeResource(Collection<TsFileResource> seqFiles, List<TsFileResource> unseqFiles,
@@ -107,8 +107,9 @@ public class MergeResource {
   }
 
   /**
-   * Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a
-   * SeqFile. The path of the merge temp file will be the seqFile's + ".merge".
+   * Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a SeqFile.
+   * The path of the merge temp file will be the seqFile's + ".merge".
+   *
    * @return A RestorableTsFileIOWriter of a merge temp file for a SeqFile.
    */
   public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
@@ -122,8 +123,9 @@ public class MergeResource {
   }
 
   /**
-   * Query ChunkMetadata of a timeseries from the given TsFile (seq or unseq). The ChunkMetadata
-   * is not cached since it is usually huge.
+   * Query ChunkMetadata of a timeseries from the given TsFile (seq or unseq). The ChunkMetadata is
+   * not cached since it is usually huge.
+   *
    * @param path name of the time series
    */
   public List<ChunkMetadata> queryChunkMetadata(PartialPath path, TsFileResource seqFile)
@@ -134,6 +136,7 @@ public class MergeResource {
 
   /**
   * Construct a new or get an existing TsFileSequenceReader of a TsFile.
+   *
    * @return a TsFileSequenceReader
    */
   public TsFileSequenceReader getFileReader(TsFileResource tsFileResource) throws IOException {
@@ -148,6 +151,7 @@ public class MergeResource {
   /**
    * Construct UnseqResourceMergeReaders for each timeseries over all seqFiles. The readers are
    * not cached since the method is only called once for each timeseries.
+   *
    * @param paths names of the timeseries
    * @return an array of UnseqResourceMergeReaders each corresponding to a timeseries in paths
    */
@@ -171,6 +175,7 @@ public class MergeResource {
 
   /**
    * Get the modifications of a timeseries in the ModificationFile of a TsFile.
+   *
    * @param path name of the time series
    */
   public List<Modification> getModifications(TsFileResource tsFileResource, PartialPath path) {
@@ -191,6 +196,7 @@ public class MergeResource {
   /**
    * Remove and close the writer of the merge temp file of a SeqFile. The merge temp file is also
    * deleted.
+   *
    * @param tsFileResource the SeqFile
    */
   public void removeFileAndWriter(TsFileResource tsFileResource) throws IOException {
@@ -203,6 +209,7 @@ public class MergeResource {
 
   /**
    * Remove and close the reader of the TsFile. The TsFile is NOT deleted.
+   *
    * @param resource the SeqFile
    */
   public void removeFileReader(TsFileResource resource) throws IOException {