You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/02/21 13:40:20 UTC
[iotdb] branch proceeding_vldb updated: finish tired compaction
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch proceeding_vldb
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/proceeding_vldb by this push:
new 327e1da finish tired compaction
327e1da is described below
commit 327e1da96d61f26859b744dbbc140f53bf9be0aa
Author: EJTTianyu <16...@qq.com>
AuthorDate: Sun Feb 21 21:39:48 2021 +0800
finish tired compaction
---
.../tired/TiredCompactionTsFileManagement.java | 15 +++++--
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 51 ++++++++++++++++++----
2 files changed, 54 insertions(+), 12 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/tired/TiredCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/tired/TiredCompactionTsFileManagement.java
index f2f688b..e9146b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/tired/TiredCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/tired/TiredCompactionTsFileManagement.java
@@ -38,6 +38,8 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.MManager;
@@ -45,6 +47,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -343,9 +346,9 @@ public class TiredCompactionTsFileManagement extends TsFileManagement {
@Override
protected void merge(long timePartition) {
- if (isCompactionWorking()) {
- return;
- }
+// if (isCompactionWorking()) {
+// return;
+// }
Map<Long, Map<Long, List<TsFileResource>>> selectFiles = selectMergeFile(timePartition);
mergeFiles(selectFiles, timePartition);
}
@@ -444,7 +447,7 @@ public class TiredCompactionTsFileManagement extends TsFileManagement {
newResource.setHistoricalVersions(historicalVersions);
newResource.serialize();
newFileWriter.endFile();
-
+ newResource.close();
cleanUp(resources, newResource, mergedLevel, timePartition, parentPath);
} catch (Exception e) {
//TODO do nothing
@@ -477,6 +480,10 @@ public class TiredCompactionTsFileManagement extends TsFileManagement {
}
for (TsFileResource deleteRes : res.getValue()) {
deleteRes.delete();
+ ChunkMetadataCache.getInstance().clear();
+ TimeSeriesMetadataCache.getInstance().clear();
+ FileReaderManager.getInstance().closeFileAndRemoveReader(deleteRes.getTsFilePath());
+ deleteRes.delete();
}
}
File oldFile = newTsResource.getTsFile();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 65fdbb0..28d7cbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -697,15 +697,21 @@ public class PlanExecutor implements IPlanExecutor {
// 为了 vldb 投稿, 根据文件 version 加载数据文件
// 数据文件夹格式 sg -- seq
// -- unseq
- loadVldbDataDir(file, plan);
+ if (IoTDBDescriptor.getInstance().getConfig().isOverlapSplit()) {
+ loadVldbDataDir(file, plan);
+ } else {
+ loadVldbDataDirNoSplit(file, plan);
+ }
} else {
loadFile(file, plan);
}
}
+ // 用于处理顺序,乱序分离的情况
private void loadVldbDataDir(File curFile, OperateFilePlan plan) throws QueryProcessException{
File[] files = curFile.listFiles();
List<File> fileList = new ArrayList<>();
+ List<File> unseqTmpList = new ArrayList<>();
Map<Boolean, List<File>> fileBySeq = new HashMap<>();
fileBySeq.put(true, new ArrayList<>());
fileBySeq.put(false, new ArrayList<>());
@@ -718,23 +724,31 @@ public class PlanExecutor implements IPlanExecutor {
}
for (File file : files[1].listFiles()) {
if (file.getName().endsWith(TSFILE_SUFFIX)) {
- fileList.add(file);
+ unseqTmpList.add(file);
fileBySeq.get(false).add(file);
}
}
- File maxFile = null;
- if (!fileBySeq.get(true).isEmpty()) {
- maxFile = fileList.remove(fileList.size() - 1);
- }
Collections.sort(fileList, new Comparator<File>() {
@Override
public int compare(File o1, File o2) {
return TsFileManagement.compareFileName(o1, o2);
}
});
- if (!fileBySeq.get(true).isEmpty()) {
- fileList.add(maxFile);
+ Collections.sort(unseqTmpList, new Comparator<File>() {
+ @Override
+ public int compare(File o1, File o2) {
+ return TsFileManagement.compareFileName(o1, o2);
+ }
+ });
+ for (int i = 1; i < unseqTmpList.size(); i++){
+ for (int j =0; j < fileList.size() ; j++){
+ if (TsFileManagement.compareFileName(unseqTmpList.get(i), fileList.get(j)) == -1){
+ fileList.add(j - 1, unseqTmpList.get(i - 1));
+ break;
+ }
+ }
}
+ fileList.add(unseqTmpList.get(unseqTmpList.size() - 1));
for (File file : fileList) {
if (fileBySeq.get(true).contains(file)) {
loadFileBySeq(file, true, plan);
@@ -744,6 +758,27 @@ public class PlanExecutor implements IPlanExecutor {
}
}
+ // 用于处理顺序,乱序不分离的情况
+ private void loadVldbDataDirNoSplit(File curFile, OperateFilePlan plan)
+ throws QueryProcessException {
+ File[] files = curFile.listFiles();
+ List<File> fileList = new ArrayList<>();
+ for (File file : files[1].listFiles()) {
+ if (file.getName().endsWith(TSFILE_SUFFIX)) {
+ fileList.add(file);
+ }
+ }
+ Collections.sort(fileList, new Comparator<File>() {
+ @Override
+ public int compare(File o1, File o2) {
+ return TsFileManagement.compareFileName(o1, o2);
+ }
+ });
+ for (File file : fileList) {
+ loadFileBySeq(file, false, plan);
+ }
+ }
+
public static void main(String[] args) throws Exception {
new PlanExecutor().loadVldbDataDir(new File(
"/Users/tianyu/2019秋季学期/iotdb/server/target/iotdb-server-0.11.2/data/data/sequence/root.group_1"),