You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/12/17 09:51:52 UTC
[iotdb] 01/02: tmp save
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch compaction_refine
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit edc7244f43d7eda4e369ea7cc2b01c223be3c902
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Sat Dec 17 00:23:56 2022 +0800
tmp save
---
.../inner/sizetiered/InnerCompactionCandidate.java | 52 +++++++++++
.../sizetiered/SizeTieredCompactionSelector.java | 103 +++++++++++----------
2 files changed, 106 insertions(+), 49 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/InnerCompactionCandidate.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/InnerCompactionCandidate.java
new file mode 100644
index 0000000000..4ccd475175
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/InnerCompactionCandidate.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.compaction.inner.sizetiered;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InnerCompactionCandidate {
+ private List<TsFileResource> tsFileResources;
+ private long totalFileSize;
+
+ public InnerCompactionCandidate() {
+ this.tsFileResources = new ArrayList<>();
+ this.totalFileSize = 0L;
+ }
+
+ public void addTsFileResource(TsFileResource resource) {
+ this.tsFileResources.add(resource);
+ totalFileSize += resource.getTsFileSize();
+ }
+
+ public int getFileCount() {
+ return tsFileResources.size();
+ }
+
+ public long getTotalFileSize() {
+ return totalFileSize;
+ }
+
+ public List<TsFileResource> getTsFileResources() {
+ return tsFileResources;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index d8430029c9..f8d60a2fe1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -59,7 +59,6 @@ public class SizeTieredCompactionSelector
protected String storageGroupName;
protected String dataRegionId;
protected long timePartition;
- protected List<TsFileResource> tsFileResources;
protected boolean sequence;
protected TsFileManager tsFileManager;
protected boolean hasNextTimePartition;
@@ -87,64 +86,54 @@ public class SizeTieredCompactionSelector
* longer search for higher layers), otherwise it will return true.
*
* @param level the level to be searched
- * @param taskPriorityQueue it stores the batches of files to be compacted and the total size of
* each batch
* @return return whether to continue the search to higher levels
* @throws IOException
*/
- private boolean selectLevelTask(
- int level, PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue)
+ private List<InnerCompactionCandidate> selectLevelTask(List<TsFileResource> tsFileResources, int level)
throws IOException {
- boolean shouldContinueToSearch = true;
- List<TsFileResource> selectedFileList = new ArrayList<>();
- long selectedFileSize = 0L;
- long targetCompactionFileSize = config.getTargetCompactionFileSize();
-
+ List<InnerCompactionCandidate> result = new ArrayList<>();
+ //sort TsFileResources by inner level in ascending
+// tsFileResources.sort(new TsFileResourceInnerLevelComparator());
+ InnerCompactionCandidate candidate = new InnerCompactionCandidate();
for (TsFileResource currentFile : tsFileResources) {
- TsFileNameGenerator.TsFileName currentName =
- TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
- if (currentName.getInnerCompactionCnt() != level) {
- if (selectedFileList.size() > 1) {
- taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
- shouldContinueToSearch = false;
- }
- selectedFileList = new ArrayList<>();
- selectedFileSize = 0L;
+ if (tsFileShouldBeSkipped(currentFile, level)) {
continue;
}
+ // 为什么 ?
if (currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
- selectedFileList.clear();
- selectedFileSize = 0L;
+ candidate = new InnerCompactionCandidate();
continue;
}
- LOGGER.debug("Current File is {}, size is {}", currentFile, currentFile.getTsFileSize());
- selectedFileList.add(currentFile);
- selectedFileSize += currentFile.getTsFileSize();
- LOGGER.debug(
- "Add tsfile {}, current select file num is {}, size is {}",
- currentFile,
- selectedFileList.size(),
- selectedFileSize);
+ LOGGER.debug("file added. File is {}, size is {}", currentFile, currentFile.getTsFileSize());
+ candidate.addTsFileResource(currentFile);
+
// if the file size or file num reach threshold
- if (selectedFileSize >= targetCompactionFileSize
- || selectedFileList.size() >= config.getMaxInnerCompactionCandidateFileNum()) {
+ if (candidateSatisfied(candidate)) {
// submit the task
- if (selectedFileList.size() > 1) {
- taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
- shouldContinueToSearch = false;
+ if (candidate.getFileCount() > 1) {
+ result.add(candidate);
}
- selectedFileList = new ArrayList<>();
- selectedFileSize = 0L;
+ candidate = new InnerCompactionCandidate();
}
}
// if next time partition exists
// submit a merge task even it does not meet the requirement for file num or file size
- if (hasNextTimePartition && selectedFileList.size() > 1) {
- taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
- shouldContinueToSearch = false;
+ if (hasNextTimePartition && candidate.getTotalFileSize() > 1) {
+ result.add(candidate);
}
- return shouldContinueToSearch;
+ return result;
+ }
+
+ private boolean candidateSatisfied(InnerCompactionCandidate candidate) {
+ return candidate.getFileCount() >= config.getTargetCompactionFileSize() || candidate.getTotalFileSize() >= config.getMaxInnerCompactionCandidateFileNum();
+ }
+
+ private boolean tsFileShouldBeSkipped(TsFileResource tsFileResource, int level) throws IOException {
+ TsFileNameGenerator.TsFileName currentFileName =
+ TsFileNameGenerator.getTsFileName(tsFileResource.getTsFile().getName());
+ return currentFileName.getInnerCompactionCnt() != level;
}
/**
@@ -157,19 +146,20 @@ public class SizeTieredCompactionSelector
*/
@Override
public List<List<TsFileResource>> selectInnerSpaceTask(List<TsFileResource> tsFileResources) {
- this.tsFileResources = tsFileResources;
- PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
+ PriorityQueue<InnerCompactionCandidate> taskPriorityQueue =
new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
try {
- int maxLevel = searchMaxFileLevel();
+ int maxLevel = searchMaxFileLevel(tsFileResources);
for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) {
- if (!selectLevelTask(currentLevel, taskPriorityQueue)) {
+ List<InnerCompactionCandidate> candidates = selectLevelTask(tsFileResources, currentLevel);
+ if (candidates.size() > 0) {
+ taskPriorityQueue.addAll(candidates);
break;
}
}
List<List<TsFileResource>> taskList = new LinkedList<>();
while (taskPriorityQueue.size() > 0) {
- List<TsFileResource> resources = taskPriorityQueue.poll().left;
+ List<TsFileResource> resources = taskPriorityQueue.poll().getTsFileResources();
taskList.add(resources);
}
return taskList;
@@ -179,7 +169,7 @@ public class SizeTieredCompactionSelector
return Collections.emptyList();
}
- private int searchMaxFileLevel() throws IOException {
+ private int searchMaxFileLevel(List<TsFileResource> tsFileResources) throws IOException {
int maxLevel = -1;
for (TsFileResource currentFile : tsFileResources) {
TsFileNameGenerator.TsFileName currentName =
@@ -192,12 +182,12 @@ public class SizeTieredCompactionSelector
}
private class SizeTieredCompactionTaskComparator
- implements Comparator<Pair<List<TsFileResource>, Long>> {
+ implements Comparator<InnerCompactionCandidate> {
@Override
- public int compare(Pair<List<TsFileResource>, Long> o1, Pair<List<TsFileResource>, Long> o2) {
- TsFileResource resourceOfO1 = o1.left.get(0);
- TsFileResource resourceOfO2 = o2.left.get(0);
+ public int compare(InnerCompactionCandidate o1, InnerCompactionCandidate o2) {
+ TsFileResource resourceOfO1 = o1.getTsFileResources().get(0);
+ TsFileResource resourceOfO2 = o2.getTsFileResources().get(0);
try {
TsFileNameGenerator.TsFileName fileNameOfO1 =
TsFileNameGenerator.getTsFileName(resourceOfO1.getTsFile().getName());
@@ -212,4 +202,19 @@ public class SizeTieredCompactionSelector
}
}
}
+
+ private class TsFileResourceInnerLevelComparator implements Comparator<TsFileResource> {
+ @Override
+ public int compare(TsFileResource o1, TsFileResource o2) {
+ try {
+ TsFileNameGenerator.TsFileName o1Name =
+ TsFileNameGenerator.getTsFileName(o1.getTsFile().getName());
+ TsFileNameGenerator.TsFileName o2Name =
+ TsFileNameGenerator.getTsFileName(o2.getTsFile().getName());
+ return o1Name.getInnerCompactionCnt() - o2Name.getInnerCompactionCnt();
+ } catch (IOException e) {
+ return 0;
+ }
+ }
+ }
}