You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/12/03 07:41:49 UTC

[iotdb] branch master updated: [IOTDB-2024] Add annotation to compaction (#4523)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eca660  [IOTDB-2024] Add annotation to compaction (#4523)
7eca660 is described below

commit 7eca6605bdf59fe9798d0da8d0ae67b903088b9c
Author: 周沛辰 <45...@users.noreply.github.com>
AuthorDate: Fri Dec 3 15:41:07 2021 +0800

    [IOTDB-2024] Add annotation to compaction (#4523)
---
 .../engine/compaction/CompactionTaskManager.java   | 16 ++++++++++++++-
 .../cross/inplace/InplaceCompactionSelector.java   |  8 ++++++++
 .../inplace/selector/MaxFileMergeFileSelector.java | 20 +++++++++++++++++++
 .../sizetiered/SizeTieredCompactionSelector.java   | 23 +++++++++++++++++++++-
 4 files changed, 65 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 959b9fc..8eac781 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -52,16 +52,25 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class CompactionTaskManager implements IService {
   private static final Logger logger = LoggerFactory.getLogger("COMPACTION");
   private static final CompactionTaskManager INSTANCE = new CompactionTaskManager();
+
+  // The thread pool that executes the compaction task. The default number of threads for this pool
+  // is 10.
   private WrappedScheduledExecutorService taskExecutionPool;
   public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
-  // TODO: record the task in time partition
   private MinMaxPriorityQueue<AbstractCompactionTask> candidateCompactionTaskQueue =
       MinMaxPriorityQueue.orderedBy(new CompactionTaskComparator()).maximumSize(1000).create();
+  // <logicalStorageGroupName, futureSet>: used to terminate all compaction tasks under a
+  // logical storage group
   private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
   private Map<String, Map<Long, Set<Future<Void>>>> compactionTaskFutures =
       new ConcurrentHashMap<>();
   private List<AbstractCompactionTask> runningCompactionTaskList = new ArrayList<>();
+
+  // The thread pool that periodically fetches compaction tasks from candidateCompactionTaskQueue
+  // and submits them to taskExecutionPool. The default number of threads for this pool
+  // is 1.
   private ScheduledExecutorService compactionTaskSubmissionThreadPool;
+
   private final long TASK_SUBMIT_INTERVAL =
       IoTDBDescriptor.getInstance().getConfig().getCompactionSubmissionInterval();
 
@@ -81,6 +90,11 @@ public class CompactionTaskManager implements IService {
       currentTaskNum = new AtomicInteger(0);
       compactionTaskSubmissionThreadPool =
           IoTDBThreadPoolFactory.newScheduledThreadPool(1, ThreadName.COMPACTION_SERVICE.getName());
+
+      // Periodically do the following: fetch the highest-priority task from the
+      // candidateCompactionTaskQueue, check that all tsfiles in the compaction task are valid, and
+      // if a thread is available in the taskExecutionPool, submit the compaction task to the
+      // taskExecutionPool to perform the compaction.
       compactionTaskSubmissionThreadPool.scheduleWithFixedDelay(
           this::submitTaskFromTaskQueue,
           TASK_SUBMIT_INTERVAL,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
index dcc494c..6190fb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
@@ -62,6 +62,14 @@ public class InplaceCompactionSelector extends AbstractCrossSpaceCompactionSelec
         taskFactory);
   }
 
+  /**
+   * This method creates a specific file selector according to the file selection strategy of
+   * cross-space compaction, uses it to select all unseqFiles and seqFiles to be compacted under
+   * the time partition of the virtual storage group, and creates a compaction task for them. The
+   * task is put into the candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
+   *
+   * @return whether files to be compacted were found and a compaction task was submitted
+   */
   @Override
   public boolean selectAndSubmit() {
     boolean taskSubmitted = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
index bda4cc4..0d4df93 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
@@ -130,6 +130,17 @@ public class MaxFileMergeFileSelector implements ICrossSpaceMergeFileSelector {
     return new List[] {selectedSeqFiles, selectedUnseqFiles};
   }
 
+  /**
+   * Within a preset time limit (30 seconds), for each unseqFile, find the seqFiles that overlap with
+   * it and have not been selected by the file selector of this compaction task. After finding each
+   * unseqFile and its corresponding overlap seqFile list, estimate the additional memory overhead
+   * that may be added by compacting them (preferably using the loop estimate), and if it does not
+   * exceed the memory overhead preset by the system for the compaction thread, put them into the
+   * selectedSeqFiles and selectedUnseqFiles.
+   *
+   * @param useTightBound whether to use the tight estimate (true) or the loop estimate (false)
+   * @throws IOException
+   */
   void select(boolean useTightBound) throws IOException {
     tmpSelectedSeqFiles = new HashSet<>();
     seqSelected = new boolean[resource.getSeqFiles().size()];
@@ -221,6 +232,15 @@ public class MaxFileMergeFileSelector implements ICrossSpaceMergeFileSelector {
     return isClosedAndNotMerging;
   }
 
+  /**
+   * Put the index of each seqFile that overlaps with the given unseqFile and has not yet been
+   * selected by the file selector of the compaction task into the tmpSelectedSeqFiles set. To
+   * determine whether an overlap exists, traverse each device ChunkGroup in the unseqFile and
+   * check whether it overlaps with the same device's ChunkGroup in each seqFile that is not yet
+   * selected by the compaction task; if so, select that seqFile.
+   *
+   * @param unseqFile the tsFileResource of unseqFile to be compacted
+   */
   private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
     int tmpSelectedNum = 0;
     for (String deviceId : unseqFile.getDevices()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index db665bd..0786503 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -72,6 +72,14 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
         taskFactory);
   }
 
+  /**
+   * This method searches for batches of files to be compacted from level 0 to the highest level.
+   * If at least one batch of files to be compacted is found on a level, it does not search higher
+   * levels. It creates a compaction task for each batch of files and puts it into the
+   * candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
+   *
+   * @return whether files to be compacted were found and a compaction task was submitted
+   */
   @Override
   public boolean selectAndSubmit() {
     LOGGER.debug(
@@ -105,6 +113,20 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
     return true;
   }
 
+  /**
+   * This method searches all files on the given level. If there are consecutive files on the
+   * level that meet the system preset conditions (the number exceeds 10 or the total file size
+   * exceeds 2G), the batch of files and its total size are added to taskPriorityQueue, and the
+   * search continues with the next batch. If at least one batch of files to be compacted
+   * is found on this level, it returns false (indicating that higher levels should not be
+   * searched); otherwise it returns true.
+   *
+   * @param level the level to be searched
+   * @param taskPriorityQueue it stores the batches of files to be compacted and the total size of
+   *     each batch
+   * @return whether to continue searching higher levels
+   * @throws IOException
+   */
   private boolean selectLevelTask(
       int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
       throws IOException {
@@ -113,7 +135,6 @@ public class SizeTieredCompactionSelector extends AbstractInnerSpaceCompactionSe
     long selectedFileSize = 0L;
     long targetCompactionFileSize = config.getTargetCompactionFileSize();
 
-    // this iterator traverses the list in reverse order
     for (TsFileResource currentFile : tsFileResources) {
       TsFileNameGenerator.TsFileName currentName =
           TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());