You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/05/19 09:09:06 UTC

[iotdb] branch rel/0.11 updated: [To rel/0.11] Fix continuous compaction run into dead loop when unseq compaction cannot select candidates (#3207)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new f100441  [To rel/0.11] Fix continuous compaction run into dead loop when unseq compaction cannot select candidates (#3207)
f100441 is described below

commit f100441e14f0978519652155d7c5a11a1e42914e
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Wed May 19 17:08:45 2021 +0800

    [To rel/0.11] Fix continuous compaction run into dead loop when unseq compaction cannot select candidates (#3207)
    
    Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
 .../iotdb/db/engine/compaction/TsFileManagement.java | 20 +++++++++++---------
 .../engine/storagegroup/StorageGroupProcessor.java   | 11 +++++++++--
 2 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 2e237ea..a137e4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -190,15 +190,6 @@ public abstract class TsFileManagement {
       List<TsFileResource> seqMergeList,
       List<TsFileResource> unSeqMergeList,
       long dataTTL) {
-    if (isUnseqMerging) {
-      if (logger.isInfoEnabled()) {
-        logger.info(
-            "{} Last merge is ongoing, currently consumed time: {}ms",
-            storageGroupName,
-            (System.currentTimeMillis() - mergeStartTime));
-      }
-      return false;
-    }
     // wait until seq merge has finished
     while (isSeqMerging) {
       try {
@@ -271,6 +262,17 @@ public abstract class TsFileManagement {
             mergeFiles[0].size(),
             mergeFiles[1].size());
       }
+      // wait until unseq merge has finished
+      while (isUnseqMerging) {
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          logger.error("{} [Compaction] shutdown", storageGroupName, e);
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      }
+
       return true;
     } catch (MergeException | IOException e) {
       logger.error("{} cannot select file for merge", storageGroupName, e);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1440a51..e5cdc81 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -372,6 +372,14 @@ public class StorageGroupProcessor {
           .putAll(endTimeMap);
       globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
     }
+
+    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
+        && seqTsFileResources.size() > 0) {
+      for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) {
+        executeCompaction(
+            timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+      }
+    }
   }
 
   private void recoverCompaction() {
@@ -1883,11 +1891,10 @@ public class StorageGroupProcessor {
 
   /** close compaction merge callback, to release some locks */
   private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
+    this.compactionMergeWorking = false;
     if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
       executeCompaction(
           timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
-    } else {
-      this.compactionMergeWorking = false;
     }
   }