You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/02/28 11:13:06 UTC
[iotdb] branch master updated: [IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9163)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2ba24ca41c [IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9163)
2ba24ca41c is described below
commit 2ba24ca41c5823073cd68ca3bb3fefe6de2a4f8a
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Tue Feb 28 19:12:58 2023 +0800
[IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9163)
---
.../execute/task/InnerSpaceCompactionTask.java | 9 +-
.../compaction/schedule/CompactionScheduler.java | 125 ++++++++++++---------
.../impl/RewriteCrossSpaceCompactionSelector.java | 42 +++----
.../impl/SizeTieredCompactionSelector.java | 2 +-
.../inner/InnerCompactionSchedulerTest.java | 6 +-
5 files changed, 103 insertions(+), 81 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
index c1bb3eb505..a9f7aca321 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -110,10 +110,12 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
// get resource of target file
String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent();
LOGGER.info(
- "{}-{} [Compaction] InnerSpaceCompaction task starts with {} files",
+ "{}-{} [Compaction] {} InnerSpaceCompaction task starts with {} files, total file size is {} MB.",
storageGroupName,
dataRegionId,
- selectedTsFileResourceList.size());
+ sequence ? "Sequence" : "Unsequence",
+ selectedTsFileResourceList.size(),
+ selectedFileSize / 1024 / 1024);
try {
targetTsFileResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(
@@ -252,10 +254,11 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
LOGGER.info(
- "{}-{} [Compaction] InnerSpaceCompaction task finishes successfully, target file is {},"
+ "{}-{} [Compaction] {} InnerSpaceCompaction task finishes successfully, target file is {},"
+ "time cost is {} s, compaction speed is {} MB/s, {}",
storageGroupName,
dataRegionId,
+ sequence ? "Sequence" : "Unsequence",
targetTsFileResource.getTsFile().getName(),
costTime,
selectedFileSize / 1024.0d / 1024.0d / costTime,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
index 7d3a5d4d7a..d610fc5a4d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -57,40 +58,22 @@ public class CompactionScheduler {
return;
}
try {
- tryToSubmitCrossSpaceCompactionTask(
- tsFileManager.getStorageGroupName(),
- tsFileManager.getDataRegionId(),
- timePartition,
- tsFileManager);
- tryToSubmitInnerSpaceCompactionTask(
- tsFileManager.getStorageGroupName(),
- tsFileManager.getDataRegionId(),
- timePartition,
- tsFileManager,
- true);
- tryToSubmitInnerSpaceCompactionTask(
- tsFileManager.getStorageGroupName(),
- tsFileManager.getDataRegionId(),
- timePartition,
- tsFileManager,
- false);
+ tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
+ tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition);
} catch (InterruptedException e) {
LOGGER.error("Exception occurs when selecting compaction tasks", e);
Thread.currentThread().interrupt();
}
}
- public static void tryToSubmitInnerSpaceCompactionTask(
- String storageGroupName,
- String dataRegionId,
- long timePartition,
- TsFileManager tsFileManager,
- boolean sequence)
- throws InterruptedException {
+ private static List<List<TsFileResource>> selectInnerSpaceCompactionTask(
+ long timePartition, TsFileManager tsFileManager, boolean sequence) {
if ((!config.isEnableSeqSpaceCompaction() && sequence)
|| (!config.isEnableUnseqSpaceCompaction() && !sequence)) {
- return;
+ return Collections.emptyList();
}
+ String storageGroupName = tsFileManager.getStorageGroupName();
+ String dataRegionId = tsFileManager.getDataRegionId();
ICompactionSelector innerSpaceCompactionSelector = null;
if (sequence) {
@@ -104,44 +87,76 @@ public class CompactionScheduler {
.getInnerUnsequenceCompactionSelector()
.createInstance(storageGroupName, dataRegionId, timePartition, tsFileManager);
}
- List<List<TsFileResource>> taskList =
- innerSpaceCompactionSelector.selectInnerSpaceTask(
- sequence
- ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
- : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
- for (List<TsFileResource> task : taskList) {
- ICompactionPerformer performer =
- sequence
- ? IoTDBDescriptor.getInstance()
- .getConfig()
- .getInnerSeqCompactionPerformer()
- .createInstance()
- : IoTDBDescriptor.getInstance()
- .getConfig()
- .getInnerUnseqCompactionPerformer()
- .createInstance();
- CompactionTaskManager.getInstance()
- .addTaskToWaitingQueue(
- new InnerSpaceCompactionTask(
- timePartition,
- tsFileManager,
- task,
- sequence,
- performer,
- CompactionTaskManager.currentTaskNum,
- tsFileManager.getNextCompactionTaskId()));
+
+ return innerSpaceCompactionSelector.selectInnerSpaceTask(
+ sequence
+ ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
+ : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+ }
+
+ public static void tryToSubmitInnerSpaceCompactionTask(
+ TsFileManager tsFileManager, long timePartition) throws InterruptedException {
+ List<List<TsFileResource>> seqTaskList =
+ selectInnerSpaceCompactionTask(timePartition, tsFileManager, true);
+ List<List<TsFileResource>> unseqTaskList =
+ selectInnerSpaceCompactionTask(timePartition, tsFileManager, false);
+ int taskFreeSize =
+ config.getCandidateCompactionTaskQueueSize()
+ - CompactionTaskManager.getInstance().getCompactionCandidateTaskCount();
+ int taskSize = Math.max(seqTaskList.size(), unseqTaskList.size());
+ for (int i = 0; i < taskSize; i++) {
+ if (taskFreeSize <= 0) {
+ break;
+ }
+ // submit one seq inner space task
+ if (i < seqTaskList.size()) {
+ submitInnerTask(seqTaskList.get(i), tsFileManager, timePartition, true);
+ taskFreeSize--;
+ }
+
+ // submit one unseq inner space task
+ if (i < unseqTaskList.size()) {
+ submitInnerTask(unseqTaskList.get(i), tsFileManager, timePartition, false);
+ taskFreeSize--;
+ }
}
}
- private static void tryToSubmitCrossSpaceCompactionTask(
- String logicalStorageGroupName,
- String dataRegionId,
+ private static void submitInnerTask(
+ List<TsFileResource> taskList,
+ TsFileManager tsFileManager,
long timePartition,
- TsFileManager tsFileManager)
+ boolean sequence)
throws InterruptedException {
+ ICompactionPerformer performer =
+ sequence
+ ? IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getInnerSeqCompactionPerformer()
+ .createInstance()
+ : IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getInnerUnseqCompactionPerformer()
+ .createInstance();
+ CompactionTaskManager.getInstance()
+ .addTaskToWaitingQueue(
+ new InnerSpaceCompactionTask(
+ timePartition,
+ tsFileManager,
+ taskList,
+ sequence,
+ performer,
+ CompactionTaskManager.currentTaskNum,
+ tsFileManager.getNextCompactionTaskId()));
+ }
+
+ private static void tryToSubmitCrossSpaceCompactionTask(
+ TsFileManager tsFileManager, long timePartition) throws InterruptedException {
if (!config.isEnableCrossSpaceCompaction()) {
return;
}
+ String logicalStorageGroupName = tsFileManager.getStorageGroupName();
+ String dataRegionId = tsFileManager.getDataRegionId();
ICrossSpaceSelector crossSpaceCompactionSelector =
config
.getCrossCompactionSelector()
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index 23e1b07e87..af3f383202 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -53,7 +53,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
protected long timePartition;
protected TsFileManager tsFileManager;
- private boolean hasPrintedLog = false;
+ private static boolean hasPrintedLog = false;
private final long memoryBudget;
private final int maxCrossCompactionFileNum;
@@ -109,21 +109,13 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
*/
private CrossCompactionTaskResource selectOneTaskResources(
CrossSpaceCompactionCandidate candidate) throws MergeException {
- long startTime = System.currentTimeMillis();
try {
LOGGER.debug(
"Selecting cross compaction task resources from {} seqFile, {} unseqFiles",
candidate.getSeqFiles().size(),
candidate.getUnseqFiles().size());
CrossCompactionTaskResource taskResource = executeTaskResourceSelection(candidate);
- LOGGER.info(
- "selected one cross compaction task resource. is valid: {}, {} seqFiles, {} unseqFiles, total memory cost {}, "
- + "time consumption {}ms",
- taskResource.isValid(),
- taskResource.getSeqFiles().size(),
- taskResource.getUnseqFiles().size(),
- taskResource.getTotalMemoryCost(),
- System.currentTimeMillis() - startTime);
+
return taskResource;
} catch (IOException e) {
throw new MergeException(e);
@@ -238,6 +230,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
}
// TODO: (xingtanzjr) need to confirm what this ttl is used for
+ long startTime = System.currentTimeMillis();
long ttlLowerBound = System.currentTimeMillis() - Long.MAX_VALUE;
// we record the variable `candidate` here is used for selecting more than one
// CrossCompactionTaskResources in this method
@@ -245,20 +238,31 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
new CrossSpaceCompactionCandidate(sequenceFileList, unsequenceFileList, ttlLowerBound);
try {
CrossCompactionTaskResource taskResources = selectOneTaskResources(candidate);
- if (!taskResources.isValid() && !hasPrintedLog) {
- LOGGER.info(
- "{} [Compaction] Cannot select any files, because source files may be occupied by other compaction threads.",
- logicalStorageGroupName + "-" + dataRegionId);
- hasPrintedLog = true;
+ if (!taskResources.isValid()) {
+ if (!hasPrintedLog) {
+ LOGGER.info(
+ "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Cannot select any files because they do not meet the conditions or may be occupied by other compaction threads.",
+ logicalStorageGroupName + "-" + dataRegionId,
+ sequenceFileList.size(),
+ unsequenceFileList.size(),
+ candidate.getSeqFiles().size(),
+ candidate.getUnseqFiles().size());
+ hasPrintedLog = true;
+ }
return Collections.emptyList();
}
- hasPrintedLog = false;
-
LOGGER.info(
- "{} [Compaction] submit a task with {} sequence file and {} unseq files",
+ "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Selected source files: {} seqFiles, {} unseqFiles, total memory cost {}, time consumption {}ms.",
logicalStorageGroupName + "-" + dataRegionId,
+ sequenceFileList.size(),
+ unsequenceFileList.size(),
+ candidate.getSeqFiles().size(),
+ candidate.getUnseqFiles().size(),
taskResources.getSeqFiles().size(),
- taskResources.getUnseqFiles().size());
+ taskResources.getUnseqFiles().size(),
+ taskResources.getTotalMemoryCost(),
+ System.currentTimeMillis() - startTime);
+ hasPrintedLog = false;
return Collections.singletonList(taskResources);
} catch (MergeException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
index 94d6f03f58..c61a46485b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
@@ -225,7 +225,7 @@ public class SizeTieredCompactionSelector
if (fileNameOfO1.getInnerCompactionCnt() != fileNameOfO2.getInnerCompactionCnt()) {
return fileNameOfO2.getInnerCompactionCnt() - fileNameOfO1.getInnerCompactionCnt();
}
- return (int) (fileNameOfO2.getVersion() - fileNameOfO1.getVersion());
+ return (int) (fileNameOfO1.getVersion() - fileNameOfO2.getVersion());
} catch (IOException e) {
return 0;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
index 66f272bf0a..99a7e98190 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
@@ -94,7 +94,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
tsFileManager.addAll(seqResources, true);
- CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+ CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
try {
Thread.sleep(5000);
} catch (Exception e) {
@@ -115,7 +115,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
seqResources.get(0).setStatus(TsFileResourceStatus.COMPACTING);
TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
tsFileManager.addAll(seqResources, true);
- CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+ CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
long waitingTime = 0;
while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
@@ -144,7 +144,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
seqResources.get(3).setStatus(TsFileResourceStatus.UNCLOSED);
TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
tsFileManager.addAll(seqResources, true);
- CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+ CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
long waitingTime = 0;
while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
try {