You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/03 03:47:07 UTC

[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9172)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new 482922e8c4 [To rel/1.0][IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9172)
482922e8c4 is described below

commit 482922e8c40b0e545faf96a29af7173a8515b331
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Fri Mar 3 11:47:00 2023 +0800

    [To rel/1.0][IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9172)
---
 docs/UserGuide/Reference/Common-Config-Manual.md   |   8 ++
 .../zh/UserGuide/Reference/Common-Config-Manual.md |  31 +++--
 .../resources/conf/iotdb-common.properties         |   8 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../execute/task/InnerSpaceCompactionTask.java     |   9 +-
 .../compaction/schedule/CompactionScheduler.java   | 131 ++++++++++++---------
 .../compaction/schedule/CompactionTaskManager.java |  13 +-
 .../DefaultCompactionTaskComparatorImpl.java       |  21 ++--
 .../impl/CrossSpaceCompactionCandidate.java        |   6 +-
 .../impl/RewriteCrossSpaceCompactionSelector.java  |  63 +++++-----
 .../impl/SizeTieredCompactionSelector.java         |  26 +++-
 .../engine/compaction/CompactionSchedulerTest.java |   5 +-
 .../CompactionSchedulerWithFastPerformerTest.java  |   8 +-
 .../inner/InnerCompactionSchedulerTest.java        |   6 +-
 15 files changed, 220 insertions(+), 131 deletions(-)

diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index fdeb08dc26..b3bab3f74f 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -1017,6 +1017,14 @@ Different configuration parameters take effect in the following three ways:
 |Default| true                                                            |
 |Effective| After restart system                                            |
 
+* candidate\_compaction\_task\_queue\_size
+
+|Name| candidate\_compaction\_task\_queue\_size    |
+|:---:|:--------------------------------------------|
+|Description| The size of candidate compaction task queue |
+|Type| Int32                                       |
+|Default| 50                                          |
+|Effective| After restart system                        |
 
 ### Write Ahead Log Configuration
 
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index 0c6c28fcfb..1baea96f92 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1056,21 +1056,30 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
 
 * sub\_compaction\_thread\_count
 
-|名字| sub\_compaction\_thread\_count |
-|:---:|:--|
-|描述| 每个跨空间合并任务的子任务线程数 |
-|类型| Int32 |
-|默认值| 4 |
-|改后生效方式| 重启服务生效|
+|名字| sub\_compaction\_thread\_count  |
+|:---:|:--------------------------------|
+|描述| 每个合并任务的子任务线程数,只对跨空间合并和乱序空间内合并生效 |
+|类型| int32                           |
+|默认值| 4                               |
+|改后生效方式| 重启服务生效                          |
 
 * enable\_compaction\_validation
 
 |名字| enable\_compaction\_validation |
-|:---:|:--|
-|描述| 开启合并结束后对顺序文件时间范围的检查 |
-|类型| Boolean |
-|默认值| true |
-|改后生效方式| 重启服务生效|
+|:---:|:-------------------------------|
+|描述| 开启合并结束后对顺序文件时间范围的检查            |
+|类型| Boolean                        |
+|默认值| true                           |
+|改后生效方式| 重启服务生效                         |
+
+* candidate\_compaction\_task\_queue\_size
+
+|名字| candidate\_compaction\_task\_queue\_size |
+|:---:|:-----------------------------------------|
+|描述| 合并任务优先级队列的大小                             |
+|类型| int32                                    |
+|默认值| 50                                       |
+|改后生效方式| 重启服务生效                                   |
 
 ### 写前日志配置
 
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 1254cca9a4..ef2b030ec0 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -573,6 +573,10 @@ cluster_name=defaultCluster
 # BALANCE: alternate two compaction types
 # compaction_priority=BALANCE
 
+# The size of candidate compaction task queue.
+# Datatype: int
+# candidate_compaction_task_queue_size = 50
+
 # The target tsfile size in compaction
 # Datatype: long, Unit: byte
 # target_compaction_file_size=1073741824
@@ -726,9 +730,9 @@ cluster_name=defaultCluster
 # Datatype: int
 # page_size_in_byte=65536
 
-# The maximum number of data points in a page, default 1024*1024
+# The maximum number of data points in a page, default 10000
 # Datatype: int
