You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/03/16 02:31:12 UTC
[iotdb] 04/04: allow submitting tasks when queue is full (#9305)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rc/1.1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aaa86ca46185a307a687aa3680615d192703dd61
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Thu Mar 16 10:03:05 2023 +0800
allow submitting tasks when queue is full (#9305)
---
.../compaction/schedule/CompactionScheduler.java | 100 +++++++--------------
.../compaction/schedule/CompactionTaskManager.java | 3 +-
.../impl/RewriteCrossSpaceCompactionSelector.java | 5 +-
.../impl/SizeTieredCompactionSelector.java | 27 ++----
.../inner/InnerCompactionSchedulerTest.java | 6 +-
5 files changed, 45 insertions(+), 96 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
index d610fc5a4d..97a12af670 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -59,23 +58,26 @@ public class CompactionScheduler {
}
try {
tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
- tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition);
+ tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, true);
+ tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, false);
} catch (InterruptedException e) {
LOGGER.error("Exception occurs when selecting compaction tasks", e);
Thread.currentThread().interrupt();
}
}
- private static List<List<TsFileResource>> selectInnerSpaceCompactionTask(
- long timePartition, TsFileManager tsFileManager, boolean sequence) {
+ public static void tryToSubmitInnerSpaceCompactionTask(
+ TsFileManager tsFileManager, long timePartition, boolean sequence)
+ throws InterruptedException {
if ((!config.isEnableSeqSpaceCompaction() && sequence)
|| (!config.isEnableUnseqSpaceCompaction() && !sequence)) {
- return Collections.emptyList();
+ return;
}
+
String storageGroupName = tsFileManager.getStorageGroupName();
String dataRegionId = tsFileManager.getDataRegionId();
- ICompactionSelector innerSpaceCompactionSelector = null;
+ ICompactionSelector innerSpaceCompactionSelector;
if (sequence) {
innerSpaceCompactionSelector =
config
@@ -87,69 +89,35 @@ public class CompactionScheduler {
.getInnerUnsequenceCompactionSelector()
.createInstance(storageGroupName, dataRegionId, timePartition, tsFileManager);
}
-
- return innerSpaceCompactionSelector.selectInnerSpaceTask(
- sequence
- ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
- : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
- }
-
- public static void tryToSubmitInnerSpaceCompactionTask(
- TsFileManager tsFileManager, long timePartition) throws InterruptedException {
- List<List<TsFileResource>> seqTaskList =
- selectInnerSpaceCompactionTask(timePartition, tsFileManager, true);
- List<List<TsFileResource>> unseqTaskList =
- selectInnerSpaceCompactionTask(timePartition, tsFileManager, false);
- int taskFreeSize =
- config.getCandidateCompactionTaskQueueSize()
- - CompactionTaskManager.getInstance().getCompactionCandidateTaskCount();
- int taskSize = Math.max(seqTaskList.size(), unseqTaskList.size());
- for (int i = 0; i < taskSize; i++) {
- if (taskFreeSize <= 0) {
- break;
- }
- // submit one seq inner space task
- if (i < seqTaskList.size()) {
- submitInnerTask(seqTaskList.get(i), tsFileManager, timePartition, true);
- taskFreeSize--;
- }
-
- // submit one unseq inner space task
- if (i < unseqTaskList.size()) {
- submitInnerTask(unseqTaskList.get(i), tsFileManager, timePartition, false);
- taskFreeSize--;
- }
+ List<List<TsFileResource>> taskList =
+ innerSpaceCompactionSelector.selectInnerSpaceTask(
+ sequence
+ ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
+ : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+ for (List<TsFileResource> task : taskList) {
+ ICompactionPerformer performer =
+ sequence
+ ? IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getInnerSeqCompactionPerformer()
+ .createInstance()
+ : IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getInnerUnseqCompactionPerformer()
+ .createInstance();
+ CompactionTaskManager.getInstance()
+ .addTaskToWaitingQueue(
+ new InnerSpaceCompactionTask(
+ timePartition,
+ tsFileManager,
+ task,
+ sequence,
+ performer,
+ CompactionTaskManager.currentTaskNum,
+ tsFileManager.getNextCompactionTaskId()));
}
}
- private static void submitInnerTask(
- List<TsFileResource> taskList,
- TsFileManager tsFileManager,
- long timePartition,
- boolean sequence)
- throws InterruptedException {
- ICompactionPerformer performer =
- sequence
- ? IoTDBDescriptor.getInstance()
- .getConfig()
- .getInnerSeqCompactionPerformer()
- .createInstance()
- : IoTDBDescriptor.getInstance()
- .getConfig()
- .getInnerUnseqCompactionPerformer()
- .createInstance();
- CompactionTaskManager.getInstance()
- .addTaskToWaitingQueue(
- new InnerSpaceCompactionTask(
- timePartition,
- tsFileManager,
- taskList,
- sequence,
- performer,
- CompactionTaskManager.currentTaskNum,
- tsFileManager.getNextCompactionTaskId()));
- }
-
private static void tryToSubmitCrossSpaceCompactionTask(
TsFileManager tsFileManager, long timePartition) throws InterruptedException {
if (!config.isEnableCrossSpaceCompaction()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index d75401820d..04f2cda472 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -221,8 +221,7 @@ public class CompactionTaskManager implements IService {
throws InterruptedException {
if (init
&& !candidateCompactionTaskQueue.contains(compactionTask)
- && !isTaskRunning(compactionTask)
- && candidateCompactionTaskQueue.size() < config.getCandidateCompactionTaskQueueSize()) {
+ && !isTaskRunning(compactionTask)) {
compactionTask.setSourceFilesToCompactionCandidate();
candidateCompactionTaskQueue.put(compactionTask);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index af3f383202..a6ed828827 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -208,10 +208,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
private boolean canSubmitCrossTask(
List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFileList) {
- return CompactionTaskManager.getInstance().getCompactionCandidateTaskCount()
- < config.getCandidateCompactionTaskQueueSize()
- && !sequenceFileList.isEmpty()
- && !unsequenceFileList.isEmpty();
+ return !sequenceFileList.isEmpty() && !unsequenceFileList.isEmpty();
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
index c61a46485b..df88248bf2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
@@ -107,9 +107,7 @@ public class SizeTieredCompactionSelector
if (currentName.getInnerCompactionCnt() != level) {
// meet files of another level
if (selectedFileList.size() > 1) {
- if (!addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize)) {
- return false;
- }
+ taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
}
selectedFileList = new ArrayList<>();
@@ -134,9 +132,7 @@ public class SizeTieredCompactionSelector
|| selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
// submit the task
if (selectedFileList.size() > 1) {
- if (!addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize)) {
- return false;
- }
+ taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
}
selectedFileList = new ArrayList<>();
@@ -147,25 +143,12 @@ public class SizeTieredCompactionSelector
// if next time partition exists
// submit a merge task even it does not meet the requirement for file num or file size
if (hasNextTimePartition && selectedFileList.size() > 1) {
- addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize);
+ taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
shouldContinueToSearch = false;
}
return shouldContinueToSearch;
}
- private boolean addOneTaskToQueue(
- PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue,
- List<TsFileResource> selectedFileList,
- long selectedFileSize) {
- if (CompactionTaskManager.getInstance().getCompactionCandidateTaskCount()
- + taskPriorityQueue.size()
- < config.getCandidateCompactionTaskQueueSize()) {
- taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
- return true;
- }
- return false;
- }
-
/**
* This method searches for a batch of files to be compacted from layer 0 to the highest layer. If
* there are more than a batch of files to be merged on a certain layer, it does not search to
@@ -223,9 +206,11 @@ public class SizeTieredCompactionSelector
TsFileNameGenerator.TsFileName fileNameOfO2 =
TsFileNameGenerator.getTsFileName(resourceOfO2.getTsFile().getName());
if (fileNameOfO1.getInnerCompactionCnt() != fileNameOfO2.getInnerCompactionCnt()) {
+ // the higher the inner compaction count, the higher the priority is
return fileNameOfO2.getInnerCompactionCnt() - fileNameOfO1.getInnerCompactionCnt();
}
- return (int) (fileNameOfO1.getVersion() - fileNameOfO2.getVersion());
+ // the larger the version number, the higher the priority is
+ return (int) (fileNameOfO2.getVersion() - fileNameOfO1.getVersion());
} catch (IOException e) {
return 0;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
index 99a7e98190..d1f907fa34 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
@@ -94,7 +94,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
tsFileManager.addAll(seqResources, true);
- CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
+ CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L, true);
try {
Thread.sleep(5000);
} catch (Exception e) {
@@ -115,7 +115,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
seqResources.get(0).setStatus(TsFileResourceStatus.COMPACTING);
TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
tsFileManager.addAll(seqResources, true);
- CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
+ CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L, true);
long waitingTime = 0;
while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
@@ -144,7 +144,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
seqResources.get(3).setStatus(TsFileResourceStatus.UNCLOSED);
TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
tsFileManager.addAll(seqResources, true);
- CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
+ CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L, true);
long waitingTime = 0;
while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
try {