You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/03/03 03:47:07 UTC
[iotdb] branch rel/1.0 updated: [To rel/1.0][IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9172)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.0 by this push:
new 482922e8c4 [To rel/1.0][IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9172)
482922e8c4 is described below
commit 482922e8c40b0e545faf96a29af7173a8515b331
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Fri Mar 3 11:47:00 2023 +0800
[To rel/1.0][IOTDB-5147]Optimize compaction schedule when priority is BALANCE (#9172)
---
docs/UserGuide/Reference/Common-Config-Manual.md | 8 ++
.../zh/UserGuide/Reference/Common-Config-Manual.md | 31 +++--
.../resources/conf/iotdb-common.properties | 8 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +
.../execute/task/InnerSpaceCompactionTask.java | 9 +-
.../compaction/schedule/CompactionScheduler.java | 131 ++++++++++++---------
.../compaction/schedule/CompactionTaskManager.java | 13 +-
.../DefaultCompactionTaskComparatorImpl.java | 21 ++--
.../impl/CrossSpaceCompactionCandidate.java | 6 +-
.../impl/RewriteCrossSpaceCompactionSelector.java | 63 +++++-----
.../impl/SizeTieredCompactionSelector.java | 26 +++-
.../engine/compaction/CompactionSchedulerTest.java | 5 +-
.../CompactionSchedulerWithFastPerformerTest.java | 8 +-
.../inner/InnerCompactionSchedulerTest.java | 6 +-
15 files changed, 220 insertions(+), 131 deletions(-)
diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index fdeb08dc26..b3bab3f74f 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -1017,6 +1017,14 @@ Different configuration parameters take effect in the following three ways:
|Default| true |
|Effective| After restart system |
+* candidate\_compaction\_task\_queue\_size
+
+|Name| candidate\_compaction\_task\_queue\_size |
+|:---:|:--------------------------------------------|
+|Description| The size of candidate compaction task queue |
+|Type| Int32 |
+|Default| 50 |
+|Effective| After restart system |
### Write Ahead Log Configuration
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index 0c6c28fcfb..1baea96f92 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1056,21 +1056,30 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
* sub\_compaction\_thread\_count
-|名字| sub\_compaction\_thread\_count |
-|:---:|:--|
-|描述| 每个跨空间合并任务的子任务线程数 |
-|类型| Int32 |
-|默认值| 4 |
-|改后生效方式| 重启服务生效|
+|名字| sub\_compaction\_thread\_count |
+|:---:|:--------------------------------|
+|描述| 每个合并任务的子任务线程数,只对跨空间合并和乱序空间内合并生效 |
+|类型| int32 |
+|默认值| 4 |
+|改后生效方式| 重启服务生效 |
* enable\_compaction\_validation
|名字| enable\_compaction\_validation |
-|:---:|:--|
-|描述| 开启合并结束后对顺序文件时间范围的检查 |
-|类型| Boolean |
-|默认值| true |
-|改后生效方式| 重启服务生效|
+|:---:|:-------------------------------|
+|描述| 开启合并结束后对顺序文件时间范围的检查 |
+|类型| Boolean |
+|默认值| true |
+|改后生效方式| 重启服务生效 |
+
+* candidate\_compaction\_task\_queue\_size
+
+|名字| candidate\_compaction\_task\_queue\_size |
+|:---:|:-----------------------------------------|
+|描述| 合并任务优先级队列的大小 |
+|类型| int32 |
+|默认值| 50 |
+|改后生效方式| 重启服务生效 |
### 写前日志配置
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 1254cca9a4..ef2b030ec0 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -573,6 +573,10 @@ cluster_name=defaultCluster
# BALANCE: alternate two compaction types
# compaction_priority=BALANCE
+# The size of candidate compaction task queue.
+# Datatype: int
+# candidate_compaction_task_queue_size = 50
+
# The target tsfile size in compaction
# Datatype: long, Unit: byte
# target_compaction_file_size=1073741824
@@ -726,9 +730,9 @@ cluster_name=defaultCluster
# Datatype: int
# page_size_in_byte=65536
-# The maximum number of data points in a page, default 1024*1024
+# The maximum number of data points in a page, default 10000
# Datatype: int
-# max_number_of_points_in_page=1048576
+# max_number_of_points_in_page=10000
# The threshold for pattern matching in regex
# Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 06cf83ce33..3bdb0c667d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -498,6 +498,9 @@ public class IoTDBConfig {
private boolean enableCompactionValidation = true;
+ /** The size of candidate compaction task queue. */
+ private int candidateCompactionTaskQueueSize = 50;
+
/** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
private boolean metaDataCacheEnable = true;
@@ -3657,6 +3660,14 @@ public class IoTDBConfig {
this.enableCompactionValidation = enableCompactionValidation;
}
+ public int getCandidateCompactionTaskQueueSize() {
+ return candidateCompactionTaskQueueSize;
+ }
+
+ public void setCandidateCompactionTaskQueueSize(int candidateCompactionTaskQueueSize) {
+ this.candidateCompactionTaskQueueSize = candidateCompactionTaskQueueSize;
+ }
+
public boolean isEnableAuditLog() {
return enableAuditLog;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d2450fa457..a88e65b1a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -669,6 +669,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"enable_compaction_validation",
Boolean.toString(conf.isEnableCompactionValidation()))));
+ conf.setCandidateCompactionTaskQueueSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "candidate_compaction_task_queue_size",
+ Integer.toString(conf.getCandidateCompactionTaskQueueSize()))));
conf.setEnablePartialInsert(
Boolean.parseBoolean(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
index c1bb3eb505..a9f7aca321 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -110,10 +110,12 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
// get resource of target file
String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent();
LOGGER.info(
- "{}-{} [Compaction] InnerSpaceCompaction task starts with {} files",
+ "{}-{} [Compaction] {} InnerSpaceCompaction task starts with {} files, total file size is {} MB.",
storageGroupName,
dataRegionId,
- selectedTsFileResourceList.size());
+ sequence ? "Sequence" : "Unsequence",
+ selectedTsFileResourceList.size(),
+ selectedFileSize / 1024 / 1024);
try {
targetTsFileResource =
TsFileNameGenerator.getInnerCompactionTargetFileResource(
@@ -252,10 +254,11 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
double costTime = (System.currentTimeMillis() - startTime) / 1000.0d;
LOGGER.info(
- "{}-{} [Compaction] InnerSpaceCompaction task finishes successfully, target file is {},"
+ "{}-{} [Compaction] {} InnerSpaceCompaction task finishes successfully, target file is {},"
+ "time cost is {} s, compaction speed is {} MB/s, {}",
storageGroupName,
dataRegionId,
+ sequence ? "Sequence" : "Unsequence",
targetTsFileResource.getTsFile().getName(),
costTime,
selectedFileSize / 1024.0d / 1024.0d / costTime,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
index 0173423537..61d35f0dde 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionScheduler.java
@@ -34,7 +34,9 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
/**
* CompactionScheduler schedules and submits the compaction task periodically, and it counts the
@@ -56,40 +58,22 @@ public class CompactionScheduler {
return;
}
try {
- tryToSubmitCrossSpaceCompactionTask(
- tsFileManager.getStorageGroupName(),
- tsFileManager.getDataRegionId(),
- timePartition,
- tsFileManager);
- tryToSubmitInnerSpaceCompactionTask(
- tsFileManager.getStorageGroupName(),
- tsFileManager.getDataRegionId(),
- timePartition,
- tsFileManager,
- true);
- tryToSubmitInnerSpaceCompactionTask(
- tsFileManager.getStorageGroupName(),
- tsFileManager.getDataRegionId(),
- timePartition,
- tsFileManager,
- false);
+ tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition);
+ tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition);
} catch (InterruptedException e) {
LOGGER.error("Exception occurs when selecting compaction tasks", e);
Thread.currentThread().interrupt();
}
}
- public static void tryToSubmitInnerSpaceCompactionTask(
- String storageGroupName,
- String dataRegionId,
- long timePartition,
- TsFileManager tsFileManager,
- boolean sequence)
- throws InterruptedException {
+ private static List<List<TsFileResource>> selectInnerSpaceCompactionTask(
+ long timePartition, TsFileManager tsFileManager, boolean sequence) {
if ((!config.isEnableSeqSpaceCompaction() && sequence)
|| (!config.isEnableUnseqSpaceCompaction() && !sequence)) {
- return;
+ return Collections.emptyList();
}
+ String storageGroupName = tsFileManager.getStorageGroupName();
+ String dataRegionId = tsFileManager.getDataRegionId();
ICompactionSelector innerSpaceCompactionSelector = null;
if (sequence) {
@@ -103,44 +87,76 @@ public class CompactionScheduler {
.getInnerUnsequenceCompactionSelector()
.createInstance(storageGroupName, dataRegionId, timePartition, tsFileManager);
}
- List<List<TsFileResource>> taskList =
- innerSpaceCompactionSelector.selectInnerSpaceTask(
- sequence
- ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
- : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
- for (List<TsFileResource> task : taskList) {
- ICompactionPerformer performer =
- sequence
- ? IoTDBDescriptor.getInstance()
- .getConfig()
- .getInnerSeqCompactionPerformer()
- .createInstance()
- : IoTDBDescriptor.getInstance()
- .getConfig()
- .getInnerUnseqCompactionPerformer()
- .createInstance();
- CompactionTaskManager.getInstance()
- .addTaskToWaitingQueue(
- new InnerSpaceCompactionTask(
- timePartition,
- tsFileManager,
- task,
- sequence,
- performer,
- CompactionTaskManager.currentTaskNum,
- tsFileManager.getNextCompactionTaskId()));
+
+ return innerSpaceCompactionSelector.selectInnerSpaceTask(
+ sequence
+ ? tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
+ : tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+ }
+
+ public static void tryToSubmitInnerSpaceCompactionTask(
+ TsFileManager tsFileManager, long timePartition) throws InterruptedException {
+ List<List<TsFileResource>> seqTaskList =
+ selectInnerSpaceCompactionTask(timePartition, tsFileManager, true);
+ List<List<TsFileResource>> unseqTaskList =
+ selectInnerSpaceCompactionTask(timePartition, tsFileManager, false);
+ int taskFreeSize =
+ config.getCandidateCompactionTaskQueueSize()
+ - CompactionTaskManager.getInstance().getCompactionCandidateTaskCount();
+ int taskSize = Math.max(seqTaskList.size(), unseqTaskList.size());
+ for (int i = 0; i < taskSize; i++) {
+ if (taskFreeSize <= 0) {
+ break;
+ }
+ // submit one seq inner space task
+ if (i < seqTaskList.size()) {
+ submitInnerTask(seqTaskList.get(i), tsFileManager, timePartition, true);
+ taskFreeSize--;
+ }
+
+ // submit one unseq inner space task
+ if (i < unseqTaskList.size()) {
+ submitInnerTask(unseqTaskList.get(i), tsFileManager, timePartition, false);
+ taskFreeSize--;
+ }
}
}
- private static void tryToSubmitCrossSpaceCompactionTask(
- String logicalStorageGroupName,
- String dataRegionId,
+ private static void submitInnerTask(
+ List<TsFileResource> taskList,
+ TsFileManager tsFileManager,
long timePartition,
- TsFileManager tsFileManager)
+ boolean sequence)
throws InterruptedException {
+ ICompactionPerformer performer =
+ sequence
+ ? IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getInnerSeqCompactionPerformer()
+ .createInstance()
+ : IoTDBDescriptor.getInstance()
+ .getConfig()
+ .getInnerUnseqCompactionPerformer()
+ .createInstance();
+ CompactionTaskManager.getInstance()
+ .addTaskToWaitingQueue(
+ new InnerSpaceCompactionTask(
+ timePartition,
+ tsFileManager,
+ taskList,
+ sequence,
+ performer,
+ CompactionTaskManager.currentTaskNum,
+ tsFileManager.getNextCompactionTaskId()));
+ }
+
+ private static void tryToSubmitCrossSpaceCompactionTask(
+ TsFileManager tsFileManager, long timePartition) throws InterruptedException {
if (!config.isEnableCrossSpaceCompaction()) {
return;
}
+ String logicalStorageGroupName = tsFileManager.getStorageGroupName();
+ String dataRegionId = tsFileManager.getDataRegionId();
ICrossSpaceSelector crossSpaceCompactionSelector =
config
.getCrossCompactionSelector()
@@ -149,7 +165,10 @@ public class CompactionScheduler {
crossSpaceCompactionSelector.selectCrossSpaceTask(
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition),
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
- List<Long> memoryCost = crossSpaceCompactionSelector.getCompactionMemoryCost();
+ List<Long> memoryCost =
+ taskList.stream()
+ .map(CrossCompactionTaskResource::getTotalMemoryCost)
+ .collect(Collectors.toList());
for (int i = 0, size = taskList.size(); i < size; ++i) {
CompactionTaskManager.getInstance()
.addTaskToWaitingQueue(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 76bf9db322..d75401820d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -58,6 +58,8 @@ public class CompactionTaskManager implements IService {
private static final CompactionTaskManager INSTANCE = new CompactionTaskManager();
+ private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
// The thread pool that executes the compaction task. The default number of threads for this pool
// is 10.
private WrappedThreadPoolExecutor taskExecutionPool;
@@ -67,7 +69,8 @@ public class CompactionTaskManager implements IService {
public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
private final FixedPriorityBlockingQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
- new FixedPriorityBlockingQueue<>(1024, new DefaultCompactionTaskComparatorImpl());
+ new FixedPriorityBlockingQueue<>(
+ config.getCandidateCompactionTaskQueueSize(), new DefaultCompactionTaskComparatorImpl());
// <StorageGroup-DataRegionId,futureSet>, it is used to store all compaction tasks under each
// virtualStorageGroup
private final Map<String, Map<AbstractCompactionTask, Future<CompactionTaskSummary>>>
@@ -76,7 +79,6 @@ public class CompactionTaskManager implements IService {
private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
- private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private volatile boolean init = false;
public static CompactionTaskManager getInstance() {
@@ -219,7 +221,8 @@ public class CompactionTaskManager implements IService {
throws InterruptedException {
if (init
&& !candidateCompactionTaskQueue.contains(compactionTask)
- && !isTaskRunning(compactionTask)) {
+ && !isTaskRunning(compactionTask)
+ && candidateCompactionTaskQueue.size() < config.getCandidateCompactionTaskQueueSize()) {
compactionTask.setSourceFilesToCompactionCandidate();
candidateCompactionTaskQueue.put(compactionTask);
@@ -324,6 +327,10 @@ public class CompactionTaskManager implements IService {
return getExecutingTaskCount() + candidateCompactionTaskQueue.size();
}
+ public int getCompactionCandidateTaskCount() {
+ return candidateCompactionTaskQueue.size();
+ }
+
public synchronized List<AbstractCompactionTask> getRunningCompactionTaskList() {
List<AbstractCompactionTask> tasks = new ArrayList<>();
for (Map<AbstractCompactionTask, Future<CompactionTaskSummary>> runningTaskMap :
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
index 334acae481..d2113a9796 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/comparator/DefaultCompactionTaskComparatorImpl.java
@@ -37,13 +37,21 @@ public class DefaultCompactionTaskComparatorImpl implements ICompactionTaskCompa
if ((((o1 instanceof InnerSpaceCompactionTask) && (o2 instanceof CrossSpaceCompactionTask))
|| ((o2 instanceof InnerSpaceCompactionTask)
&& (o1 instanceof CrossSpaceCompactionTask)))) {
- if (config.getCompactionPriority() != CompactionPriority.CROSS_INNER) {
+ if (config.getCompactionPriority() == CompactionPriority.CROSS_INNER) {
+ // priority is CROSS_INNER
+ return o1 instanceof CrossSpaceCompactionTask ? -1 : 1;
+ } else if (config.getCompactionPriority() == CompactionPriority.INNER_CROSS) {
+ // priority is INNER_CROSS
return o1 instanceof InnerSpaceCompactionTask ? -1 : 1;
} else {
- return o1 instanceof CrossSpaceCompactionTask ? -1 : 1;
+ // priority is BALANCE
+ if (o1.getSerialId() != o2.getSerialId()) {
+ return o1.getSerialId() < o2.getSerialId() ? -1 : 1;
+ } else {
+ return o1 instanceof CrossSpaceCompactionTask ? -1 : 1;
+ }
}
- }
- if (o1 instanceof InnerSpaceCompactionTask) {
+ } else if (o1 instanceof InnerSpaceCompactionTask) {
return compareInnerSpaceCompactionTask(
(InnerSpaceCompactionTask) o1, (InnerSpaceCompactionTask) o2);
} else {
@@ -54,11 +62,6 @@ public class DefaultCompactionTaskComparatorImpl implements ICompactionTaskCompa
public int compareInnerSpaceCompactionTask(
InnerSpaceCompactionTask o1, InnerSpaceCompactionTask o2) {
- if (o1.isSequence() ^ o2.isSequence()) {
- // prioritize sequence file compaction
- return o1.isSequence() ? -1 : 1;
- }
-
// if the sum of compaction count of the selected files are different
// we prefer to execute task with smaller compaction count
// this can reduce write amplification
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/CrossSpaceCompactionCandidate.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/CrossSpaceCompactionCandidate.java
index d815967fa2..11a782e3bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/CrossSpaceCompactionCandidate.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/CrossSpaceCompactionCandidate.java
@@ -139,11 +139,9 @@ public class CrossSpaceCompactionCandidate {
private List<TsFileResourceCandidate> filterUnseqResource(List<TsFileResource> unseqResources) {
List<TsFileResourceCandidate> ret = new ArrayList<>();
for (TsFileResource resource : unseqResources) {
- if (resource.getStatus() != TsFileResourceStatus.CLOSED
- || !resource.getTsFile().exists()
- || resource.isDeleted()) {
+ if (resource.getStatus() != TsFileResourceStatus.CLOSED || !resource.getTsFile().exists()) {
break;
- } else if (!resource.isDeleted() && resource.stillLives(ttlLowerBound)) {
+ } else if (resource.stillLives(ttlLowerBound)) {
ret.add(new TsFileResourceCandidate(resource));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index 9298f3bb5c..1a57d4ef0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -46,17 +46,16 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- private final int SELECT_WARN_THRESHOLD = 10;
protected String logicalStorageGroupName;
protected String dataRegionId;
protected long timePartition;
protected TsFileManager tsFileManager;
- private long totalCost;
+ private static boolean hasPrintedLog = false;
+
private final long memoryBudget;
private final int maxCrossCompactionFileNum;
private final long maxCrossCompactionFileSize;
- private int seqSelectedNum;
private AbstractCompactionEstimator compactionEstimator;
@@ -109,21 +108,13 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
*/
private CrossCompactionTaskResource selectOneTaskResources(
CrossSpaceCompactionCandidate candidate) throws MergeException {
- long startTime = System.currentTimeMillis();
try {
LOGGER.debug(
"Selecting cross compaction task resources from {} seqFile, {} unseqFiles",
candidate.getSeqFiles().size(),
candidate.getUnseqFiles().size());
CrossCompactionTaskResource taskResource = executeTaskResourceSelection(candidate);
- LOGGER.info(
- "selected one cross compaction task resource. is valid: {}, {} seqFiles, {} unseqFiles, total memory cost {}, "
- + "time consumption {}ms",
- taskResource.isValid(),
- taskResource.getSeqFiles().size(),
- taskResource.getUnseqFiles().size(),
- taskResource.getTotalMemoryCost(),
- System.currentTimeMillis() - startTime);
+
return taskResource;
} catch (IOException e) {
throw new MergeException(e);
@@ -173,7 +164,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
unseqFile,
targetSeqFiles,
memoryCost,
- totalCost);
+ taskResource.getTotalMemoryCost());
}
taskResource.sortSeqFiles(candidate.getSeqFiles());
return taskResource;
@@ -214,9 +205,12 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
return false;
}
- private boolean canSubmitCrossTask() {
- return config.isEnableCrossSpaceCompaction()
- && (CompactionTaskManager.currentTaskNum.get() < config.getCompactionThreadCount());
+ private boolean canSubmitCrossTask(
+ List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFileList) {
+ return CompactionTaskManager.getInstance().getCompactionCandidateTaskCount()
+ < config.getCandidateCompactionTaskQueueSize()
+ && !sequenceFileList.isEmpty()
+ && !unsequenceFileList.isEmpty();
}
/**
@@ -230,13 +224,12 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
@Override
public List<CrossCompactionTaskResource> selectCrossSpaceTask(
List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFileList) {
- if (!canSubmitCrossTask()) {
- return Collections.emptyList();
- }
- if (sequenceFileList.isEmpty() || unsequenceFileList.isEmpty()) {
+ if (!canSubmitCrossTask(sequenceFileList, unsequenceFileList)) {
return Collections.emptyList();
}
+
// TODO: (xingtanzjr) need to confirm what this ttl is used for
+ long startTime = System.currentTimeMillis();
long ttlLowerBound = System.currentTimeMillis() - Long.MAX_VALUE;
// we record the variable `candidate` here is used for selecting more than one
// CrossCompactionTaskResources in this method
@@ -245,17 +238,30 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
try {
CrossCompactionTaskResource taskResources = selectOneTaskResources(candidate);
if (!taskResources.isValid()) {
- LOGGER.info(
- "{} [Compaction] Cannot select any files, because source files may be occupied by other compaction threads.",
- logicalStorageGroupName + "-" + dataRegionId);
+ if (!hasPrintedLog) {
+ LOGGER.info(
+ "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Cannot select any files because they do not meet the conditions or may be occupied by other compaction threads.",
+ logicalStorageGroupName + "-" + dataRegionId,
+ sequenceFileList.size(),
+ unsequenceFileList.size(),
+ candidate.getSeqFiles().size(),
+ candidate.getUnseqFiles().size());
+ hasPrintedLog = true;
+ }
return Collections.emptyList();
}
-
LOGGER.info(
- "{} [Compaction] submit a task with {} sequence file and {} unseq files",
+ "{} [Compaction] Total source files: {} seqFiles, {} unseqFiles. Candidate source files: {} seqFiles, {} unseqFiles. Selected source files: {} seqFiles, {} unseqFiles, total memory cost {}, time consumption {}ms.",
logicalStorageGroupName + "-" + dataRegionId,
+ sequenceFileList.size(),
+ unsequenceFileList.size(),
+ candidate.getSeqFiles().size(),
+ candidate.getUnseqFiles().size(),
taskResources.getSeqFiles().size(),
- taskResources.getUnseqFiles().size());
+ taskResources.getUnseqFiles().size(),
+ taskResources.getTotalMemoryCost(),
+ System.currentTimeMillis() - startTime);
+ hasPrintedLog = false;
return Collections.singletonList(taskResources);
} catch (MergeException e) {
@@ -263,9 +269,4 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector
}
return Collections.emptyList();
}
-
- @Override
- public List<Long> getCompactionMemoryCost() {
- return Collections.singletonList(totalCost);
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
index e4b7111505..c61a46485b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
@@ -105,8 +105,11 @@ public class SizeTieredCompactionSelector
TsFileNameGenerator.TsFileName currentName =
TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
if (currentName.getInnerCompactionCnt() != level) {
+ // meet files of another level
if (selectedFileList.size() > 1) {
- taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+ if (!addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize)) {
+ return false;
+ }
shouldContinueToSearch = false;
}
selectedFileList = new ArrayList<>();
@@ -131,7 +134,9 @@ public class SizeTieredCompactionSelector
|| selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
// submit the task
if (selectedFileList.size() > 1) {
- taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+ if (!addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize)) {
+ return false;
+ }
shouldContinueToSearch = false;
}
selectedFileList = new ArrayList<>();
@@ -142,12 +147,25 @@ public class SizeTieredCompactionSelector
// if next time partition exists
// submit a merge task even it does not meet the requirement for file num or file size
if (hasNextTimePartition && selectedFileList.size() > 1) {
- taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+ addOneTaskToQueue(taskPriorityQueue, selectedFileList, selectedFileSize);
shouldContinueToSearch = false;
}
return shouldContinueToSearch;
}
+ private boolean addOneTaskToQueue(
+ PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue,
+ List<TsFileResource> selectedFileList,
+ long selectedFileSize) {
+ if (CompactionTaskManager.getInstance().getCompactionCandidateTaskCount()
+ + taskPriorityQueue.size()
+ < config.getCandidateCompactionTaskQueueSize()) {
+ taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+ return true;
+ }
+ return false;
+ }
+
/**
* This method searches for a batch of files to be compacted from layer 0 to the highest layer. If
* there are more than a batch of files to be merged on a certain layer, it does not search to
@@ -207,7 +225,7 @@ public class SizeTieredCompactionSelector
if (fileNameOfO1.getInnerCompactionCnt() != fileNameOfO2.getInnerCompactionCnt()) {
return fileNameOfO2.getInnerCompactionCnt() - fileNameOfO1.getInnerCompactionCnt();
}
- return (int) (fileNameOfO2.getVersion() - fileNameOfO1.getVersion());
+ return (int) (fileNameOfO1.getVersion() - fileNameOfO2.getVersion());
} catch (IOException e) {
return 0;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
index d8ebdf1121..de737cafbe 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
@@ -86,7 +86,8 @@ public class CompactionSchedulerTest {
CompactionClearUtils.clearAllCompactionFiles();
EnvironmentUtils.cleanAllDir();
File basicOutputDir = new File(TestConstant.BASE_OUTPUT_PATH);
- IoTDBDescriptor.getInstance().getConfig().setCompactionPriority(CompactionPriority.INNER_CROSS);
+
+ IoTDBDescriptor.getInstance().getConfig().setCompactionPriority(CompactionPriority.BALANCE);
if (!basicOutputDir.exists()) {
assertTrue(basicOutputDir.mkdirs());
}
@@ -112,13 +113,13 @@ public class CompactionSchedulerTest {
@After
public void tearDown() throws IOException, StorageEngineException {
+ CompactionTaskManager.getInstance().stop();
new CompactionConfigRestorer().restoreCompactionConfig();
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
CompactionClearUtils.clearAllCompactionFiles();
EnvironmentUtils.cleanAllDir();
CompactionClearUtils.deleteEmptyDir(new File("target"));
- CompactionTaskManager.getInstance().stop();
}
/**
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
index 15fa53426e..15dd291aba 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerWithFastPerformerTest.java
@@ -58,7 +58,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class CompactionSchedulerWithFastPerformerTest {
- private static final Logger logger = LoggerFactory.getLogger(CompactionSchedulerTest.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(CompactionSchedulerWithFastPerformerTest.class);
static final String COMPACTION_TEST_SG = "root.compactionSchedulerTest_";
static final long MAX_WAITING_TIME = 60_000;
static final long SCHEDULE_AGAIN_TIME = 30_000;
@@ -86,7 +87,8 @@ public class CompactionSchedulerWithFastPerformerTest {
CompactionClearUtils.clearAllCompactionFiles();
EnvironmentUtils.cleanAllDir();
File basicOutputDir = new File(TestConstant.BASE_OUTPUT_PATH);
- IoTDBDescriptor.getInstance().getConfig().setCompactionPriority(CompactionPriority.INNER_CROSS);
+
+ IoTDBDescriptor.getInstance().getConfig().setCompactionPriority(CompactionPriority.BALANCE);
if (!basicOutputDir.exists()) {
assertTrue(basicOutputDir.mkdirs());
}
@@ -112,13 +114,13 @@ public class CompactionSchedulerWithFastPerformerTest {
@After
public void tearDown() throws IOException, StorageEngineException {
+ CompactionTaskManager.getInstance().stop();
new CompactionConfigRestorer().restoreCompactionConfig();
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
CompactionClearUtils.clearAllCompactionFiles();
EnvironmentUtils.cleanAllDir();
CompactionClearUtils.deleteEmptyDir(new File("target"));
- CompactionTaskManager.getInstance().stop();
}
/**
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
index 66f272bf0a..99a7e98190 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
@@ -94,7 +94,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
tsFileManager.addAll(seqResources, true);
- CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+ CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
try {
Thread.sleep(5000);
} catch (Exception e) {
@@ -115,7 +115,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
seqResources.get(0).setStatus(TsFileResourceStatus.COMPACTING);
TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
tsFileManager.addAll(seqResources, true);
- CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+ CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
long waitingTime = 0;
while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
@@ -144,7 +144,7 @@ public class InnerCompactionSchedulerTest extends AbstractCompactionTest {
seqResources.get(3).setStatus(TsFileResourceStatus.UNCLOSED);
TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
tsFileManager.addAll(seqResources, true);
- CompactionScheduler.tryToSubmitInnerSpaceCompactionTask("testSG", "0", 0L, tsFileManager, true);
+ CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(tsFileManager, 0L);
long waitingTime = 0;
while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
try {