-# max_number_of_points_in_page=1048576
+# max_number_of_points_in_page=10000
 
 # The threshold for pattern matching in regex
 # Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 06cf83ce33..3bdb0c667d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -498,6 +498,9 @@ public class IoTDBConfig {
 
   private boolean enableCompactionValidation = true;
 
+  /** The size of candidate compaction task queue. */
+  private int candidateCompactionTaskQueueSize = 50;
+
   /** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
   private boolean metaDataCacheEnable = true;
 
@@ -3657,6 +3660,14 @@ public class IoTDBConfig {
     this.enableCompactionValidation = enableCompactionValidation;
   }
 
+  public int getCandidateCompactionTaskQueueSize() {
+    return candidateCompactionTaskQueueSize;
+  }
+
+  public void setCandidateCompactionTaskQueueSize(int candidateCompactionTaskQueueSize) {
+    this.candidateCompactionTaskQueueSize = candidateCompactionTaskQueueSize;
+  }
+
   public boolean isEnableAuditLog() {
     return enableAuditLog;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d2450fa457..a88e65b1a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -669,6 +669,11 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "enable_compaction_validation",
                 Boolean.toString(conf.isEnableCompactionValidation()))));
+    conf.setCandidateCompactionTaskQueueSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "candidate_compaction_task_queue_size",
+                Integer.toString(conf.getCandidateCompactionTaskQueueSize()))));
 
     conf.setEnablePartialInsert(
         Boolean.parseBoolean(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
index c1bb3eb505..a9f7aca321 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -110,10 +110,12 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
     // get resource of target file
     String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent();
     LOGGER.info(
-        "{}-{} [Compaction] InnerSpaceCompaction task starts with {} files",
+        "{}-{} [Compaction] {} InnerSpaceCompaction task starts with {} files, total file size is {} MB.",
         storageGroupName,
         dataRegionId,
-        selectedTsFileResourceList.size());
+        sequence ? "Sequence" : "Unsequence",
+        selectedTsFileResourceList.size(),
+        selectedFileSize / 1024 / 1024);
     try {
       targetTsFileResource =
           TsFileNameGenerator.getInnerCompactionTargetFileResource(
@@ -252,10 +254,11 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
 
       double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
       LOGGER.info(
-          "{}-{} [Compaction] InnerSpaceCompaction task finishes successfully, target file is {},"
+          "{}-{} [Compaction] {} InnerSpaceCompaction task finishes successfully, target file is {},"
               + "time cost is {} s, compaction speed is {} MB/s, {}",
           storageGroupName,
           dataRegionId,
+          sequence ? "Sequence" : "Unsequence",
           targetTsFileResource.getTsFile().getName(),
           costTime,
           selectedFileSize / 1024.0d / 1024.0d / costTime,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
index 0173423537..61d35f0dde 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
@@ -34,7 +34,9 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * CompactionScheduler schedules and submits the compaction task periodically, and it counts the
@@ -56,40 +58,22 @@ public class CompactionScheduler {
       return;
     }
     try {
-      tryToSubmitCrossSpaceCompactionTask(
-          tsFileManager.getStorageGroupName(),
-          tsFileManager.getDataRegionId(),
-          timePartition,
-          tsFileManager);
-      tryToSubmitInnerSpaceCompactionTask(
-          tsFileManager.getStorageGroupName(),
-          tsFileManager.getDataRegionId(),
-          timePartition,
-          tsFileManager,
-          true);
-      tryToSubmitInnerSpaceCompactionTask(
-          tsFileManager.getStorageGroupName(),
-          tsFileManager.getDataRegionId(),
-          timePartition,
-          tsFileManager,
-          false);
+      tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
+      tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition);
     } catch (InterruptedException e) {
       LOGGER.error("Exception occurs when selecting compaction tasks", e);
       Thread.currentThread().interrupt();
     }
   }
 
-  public static void tryToSubmitInnerSpaceCompactionTask(
-      String storageGroupName,
-      String dataRegionId,
-      long timePartition,
-      TsFileManager tsFileManager,
-      boolean sequence)
-      throws InterruptedException {
+  private static List<List<TsFileResource>> selectInnerSpaceCompactionTask(
+      long timePartition, TsFileManager tsFileManager, boolean sequence) {
     if ((!config.isEnableSeqSpaceCompaction() && sequence)
         || (!config.isEnableUnseqSpaceCompaction() && !sequence)) {
-      return;
+      return Collections.emptyList();
     }
+    String storageGroupName = tsFileManager.getStorageGroupName();
+    String dataRegionId = tsFileManager.getDataRegionId();
 
     ICompactionSelector innerSpaceCompactionSelector = null;
     if (sequence) {
@@ -103,44 +87,76 @@ public class CompactionScheduler {
               .getInnerUnsequenceCompactionSelector()
               .createInstance(storageGroupName, dataRegionId, timePartition, tsFileManager);
     }
-    List<List<TsFileResource>> taskList =
-        innerSpaceCompactionSelector.selectInnerSpaceTask(
-            sequence
-                ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
-                : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
-    for (List<TsFileResource> task : taskList) {
-      ICompactionPerformer performer =
-          sequence
-              ? IoTDBDescriptor.getInstance()
-                  .getConfig()
-                  .getInnerSeqCompactionPerformer()
-                  .createInstance()
-              : IoTDBDescriptor.getInstance()
-                  .getConfig()
-                  .getInnerUnseqCompactionPerformer()
-                  .createInstance();
-      CompactionTaskManager.getInstance()
-          .addTaskToWaitingQueue(
-              new InnerSpaceCompactionTask(
-                  timePartition,
-                  tsFileManager,
-                  task,
-                  sequence,
-                  performer,
-                  CompactionTaskManager.currentTaskNum,
-                  tsFileManager.getNextCompactionTaskId()));
+
+    return innerSpaceCompactionSelector.selectInnerSpaceTask(
+        sequence
+            ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
+            : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+  }
+
+  public static void tryToSubmitInnerSpaceCompactionTask(
+      TsFileManager tsFileManager, long timePartition) throws InterruptedException {
+    List<List<TsFileResource>> seqTaskList =
+        selectInnerSpaceCompactionTask(timePartition, tsFileManager, true);
+    List<List<TsFileResource>> unseqTaskList =
+        selectInnerSpaceCompactionTask(timePartition, tsFileManager, false);
+    int taskFreeSize =
+        config.getCandidateCompactionTaskQueueSize()
+            - CompactionTaskManager.getInstance().getCompactionCandidateTaskCount();
+    int taskSize = Math.max(seqTaskList.size(), unseqTaskList.size());
+    for (int i = 0; i < taskSize; i++) {
+      if (taskFreeSize <= 0) {
+        break;
+      }
+      // submit one seq inner space task
+      if (i < seqTaskList.size()) {
+        submitInnerTask(seqTaskList.get(i), tsFileManager, timePartition, true);
+        taskFreeSize--;
+      }
+
+      // submit one unseq inner space task
+      if (i < unseqTaskList.size()) {
+        submitInnerTask(unseqTaskList.get(i), tsFileManager, timePartition, false);
+        taskFreeSize--;
+      }
     }
   }
 
-  private static void tryToSubmitCrossSpaceCompactionTask(
-      String logicalStorageGroupName,
-      String dataRegionId,
+  private static void submitInnerTask(
+      List<TsFileResource> taskList,
+      TsFileManager tsFileManager,
       long timePartition,
-      TsFileManager tsFileManager)
+      boolean sequence)
       throws InterruptedException {
+    ICompactionPerformer performer =
+        sequence
+            ? IoTDBDescriptor.getInstance()
+                .getConfig()
+                .getInnerSeqCompactionPerformer()
+                .createInstance()
+            : IoTDBDescriptor.getInstance()
+                .getConfig()
+                .getInnerUnseqCompactionPerformer()
+                .createInstance();
+    CompactionTaskManager.getInstance()
+        .addTaskToWaitingQueue(
+            new InnerSpaceCompactionTask(
+                timePartition,
+                tsFileManager,
+                taskList,
+                sequence,
+                performer,
+                CompactionTaskManager.currentTaskNum,
+                tsFileManager.getNextCompactionTaskId()));
+  }
+
+  private static void tryToSubmitCrossSpaceCompactionTask(
+      TsFileManager tsFileManager, long timePartition) throws InterruptedException {
     if (!config.isEnableCrossSpaceCompaction()) {
       return;
     }
+    String logicalStorageGroupName = tsFileManager.getStorageGroupName();
+    String dataRegionId = tsFileManager.getDataRegionId();
     ICrossSpaceSelector crossSpaceCompactionSelector =
         config
             .getCrossCompactionSelector()
@@ -149,7 +165,10 @@ public class CompactionScheduler {
         crossSpaceCompactionSelector.selectCrossSpaceTask(
             tsFileManager.getOrCreateSequenceListByTimePartition(timePartition),
             tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
-    List<Long> memoryCost = crossSpaceCompactionSelector.getCompactionMemoryCost();
+    List<Long> memoryCost =
+        taskList.stream()
+            .map(CrossCompactionTaskResource::getTotalMemoryCost)
+            .collect(Collectors.toList());
     for (int i = 0, size = taskList.size(); i < size; ++i) {
       CompactionTaskManager.getInstance()
           .addTaskToWaitingQueue(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 76bf9db322..d75401820d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -58,6 +58,8 @@ public class CompactionTaskManager implements IService {
 
   private static final CompactionTaskManager INSTANCE = new CompactionTaskManager();
 
+  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   // The thread pool that executes the compaction task. The default number of threads for this pool
   // is 10.
   private WrappedThreadPoolExecutor taskExecutionPool;
@@ -67,7 +69,8 @@ public class CompactionTaskManager implements IService {
 
   public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
   private final FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
-      new FixedPriorityBlockingQueue<>(1024, new DefaultCompactionTaskComparatorImpl());
+      new FixedPriorityBlockingQueue<>(
+          config.getCandidateCompactionTaskQueueSize(), new DefaultCompactionTaskComparatorImpl());
   // <StorageGroup-DataRegionId,futureSet>, it is used to store all compaction tasks under each
   // virtualStorageGroup
   private final Map<String, Map<AbstractCompactionTask, Future<CompactionTaskSummary>>>
@@ -76,7 +79,6 @@ public class CompactionTaskManager implements IService {
 
   private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
 
-  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private volatile boolean init = false;
 
   public static CompactionTaskManager getInstance() {
@@ -219,7 +221,8 @@ public class CompactionTaskManager implements IService {
       throws InterruptedException {
     if (init
         && !candidateCompactionTaskQueue.contains(compactionTask)
-        && !isTaskRunning(compactionTask)) {
+        && !isTaskRunning(compactionTask)
+        && candidateCompactionTaskQueue.size() < config.getCandidateCompactionTaskQueueSize()) {
       compactionTask.setSourceFilesToCompactionCandidate();
       candidateCompactionTaskQueue.put(compactionTask);
 
@@ -324,6 +327,10 @@ public class CompactionTaskManager implements IService {
     return getExecutingTaskCount() + candidateCompactionTaskQueue.size();
   }
 
+  public int getCompactionCandidateTaskCount() {
+    return candidateCompactionTaskQueue.size();
+  }
+
   public synchronized List<AbstractCompactionTask> getRunningCompactionTaskList() {
     List<AbstractCompactionTask> tasks = new ArrayList<>();
     for (Map<AbstractCompactionTask, Future<CompactionTaskSummary>> runningTaskMap :
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
index 334acae481..d2113a9796 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
@@ -37,13 +37,21 @@ public class DefaultCompactionTaskComparatorImpl implements ICompactionTaskCompa
     if ((((o1 instanceof InnerSpaceCompactionTask) && (o2 instanceof CrossSpaceCompactionTask))
         || ((o2 instanceof InnerSpaceCompactionTask)
             && (o1 instanceof CrossSpaceCompactionTask)))) {
-      if (config.getCompactionPriority() != CompactionPriority.CROSS_INNER) {
+      if (config.getCompactionPriority() == CompactionPriority.CROSS_INNER) {
+        // priority is CROSS_INNER
+        return o1 instanceof CrossSpaceCompactionTask ? -1 : 1;
+      } else if (config.getCompactionPriority() == CompactionPriority.INNER_CROSS) {
+        // priority is INNER_CROSS
         return o1 instanceof InnerSpaceCompactionTask ? -1 : 1;
       } else {
-        return o1 instanceof CrossSpaceCompactionTask ? -1 : 1;
+        // priority is BALANCE
+        if (o1.getSerialId() != o2.getSerialId()) {
+          return o1.getSerialId() < o2.getSerialId() ? -1 : 1;
+        } else {
+          return o1 instanceof CrossSpaceCompactionTask ? -1 : 1;
+        }
       }
-    }
-    if (o1 instanceof InnerSpaceCompactionTask) {
+    } else if (o1 instanceof InnerSpaceCompactionTask) {
       return compareInnerSpaceCompactionTask(
           (InnerSpaceCompactionTask) o1, (InnerSpaceCompactionTask) o2);
     } else {
@@ -54,11 +62,6 @@ public class DefaultCompactionTaskComparatorImpl implements ICompactionTaskCompa
 
   public int compareInnerSpaceCompactionTask(
       InnerSpaceCompactionTask o1, InnerSpaceCompactionTask o2) {
-    if (o1.isSequence() ^ o2.isSequence()) {
-      // prioritize sequence file compaction
-      return o1.isSequence() ? -1 : 1;
-    }
-
     // if the sum of compaction count of the selected files are different
     // we prefer to execute task with smaller compaction count
     // this can reduce write amplification
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/CrossSpaceCompactionCandidate.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/CrossSpaceCompactionCandidate.java
index d815967fa2..11a782e3bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/CrossSpaceCompactionCandidate.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/CrossSpaceCompactionCandidate.java
@@ -139,11 +139,9 @@ public class CrossSpaceCompactionCandidate {
   private List<TsFileResourceCandidate> filterUnseqResource(List<TsFileResource> unseqResources) {
     List<TsFileResourceCandidate> ret = new ArrayList<>();
     for (TsFileResource resource : unseqResources) {
-      if (resource.getStatus() != TsFileResourceStatus.CLOSED
-          || !resource.getTsFile().exists()
-          || resource.isDeleted()) {
+      if (resource.getStatus() != TsFileResourceStatus.CLOSED || !resource.getTsFile().exists()) {
         break;
-      } else if (!resource.isDeleted() && resource.stillLives(ttlLowerBound)) {
+      } else if (resource.stillLives(ttlLowerBound)) {
         ret.add(new TsFileResourceCandidate(resource));
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index 9298f3bb5c..1a57d4ef0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -46,17 +46,16 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
       LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
-  private final int SELECT_WARN_THRESHOLD = 10;
   protected String logicalStorageGroupName;
   protected String dataRegionId;
   protected long timePartition;
   protected TsFileManager tsFileManager;
 
-  private long totalCost;
+  private static boolean hasPrintedLog = false;
+
   private final long memoryBudget;
   private final int maxCrossCompactionFileNum;
   private final long maxCrossCompactionFileSize;
-  private int seqSelectedNum;
 
   private AbstractCompactionEstimator compactionEstimator;
 
@@ -109,21 +108,13 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
    */
   private CrossCompactionTaskResource selectOneTaskResources(
       CrossSpaceCompactionCandidate candidate) throws MergeException {
-    long startTime = System.currentTimeMillis();
     try {
       LOGGER.debug(
           "Selecting cross compaction task resources from {} seqFile, {} unseqFiles",
           candidate.getSeqFiles().size(),
           candidate.getUnseqFiles().size());
       CrossCompactionTaskResource taskResource = executeTaskResourceSelection(candidate);
-      LOGGER.info(
-          "selected one cross compaction task resource. is valid: {}, {} seqFiles, {} unseqFiles, total memory cost {}, "
-              + "time consumption {}ms",
-          taskResource.isValid(),
-          taskResource.getSeqFiles().size(),
-          taskResource.getUnseqFiles().size(),
-          taskResource.getTotalMemoryCost(),
-          System.currentTimeMillis() - startTime);
+
       return taskResource;
     } catch (IOException e) {
       throw new MergeException(e);
@@ -173,7 +164,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
           unseqFile,
           targetSeqFiles,
           memoryCost,
-          totalCost);
+          taskResource.getTotalMemoryCost());
     }
     taskResource.sortSeqFiles(candidate.getSeqFiles());
     return taskResource;
