You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/28 11:13:06 UTC

[iotdb] branch master updated: [IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9163)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ba24ca41c [IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9163)
2ba24ca41c is described below

commit 2ba24ca41c5823073cd68ca3bb3fefe6de2a4f8a
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Tue Feb 28 19:12:58 2023 +0800

    [IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9163)
---
 .../execute/task/InnerSpaceCompactionTask.java     |   9 +-
 .../compaction/schedule/CompactionScheduler.java   | 125 ++++++++++++---------
 .../impl/RewriteCrossSpaceCompactionSelector.java  |  42 +++----
 .../impl/SizeTieredCompactionSelector.java         |   2 +-
 .../inner/InnerCompactionSchedulerTest.java        |   6 +-
 5 files changed, 103 insertions(+), 81 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
index c1bb3eb505..a9f7aca321 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -110,10 +110,12 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
     // get resource of target file
     String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent();
     LOGGER.info(
-        "{}-{} [Compaction] InnerSpaceCompaction task starts with {} files",
+        "{}-{} [Compaction] {} InnerSpaceCompaction task starts with {} files, total file size is {} MB.",
         storageGroupName,
         dataRegionId,
-        selectedTsFileResourceList.size());
+        sequence ? "Sequence" : "Unsequence",
+        selectedTsFileResourceList.size(),
+        selectedFileSize / 1024 / 1024);
     try {
       targetTsFileResource =
           TsFileNameGenerator.getInnerCompactionTargetFileResource(
@@ -252,10 +254,11 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
 
       double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
       LOGGER.info(
-          "{}-{} [Compaction] InnerSpaceCompaction task finishes successfully, target file is {},"
+          "{}-{} [Compaction] {} InnerSpaceCompaction task finishes successfully, target file is {},"
               + "time cost is {} s, compaction speed is {} MB/s, {}",
           storageGroupName,
           dataRegionId,
+          sequence ? "Sequence" : "Unsequence",
           targetTsFileResource.getTsFile().getName(),
           costTime,
           selectedFileSize / 1024.0d / 1024.0d / costTime,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
index 7d3a5d4d7a..d610fc5a4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -57,40 +58,22 @@ public class CompactionScheduler {
       return;
     }
     try {
-      tryToSubmitCrossSpaceCompactionTask(
-          tsFileManager.getStorageGroupName(),
-          tsFileManager.getDataRegionId(),
-          timePartition,
-          tsFileManager);
-      tryToSubmitInnerSpaceCompactionTask(
-          tsFileManager.getStorageGroupName(),
-          tsFileManager.getDataRegionId(),
-          timePartition,
-          tsFileManager,
-          true);
-      tryToSubmitInnerSpaceCompactionTask(
-          tsFileManager.getStorageGroupName(),
-          tsFileManager.getDataRegionId(),
-          timePartition,
-          tsFileManager,
-          false);
+      tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
+      tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition);
     } catch (InterruptedException e) {
       LOGGER.error("Exception occurs when selecting compaction tasks", e);
       Thread.currentThread().interrupt();
     }
   }
 
-  public static void tryToSubmitInnerSpaceCompactionTask(
-      String storageGroupName,
-      String dataRegionId,
-      long timePartition,
-      TsFileManager tsFileManager,
-      boolean sequence)
-      throws InterruptedException {
+  private static List<List<TsFileResource>> selectInnerSpaceCompactionTask(
+      long timePartition, TsFileManager tsFileManager, boolean sequence) {
     if ((!config.isEnableSeqSpaceCompaction() && sequence)
         || (!config.isEnableUnseqSpaceCompaction() && !sequence)) {
-      return;
+      return Collections.emptyList();
     }
+    String storageGroupName = tsFileManager.getStorageGroupName();
+    String dataRegionId = tsFileManager.getDataRegionId();
 
     ICompactionSelector innerSpaceCompactionSelector = null;
     if (sequence) {
@@ -104,44 +87,76 @@ public class CompactionScheduler {
               .getInnerUnsequenceCompactionSelector()
               .createInstance(storageGroupName, dataRegionId, timePartition, tsFileManager);
     }
-    List<List<TsFileResource>> taskList =
-        innerSpaceCompactionSelector.selectInnerSpaceTask(
-            sequence
-                ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
-                : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
-    for (List<TsFileResource> task : taskList) {
-      ICompactionPerformer performer =
-          sequence
-              ? IoTDBDescriptor.getInstance()
-                  .getConfig()
-                  .getInnerSeqCompactionPerformer()
-                  .createInstance()
-              : IoTDBDescriptor.getInstance()
-                  .getConfig()
-                  .getInnerUnseqCompactionPerformer()
-                  .createInstance();
-      CompactionTaskManager.getInstance()
-          .addTaskToWaitingQueue(
-              new InnerSpaceCompactionTask(
-                  timePartition,
-                  tsFileManager,
-                  task,
-                  sequence,
-                  performer,
-                  CompactionTaskManager.currentTaskNum,
-                  tsFileManager.getNextCompactionTaskId()));
+
+    return innerSpaceCompactionSelector.selectInnerSpaceTask(
+        sequence
+            ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
+            : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+  }
+
+  public static void tryToSubmitInnerSpaceCompactionTask(
+      TsFileManager tsFileManager, long timePartition) throws InterruptedException {
+    List<List<TsFileResource>> seqTaskList =
+        selectInnerSpaceCompactionTask(timePartition, tsFileManager, true);
+    List<List<TsFileResource>> unseqTaskList =
+        selectInnerSpaceCompactionTask(timePartition, tsFileManager, false);
+    int taskFreeSize =
+        config.getCandidateCompactionTaskQueueSize()
+            - CompactionTaskManager.getInstance().getCompactionCandidateTaskCount();
+    int taskSize = Math.max(seqTaskList.size(), unseqTaskList.size());
+    for (int i = 0; i < taskSize; i++) {
+      if (taskFreeSize <= 0) {
+        break;
+      }
+      // submit one seq inner space task
+      if (i < seqTaskList.size()) {
+        submitInnerTask(seqTaskList.get(i), tsFileManager, timePartition, true);
+        taskFreeSize--;
+      }
+
+      // submit one unseq inner space task
+      if (i < unseqTaskList.size()) {
+        submitInnerTask(unseqTaskList.get(i), tsFileManager, timePartition, false);
+        taskFreeSize--;
+      }
     }
   }
 
-  private static void tryToSubmitCrossSpaceCompactionTask(
-      String logicalStorageGroupName,
-      String dataRegionId,
+  private static void submitInnerTask(
+      List<TsFileResource> taskList,
+      TsFileManager tsFileManager,
       long timePartition,
-      TsFileManager tsFileManager)
+      boolean sequence)
       throws InterruptedException {
+    ICompactionPerformer performer =
+        sequence
+            ? IoTDBDescriptor.getInstance()
+                .getConfig()
+                .getInnerSeqCompactionPerformer()
+                .createInstance()
+            : IoTDBDescriptor.getInstance()
+                .getConfig()
+                .getInnerUnseqCompactionPerformer()
+                .createInstance();
+    CompactionTaskManager.getInstance()
+        .addTaskToWaitingQueue(
+            new InnerSpaceCompactionTask(
+                timePartition,
+                tsFileManager,
+                taskList,
+                sequence,
+                performer,
+                CompactionTaskManager.currentTaskNum,
+                tsFileManager.getNextCompactionTaskId()));
+  }
+
+  private static void tryToSubmitCrossSpaceCompactionTask(
+      TsFileManager tsFileManager, long timePartition) throws InterruptedException {
     if (!config.isEnableCrossSpaceCompaction()) {
       return;
     }
+    String logicalStorageGroupName = tsFileManager.getStorageGroupName();
+    String dataRegionId = tsFileManager.getDataRegionId();
     ICrossSpaceSelector crossSpaceCompactionSelector =
         config
             .getCrossCompactionSelector()
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index 23e1b07e87..af3f383202 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -53,7 +53,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
   protected long timePartition;
   protected TsFileManager tsFileManager;
 
-  private boolean hasPrintedLog = false;
+  private static boolean hasPrintedLog = false;
 
   private final long memoryBudget;
   private final int maxCrossCompactionFileNum;
@@ -109,21 +109,13 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
    */
   private CrossCompactionTaskResource selectOneTaskResources(
       CrossSpaceCompactionCandidate candidate) throws MergeException {
-    long startTime = System.currentTimeMillis();
     try {
       LOGGER.debug(
           "Selecting cross compaction task resources from {} seqFile, {} unseqFiles",
           candidate.getSeqFiles().size(),
           candidate.getUnseqFiles().size());
       CrossCompactionTaskResource taskResource = executeTaskResourceSelection(candidate);
-      LOGGER.info(
-          "selected one cross compaction task resource. is valid: {}, {} seqFiles, {} unseqFiles, total memory cost {}, "
-              + "time consumption {}ms",
-          taskResource.isValid(),
-          taskResource.getSeqFiles().size(),
-          taskResource.getUnseqFiles().size(),
-          taskResource.getTotalMemoryCost(),
-          System.currentTimeMillis() - startTime);
+
       return taskResource;
     } catch (IOException e) {
       throw new MergeException(e);
@@ -238,6 +230,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
     }
 
     // TODO: (xingtanzjr) need to confirm what this ttl is used for
+    long startTime = System.currentTimeMillis();
     long ttlLowerBound = System.currentTimeMillis() - Long.MAX_VALUE;
     // we record the variable `candidate` here is used for selecting more than one
     // CrossCompactionTaskResources in this method
@@ -245,20 +238,31 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
         new CrossSpaceCompactionCandidate(sequenceFileList, unsequenceFileList, ttlLowerBound);
     try {
       CrossCompactionTaskResource taskResources = selectOneTaskResources(candidate);
-      if (!taskResources.isValid() && !hasPrintedLog) {
-        LOGGER.info(
-            "{} [Compaction] Cannot select any files, because source files may be occupied by other compaction threads.",
-            logicalStorageGroupName + "-" + dataRegionId);
-        hasPrintedLog = true;
+      if (!taskResources.isValid()) {
+        if (!hasPrintedLog) {
+          LOGGER.info(
+              "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Cannot select any files because they do not meet the conditions or may be occupied by other compaction threads.",
+              logicalStorageGroupName + "-" + dataRegionId,
+              sequenceFileList.size(),
+              unsequenceFileList.size(),
+              candidate.getSeqFiles().size(),
+              candidate.getUnseqFiles().size());
+          hasPrintedLog = true;
+        }
         return Collections.emptyList();
       }
-      hasPrintedLog = false;
-
       LOGGER.info(
-          "{} [Compaction] submit a task with {} sequence file and {} unseq files",
+          "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Selected source files: {} seqFiles, {} unseqFiles, total memory cost {}, time consumption {}ms.",
           logicalStorageGroupName + "-" + dataRegionId,
+          sequenceFileList.size(),
+          unsequenceFileList.size(),
+          candidate.getSeqFiles().size(),
+          candidate.getUnseqFiles().size(),
           taskResources.getSeqFiles().size(),
-          taskResources.getUnseqFiles().size());
+          taskResources.getUnseqFiles().size(),
+          taskResources.getTotalMemoryCost(),
+          System.currentTimeMillis() - startTime);
+      hasPrintedLog = false;
       return Collections.singletonList(taskResources);
 
     } catch (MergeException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
index 94d6f03f58..c61a46485b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
@@ -225,7 +225,7 @@ public class SizeTieredCompactionSelector
         if (fileNameOfO1.getInnerCompactionCnt() != fileNameOfO2.getInnerCompactionCnt()) {
           return fileNameOfO2.getInnerCompactionCnt() - fileNameOfO1.getInnerCompactionCnt();
         }
-        return (int) (fileNameOfO2.getVersion() - fileNameOfO1.getVersion());
+        return (int) (fileNameOfO1.getVersion() - fileNameOfO2.getVersion());
       } catch (IOException e) {
         return 0;
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
index 66f272bf0a..99a7e98190 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
@@ -94,7 +94,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
     TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
     tsFileManager.addAll(seqResources, true);
 
-    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
     try {
       Thread.sleep(5000);
     } catch (Exception e) {
@@ -115,7 +115,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
     seqResources.get(0).setStatus(TsFileResourceStatus.COMPACTING);
     TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
     tsFileManager.addAll(seqResources, true);
-    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
 
     long waitingTime = 0;
     while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
@@ -144,7 +144,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
     seqResources.get(3).setStatus(TsFileResourceStatus.UNCLOSED);
     TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
     tsFileManager.addAll(seqResources, true);
-    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
     long waitingTime = 0;
     while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
       try {