You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/16 02:03:11 UTC

[iotdb] branch rel/1.1 updated: allow submitting tasks when queue is full (#9305)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 7be208e72a allow submitting tasks when queue is full (#9305)
7be208e72a is described below

commit 7be208e72a9cead2676b2496d9a0cfa5478f7503
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Thu Mar 16 10:03:05 2023 +0800

    allow submitting tasks when queue is full (#9305)
---
 .../compaction/schedule/CompactionScheduler.java   | 100 +++++++--------------
 .../compaction/schedule/CompactionTaskManager.java |   3 +-
 .../impl/RewriteCrossSpaceCompactionSelector.java  |   5 +-
 .../impl/SizeTieredCompactionSelector.java         |  27 ++----
 .../inner/InnerCompactionSchedulerTest.java        |   6 +-
 5 files changed, 45 insertions(+), 96 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
index d610fc5a4d..97a12af670 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -59,23 +58,26 @@ public class CompactionScheduler {
     }
     try {
       tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
-      tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition);
+      tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true);
+      tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false);
     } catch (InterruptedException e) {
       LOGGER.error("Exception occurs when selecting compaction tasks", e);
       Thread.currentThread().interrupt();
     }
   }
 
-  private static List<List<TsFileResource>> selectInnerSpaceCompactionTask(
-      long timePartition, TsFileManager tsFileManager, boolean sequence) {
+  public static void tryToSubmitInnerSpaceCompactionTask(
+      TsFileManager tsFileManager, long timePartition, boolean sequence)
+      throws InterruptedException {
     if ((!config.isEnableSeqSpaceCompaction() && sequence)
         || (!config.isEnableUnseqSpaceCompaction() && !sequence)) {
-      return Collections.emptyList();
+      return;
     }
+
     String storageGroupName = tsFileManager.getStorageGroupName();
     String dataRegionId = tsFileManager.getDataRegionId();
 
-    ICompactionSelector innerSpaceCompactionSelector = null;
+    ICompactionSelector innerSpaceCompactionSelector;
     if (sequence) {
       innerSpaceCompactionSelector =
           config
@@ -87,69 +89,35 @@ public class CompactionScheduler {
               .getInnerUnsequenceCompactionSelector()
               .createInstance(storageGroupName, dataRegionId, timePartition, tsFileManager);
     }
-
-    return innerSpaceCompactionSelector.selectInnerSpaceTask(
-        sequence
-            ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
-            : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
-  }
-
-  public static void tryToSubmitInnerSpaceCompactionTask(
-      TsFileManager tsFileManager, long timePartition) throws InterruptedException {
-    List<List<TsFileResource>> seqTaskList =
-        selectInnerSpaceCompactionTask(timePartition, tsFileManager, true);
-    List<List<TsFileResource>> unseqTaskList =
-        selectInnerSpaceCompactionTask(timePartition, tsFileManager, false);
-    int taskFreeSize =
-        config.getCandidateCompactionTaskQueueSize()
-            - CompactionTaskManager.getInstance().getCompactionCandidateTaskCount();
-    int taskSize = Math.max(seqTaskList.size(), unseqTaskList.size());
-    for (int i = 0; i < taskSize; i++) {
-      if (taskFreeSize <= 0) {
-        break;
-      }
-      // submit one seq inner space task
-      if (i < seqTaskList.size()) {
-        submitInnerTask(seqTaskList.get(i), tsFileManager, timePartition, true);
-        taskFreeSize--;
-      }
-
-      // submit one unseq inner space task
-      if (i < unseqTaskList.size()) {
-        submitInnerTask(unseqTaskList.get(i), tsFileManager, timePartition, false);
-        taskFreeSize--;
-      }
+    List<List<TsFileResource>> taskList =
+        innerSpaceCompactionSelector.selectInnerSpaceTask(
+            sequence
+                ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
+                : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+    for (List<TsFileResource> task : taskList) {
+      ICompactionPerformer performer =
+          sequence
+              ? IoTDBDescriptor.getInstance()
+                  .getConfig()
+                  .getInnerSeqCompactionPerformer()
+                  .createInstance()
+              : IoTDBDescriptor.getInstance()
+                  .getConfig()
+                  .getInnerUnseqCompactionPerformer()
+                  .createInstance();
+      CompactionTaskManager.getInstance()
+          .addTaskToWaitingQueue(
+              new InnerSpaceCompactionTask(
+                  timePartition,
+                  tsFileManager,
+                  task,
+                  sequence,
+                  performer,
+                  CompactionTaskManager.currentTaskNum,
+                  tsFileManager.getNextCompactionTaskId()));
     }
   }
 
-  private static void submitInnerTask(
-      List<TsFileResource> taskList,
-      TsFileManager tsFileManager,
-      long timePartition,
-      boolean sequence)
-      throws InterruptedException {
-    ICompactionPerformer performer =
-        sequence
-            ? IoTDBDescriptor.getInstance()
-                .getConfig()
-                .getInnerSeqCompactionPerformer()
-                .createInstance()
-            : IoTDBDescriptor.getInstance()
-                .getConfig()
-                .getInnerUnseqCompactionPerformer()
-                .createInstance();
-    CompactionTaskManager.getInstance()
-        .addTaskToWaitingQueue(
-            new InnerSpaceCompactionTask(
-                timePartition,
-                tsFileManager,
-                taskList,
-                sequence,
-                performer,
-                CompactionTaskManager.currentTaskNum,
-                tsFileManager.getNextCompactionTaskId()));
-  }
-
   private static void tryToSubmitCrossSpaceCompactionTask(
       TsFileManager tsFileManager, long timePartition) throws InterruptedException {
     if (!config.isEnableCrossSpaceCompaction()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index d75401820d..04f2cda472 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -221,8 +221,7 @@ public class CompactionTaskManager implements IService {
       throws InterruptedException {
     if (init
         && !candidateCompactionTaskQueue.contains(compactionTask)
-        && !isTaskRunning(compactionTask)
-        && candidateCompactionTaskQueue.size() < config.getCandidateCompactionTaskQueueSize()) {
+        && !isTaskRunning(compactionTask)) {
       compactionTask.setSourceFilesToCompactionCandidate();
       candidateCompactionTaskQueue.put(compactionTask);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index af3f383202..a6ed828827 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -208,10 +208,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
 
   private boolean canSubmitCrossTask(
       List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFileList) {
-    return CompactionTaskManager.getInstance().getCompactionCandidateTaskCount()
-            < config.getCandidateCompactionTaskQueueSize()
-        && !sequenceFileList.isEmpty()
-        && !unsequenceFileList.isEmpty();
+    return !sequenceFileList.isEmpty() && !unsequenceFileList.isEmpty();
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
index c61a46485b..df88248bf2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
@@ -107,9 +107,7 @@ public class SizeTieredCompactionSelector
       if (currentName.getInnerCompactionCnt() != level) {
         // meet files of another level
         if (selectedFileList.size() > 1) {
-          if (!addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize)) {
-            return false;
-          }
+          taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
           shouldContinueToSearch = false;
         }
         selectedFileList = new ArrayList<>();
@@ -134,9 +132,7 @@ public class SizeTieredCompactionSelector
           || selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
         // submit the task
         if (selectedFileList.size() > 1) {
-          if (!addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize)) {
-            return false;
-          }
+          taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
           shouldContinueToSearch = false;
         }
         selectedFileList = new ArrayList<>();
@@ -147,25 +143,12 @@ public class SizeTieredCompactionSelector
     // if next time partition exists
     // submit a merge task even it does not meet the requirement for file num or file size
     if (hasNextTimePartition && selectedFileList.size() > 1) {
-      addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize);
+      taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
       shouldContinueToSearch = false;
     }
     return shouldContinueToSearch;
   }
 
-  private boolean addOneTaskToQueue(
-      PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue,
-      List<TsFileResource> selectedFileList,
-      long selectedFileSize) {
-    if (CompactionTaskManager.getInstance().getCompactionCandidateTaskCount()
-            + taskPriorityQueue.size()
-        < config.getCandidateCompactionTaskQueueSize()) {
-      taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
-      return true;
-    }
-    return false;
-  }
-
   /**
    * This method searches for a batch of files to be compacted from layer 0 to the highest layer. If
    * there are more than a batch of files to be merged on a certain layer, it does not search to
@@ -223,9 +206,11 @@ public class SizeTieredCompactionSelector
         TsFileNameGenerator.TsFileName fileNameOfO2 =
             TsFileNameGenerator.getTsFileName(resourceOfO2.getTsFile().getName());
         if (fileNameOfO1.getInnerCompactionCnt() != fileNameOfO2.getInnerCompactionCnt()) {
+          // the higher the inner compaction count, the higher the priority is
           return fileNameOfO2.getInnerCompactionCnt() - fileNameOfO1.getInnerCompactionCnt();
         }
-        return (int) (fileNameOfO1.getVersion() - fileNameOfO2.getVersion());
+        // the larger the version number, the higher the priority is
+        return (int) (fileNameOfO2.getVersion() - fileNameOfO1.getVersion());
       } catch (IOException e) {
         return 0;
       }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
index 99a7e98190..d1f907fa34 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
@@ -94,7 +94,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
     TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
     tsFileManager.addAll(seqResources, true);
 
-    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
+    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L, true);
     try {
       Thread.sleep(5000);
     } catch (Exception e) {
@@ -115,7 +115,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
     seqResources.get(0).setStatus(TsFileResourceStatus.COMPACTING);
     TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
     tsFileManager.addAll(seqResources, true);
-    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
+    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L, true);
 
     long waitingTime = 0;
     while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
@@ -144,7 +144,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
     seqResources.get(3).setStatus(TsFileResourceStatus.UNCLOSED);
     TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
     tsFileManager.addAll(seqResources, true);
-    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
+    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L, true);
     long waitingTime = 0;
     while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
       try {