@@ -214,9 +205,12 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
     return false;
   }
 
-  private boolean canSubmitCrossTask() {
-    return config.isEnableCrossSpaceCompaction()
-        && (CompactionTaskManager.currentTaskNum.get() < config.getCompactionThreadCount());
+  private boolean canSubmitCrossTask(
+      List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFileList) {
+    return CompactionTaskManager.getInstance().getCompactionCandidateTaskCount()
+            < config.getCandidateCompactionTaskQueueSize()
+        && !sequenceFileList.isEmpty()
+        && !unsequenceFileList.isEmpty();
   }
 
   /**
@@ -230,13 +224,12 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
   @Override
   public List<CrossCompactionTaskResource> selectCrossSpaceTask(
       List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFileList) {
-    if (!canSubmitCrossTask()) {
-      return Collections.emptyList();
-    }
-    if (sequenceFileList.isEmpty() || unsequenceFileList.isEmpty()) {
+    if (!canSubmitCrossTask(sequenceFileList, unsequenceFileList)) {
       return Collections.emptyList();
     }
+
     // TODO: (xingtanzjr) need to confirm what this ttl is used for
+    long startTime = System.currentTimeMillis();
     long ttlLowerBound = System.currentTimeMillis() - Long.MAX_VALUE;
     // we record the variable `candidate` here is used for selecting more than one
     // CrossCompactionTaskResources in this method
@@ -245,17 +238,30 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
     try {
       CrossCompactionTaskResource taskResources = selectOneTaskResources(candidate);
       if (!taskResources.isValid()) {
-        LOGGER.info(
-            "{} [Compaction] Cannot select any files, because source files may be occupied by other compaction threads.",
-            logicalStorageGroupName + "-" + dataRegionId);
+        if (!hasPrintedLog) {
+          LOGGER.info(
+              "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Cannot select any files because they do not meet the conditions or may be occupied by other compaction threads.",
+              logicalStorageGroupName + "-" + dataRegionId,
+              sequenceFileList.size(),
+              unsequenceFileList.size(),
+              candidate.getSeqFiles().size(),
+              candidate.getUnseqFiles().size());
+          hasPrintedLog = true;
+        }
         return Collections.emptyList();
       }
-
       LOGGER.info(
-          "{} [Compaction] submit a task with {} sequence file and {} unseq files",
+          "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Selected source files: {} seqFiles, {} unseqFiles, total memory cost {}, time consumption {}ms.",
           logicalStorageGroupName + "-" + dataRegionId,
+          sequenceFileList.size(),
+          unsequenceFileList.size(),
+          candidate.getSeqFiles().size(),
+          candidate.getUnseqFiles().size(),
           taskResources.getSeqFiles().size(),
-          taskResources.getUnseqFiles().size());
+          taskResources.getUnseqFiles().size(),
+          taskResources.getTotalMemoryCost(),
+          System.currentTimeMillis() - startTime);
+      hasPrintedLog = false;
       return Collections.singletonList(taskResources);
 
     } catch (MergeException e) {
@@ -263,9 +269,4 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
     }
     return Collections.emptyList();
   }
-
-  @Override
-  public List<Long> getCompactionMemoryCost() {
-    return Collections.singletonList(totalCost);
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
index e4b7111505..c61a46485b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
@@ -105,8 +105,11 @@ public class SizeTieredCompactionSelector
       TsFileNameGenerator.TsFileName currentName =
           TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
       if (currentName.getInnerCompactionCnt() != level) {
+        // meet files of another level
         if (selectedFileList.size() > 1) {
-          taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+          if (!addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize)) {
+            return false;
+          }
           shouldContinueToSearch = false;
         }
         selectedFileList = new ArrayList<>();
@@ -131,7 +134,9 @@ public class SizeTieredCompactionSelector
           || selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
         // submit the task
         if (selectedFileList.size() > 1) {
-          taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+          if (!addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize)) {
+            return false;
+          }
           shouldContinueToSearch = false;
         }
         selectedFileList = new ArrayList<>();
@@ -142,12 +147,25 @@ public class SizeTieredCompactionSelector
     // if next time partition exists
     // submit a merge task even it does not meet the requirement for file num or file size
     if (hasNextTimePartition && selectedFileList.size() > 1) {
-      taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+      addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize);
       shouldContinueToSearch = false;
     }
     return shouldContinueToSearch;
   }
 
+  private boolean addOneTaskToQueue(
+      PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue,
+      List<TsFileResource> selectedFileList,
+      long selectedFileSize) {
+    if (CompactionTaskManager.getInstance().getCompactionCandidateTaskCount()
+            + taskPriorityQueue.size()
+        < config.getCandidateCompactionTaskQueueSize()) {
+      taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+      return true;
+    }
+    return false;
+  }
+
   /**
    * This method searches for a batch of files to be compacted from layer 0 to the highest layer. If
    * there are more than a batch of files to be merged on a certain layer, it does not search to
@@ -207,7 +225,7 @@ public class SizeTieredCompactionSelector
         if (fileNameOfO1.getInnerCompactionCnt() != fileNameOfO2.getInnerCompactionCnt()) {
           return fileNameOfO2.getInnerCompactionCnt() - fileNameOfO1.getInnerCompactionCnt();
         }
-        return (int) (fileNameOfO2.getVersion() - fileNameOfO1.getVersion());
+        return (int) (fileNameOfO1.getVersion() - fileNameOfO2.getVersion());
       } catch (IOException e) {
         return 0;
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
index d8ebdf1121..de737cafbe 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
@@ -86,7 +86,8 @@ public class CompactionSchedulerTest {
     CompactionClearUtils.clearAllCompactionFiles();
     EnvironmentUtils.cleanAllDir();
     File basicOutputDir = new File(TestConstant.BASE_OUTPUT_PATH);
-    IoTDBDescriptor.getInstance().getConfig().setCompactionPriority(CompactionPriority.INNER_CROSS);
+
+    IoTDBDescriptor.getInstance().getConfig().setCompactionPriority(CompactionPriority.BALANCE);
     if (!basicOutputDir.exists()) {
       assertTrue(basicOutputDir.mkdirs());
     }
@@ -112,13 +113,13 @@ public class CompactionSchedulerTest {
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
+    CompactionTaskManager.getInstance().stop();
     new CompactionConfigRestorer().restoreCompactionConfig();
     ChunkCache.getInstance().clear();
     TimeSeriesMetadataCache.getInstance().clear();
     CompactionClearUtils.clearAllCompactionFiles();
     EnvironmentUtils.cleanAllDir();
     CompactionClearUtils.deleteEmptyDir(new File("target"));
-    CompactionTaskManager.getInstance().stop();
   }
 
   /**
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
index 15fa53426e..15dd291aba 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
@@ -58,7 +58,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class CompactionSchedulerWithFastPerformerTest {
-  private static final Logger logger = LoggerFactory.getLogger(CompactionSchedulerTest.class);
+  private static final Logger logger =
+      LoggerFactory.getLogger(CompactionSchedulerWithFastPerformerTest.class);
   static final String COMPACTION_TEST_SG = "root.compactionSchedulerTest_";
   static final long MAX_WAITING_TIME = 60_000;
   static final long SCHEDULE_AGAIN_TIME = 30_000;
@@ -86,7 +87,8 @@ public class CompactionSchedulerWithFastPerformerTest {
     CompactionClearUtils.clearAllCompactionFiles();
     EnvironmentUtils.cleanAllDir();
     File basicOutputDir = new File(TestConstant.BASE_OUTPUT_PATH);
-    IoTDBDescriptor.getInstance().getConfig().setCompactionPriority(CompactionPriority.INNER_CROSS);
+
+    IoTDBDescriptor.getInstance().getConfig().setCompactionPriority(CompactionPriority.BALANCE);
     if (!basicOutputDir.exists()) {
       assertTrue(basicOutputDir.mkdirs());
     }
@@ -112,13 +114,13 @@ public class CompactionSchedulerWithFastPerformerTest {
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
+    CompactionTaskManager.getInstance().stop();
     new CompactionConfigRestorer().restoreCompactionConfig();
     ChunkCache.getInstance().clear();
     TimeSeriesMetadataCache.getInstance().clear();
     CompactionClearUtils.clearAllCompactionFiles();
     EnvironmentUtils.cleanAllDir();
     CompactionClearUtils.deleteEmptyDir(new File("target"));
-    CompactionTaskManager.getInstance().stop();
   }
 
   /**
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
index 66f272bf0a..99a7e98190 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
@@ -94,7 +94,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
     TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
     tsFileManager.addAll(seqResources, true);
 
-    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
     try {
       Thread.sleep(5000);
     } catch (Exception e) {
@@ -115,7 +115,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
     seqResources.get(0).setStatus(TsFileResourceStatus.COMPACTING);
     TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
     tsFileManager.addAll(seqResources, true);
-    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
 
     long waitingTime = 0;
     while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
@@ -144,7 +144,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
     seqResources.get(3).setStatus(TsFileResourceStatus.UNCLOSED);
     TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
     tsFileManager.addAll(seqResources, true);
-    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
     long waitingTime = 0;
     while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
       try {