You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/12/03 07:41:49 UTC
[iotdb] branch master updated: [IOTDB-2024] Add annotation to compaction (#4523)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7eca660 [IOTDB-2024] Add annotation to compaction (#4523)
7eca660 is described below
commit 7eca6605bdf59fe9798d0da8d0ae67b903088b9c
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Fri Dec 3 15:41:07 2021 +0800
[IOTDB-2024] Add annotation to compaction (#4523)
---
.../engine/compaction/CompactionTaskManager.java | 16 ++++++++++++++-
.../cross/inplace/InplaceCompactionSelector.java | 8 ++++++++
.../inplace/selector/MaxFileMergeFileSelector.java | 20 +++++++++++++++++++
.../sizetiered/SizeTieredCompactionSelector.java | 23 +++++++++++++++++++++-
4 files changed, 65 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 959b9fc..8eac781 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -52,16 +52,25 @@ import java.util.concurrent.atomic.AtomicInteger;
public class CompactionTaskManager implements IService {
private static final Logger logger = LoggerFactory.getLogger("COMPACTION");
private static final CompactionTaskManager INSTANCE = new CompactionTaskManager();
+
+ // The thread pool that executes the compaction task. The default number of threads for this pool
+ // is 10.
private WrappedScheduledExecutorService taskExecutionPool;
public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
- // TODO: record the task in time partition
private MinMaxPriorityQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
MinMaxPriorityQueue.orderedBy(new CompactionTaskComparator()).maximumSize(1000).create();
+ // <logicalStorageGroupName, futureSet>; used to terminate all compaction tasks under the
+ // logicalStorageGroup
private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
private Map<String, Map<Long, Set<Future<Void>>>> compactionTaskFutures =
new ConcurrentHashMap<>();
private List<AbstractCompactionTask> runningCompactionTaskList = new ArrayList<>();
+
+ // The thread pool that periodically fetches compaction tasks from
+ // candidateCompactionTaskQueue and submits them to taskExecutionPool. The default number of
+ // threads for this pool is 1.
private ScheduledExecutorService compactionTaskSubmissionThreadPool;
+
private final long TASK_SUBMIT_INTERVAL =
IoTDBDescriptor.getInstance().getConfig().getCompactionSubmissionInterval();
@@ -81,6 +90,11 @@ public class CompactionTaskManager implements IService {
currentTaskNum = new AtomicInteger(0);
compactionTaskSubmissionThreadPool =
IoTDBThreadPoolFactory.newScheduledThreadPool(1, ThreadName.COMPACTION_SERVICE.getName());
+
+ // Periodically do the following: fetch the highest priority task from the
+ // candidateCompactionTaskQueue, check that all tsfiles in the compaction task are valid, and
+ // if there is thread space available in the taskExecutionPool, submit the compaction task to
+ // the taskExecutionPool to perform the compaction.
compactionTaskSubmissionThreadPool.scheduleWithFixedDelay(
this::submitTaskFromTaskQueue,
TASK_SUBMIT_INTERVAL,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
index dcc494c..6190fb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
@@ -62,6 +62,14 @@ public class InplaceCompactionSelector extends AbstractCrossSpaceCompactionSelec
taskFactory);
}
+ /**
+ * This method creates a specific file selector according to the file selection strategy of
+ * crossSpace compaction, uses the file selector to select all unseqFiles and seqFiles to be
+ * compacted under the time partition of the virtual storage group, and creates a compaction task
+ * for them. The task is put into the candidateCompactionTaskQueue of the
+ * {@link CompactionTaskManager}.
+ *
+ * @return whether files were selected and a compaction task was submitted
+ */
@Override
public boolean selectAndSubmit() {
boolean taskSubmitted = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
index bda4cc4..0d4df93 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
@@ -130,6 +130,17 @@ public class MaxFileMergeFileSelector implements ICrossSpaceMergeFileSelector {
return new List[] {selectedSeqFiles, selectedUnseqFiles};
}
+ /**
+ * In a preset time (30 seconds), for each unseqFile, find the list of seqFiles that overlap with
+ * it and have not been selected by the file selector of this compaction task. After finding each
+ * unseqFile and its corresponding overlap seqFile list, estimate the additional memory overhead
+ * that may be added by compacting them (preferably using the loose estimate), and if it does not
+ * exceed the memory overhead preset by the system for the compaction thread, put them into the
+ * selectedSeqFiles and selectedUnseqFiles.
+ *
+ * @param useTightBound whether to use the tight estimate ({@code true}) or the loose estimate
+ *     ({@code false})
+ * @throws IOException
+ */
void select(boolean useTightBound) throws IOException {
tmpSelectedSeqFiles = new HashSet<>();
seqSelected = new boolean[resource.getSeqFiles().size()];
@@ -221,6 +232,15 @@ public class MaxFileMergeFileSelector implements ICrossSpaceMergeFileSelector {
return isClosedAndNotMerging;
}
+ /**
+ * Put the index of each seqFile that overlaps with the specific unseqFile and has not been
+ * selected by the file selector of the compaction task into the tmpSelectedSeqFiles set. To
+ * determine whether an overlap exists, traverse each device ChunkGroup in the unseqFile and
+ * check whether it overlaps with the ChunkGroup of the same device in each seqFile that has not
+ * been selected by the compaction task; if so, select this seqFile.
+ *
+ * @param unseqFile the tsFileResource of unseqFile to be compacted
+ */
private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
int tmpSelectedNum = 0;
for (String deviceId : unseqFile.getDevices()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index db665bd..0786503 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -72,6 +72,14 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
taskFactory);
}
+ /**
+ * This method searches for batches of files to be compacted from layer 0 to the highest layer.
+ * If at least one batch of files to be merged is found on a certain layer, it does not search
+ * higher layers. It creates a compaction task for each batch of files and puts it into the
+ * candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
+ *
+ * @return whether files were selected and a compaction task was submitted
+ */
@Override
public boolean selectAndSubmit() {
LOGGER.debug(
@@ -105,6 +113,20 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
return true;
}
+ /**
+ * This method searches for all files on the given level. If there are consecutive files on the
+ * level that meet the system preset conditions (the number exceeds 10 or the total file size
+ * exceeds 2G), a compaction task is created for the batch of files and placed in the
+ * taskPriorityQueue, and the search continues with the next batch. If at least one batch of
+ * files to be compacted is found on this layer, it will return false (indicating that it will no
+ * longer search for higher layers); otherwise it will return true.
+ *
+ * @param level the level to be searched
+ * @param taskPriorityQueue it stores the batches of files to be compacted and the total size of
+ * each batch
+ * @return return whether to continue the search to higher levels
+ * @throws IOException
+ */
private boolean selectLevelTask(
int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
throws IOException {
@@ -113,7 +135,6 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
long selectedFileSize = 0L;
long targetCompactionFileSize = config.getTargetCompactionFileSize();
- // this iterator traverses the list in reverse order
for (TsFileResource currentFile : tsFileResources) {
TsFileNameGenerator.TsFileName currentName =
TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());