Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/11/03 19:01:03 UTC
[iotdb] branch master updated: remove compaction read limiter (#1930)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 20d0c49 remove compaction read limiter (#1930)
20d0c49 is described below
commit 20d0c499567130f88c67f7e6afd339b21c61d347
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Wed Nov 4 03:00:54 2020 +0800
remove compaction read limiter (#1930)
---
.../{TsFileManagement.md => Compaction.md} | 49 +++++------
.../resources/conf/iotdb-engine.properties | 67 +++++++--------
.../org/apache/iotdb/db/concurrent/ThreadName.java | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 95 +++++++++-------------
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 19 ++---
.../CompactionMergeTaskPoolManager.java} | 32 ++++----
.../CompactionStrategy.java} | 20 ++---
.../TsFileManagement.java | 41 +++++-----
.../level/LevelCompactionTsFileManagement.java} | 79 ++++++++----------
.../no/NoCompactionTsFileManagement.java} | 10 +--
.../utils/CompactionLogAnalyzer.java} | 18 ++--
.../utils/CompactionLogger.java} | 12 +--
.../utils/CompactionUtils.java} | 54 ++++--------
.../iotdb/db/engine/merge/manage/MergeManager.java | 17 ----
.../db/engine/merge/task/MergeMultiChunkTask.java | 3 +-
.../engine/storagegroup/StorageGroupProcessor.java | 44 +++++-----
.../java/org/apache/iotdb/db/service/IoTDB.java | 4 +-
.../org/apache/iotdb/db/service/ServiceType.java | 2 +-
.../apache/iotdb/db/engine/merge/MergeLogTest.java | 2 +-
.../iotdb/db/engine/merge/MergePerfTest.java | 2 +-
.../iotdb/db/engine/merge/MergeTaskTest.java | 2 +-
.../apache/iotdb/db/engine/merge/MergeTest.java | 6 +-
.../storagegroup/StorageGroupProcessorTest.java | 6 +-
.../db/integration/IoTDBDeleteTimeseriesIT.java | 10 +--
.../db/integration/IoTDBLoadExternalTsfileIT.java | 6 +-
.../iotdb/db/integration/IoTDBMergeTest.java | 10 +--
.../db/integration/IoTDBRemovePartitionIT.java | 2 -
.../iotdb/db/integration/IoTDBSeriesReaderIT.java | 12 +--
28 files changed, 273 insertions(+), 353 deletions(-)
diff --git a/docs/zh/SystemDesign/StorageEngine/TsFileManagement.md b/docs/zh/SystemDesign/StorageEngine/Compaction.md
similarity index 79%
rename from docs/zh/SystemDesign/StorageEngine/TsFileManagement.md
rename to docs/zh/SystemDesign/StorageEngine/Compaction.md
index ee5b80b..3898ff7 100644
--- a/docs/zh/SystemDesign/StorageEngine/TsFileManagement.md
+++ b/docs/zh/SystemDesign/StorageEngine/Compaction.md
@@ -19,22 +19,21 @@
-->
-# TsFileManagement - Hot Compaction
+# Compaction - Merge
## Overall Idea
-Because of automatic parameter optimization, iotdb currently writes very small data blocks to files, fragmenting the data files. Ad-hoc queries then need too many disk reads, and since the later merge process does not run immediately, queries on hot data become slow. Borrowing the idea of virtual memory, we add a hot-compaction step to the write path: at the cost of some write amplification, it guarantees that the data blocks written to the final files are no smaller than a given threshold, improving the efficiency of ad-hoc queries. This logic is placed in StorageGroupProcessor and merges sealed tsfile files.
+Because of automatic parameter optimization, iotdb currently writes very small data blocks to files, fragmenting the data files. Ad-hoc queries then need too many disk reads, and since the later merge process does not run immediately, queries on hot data become slow. Borrowing the idea of virtual memory, we add a compaction step to the write path: at the cost of some write amplification, it guarantees that the data blocks written to the final files are no smaller than a given threshold, improving the efficiency of ad-hoc queries. This logic is placed in StorageGroupProcessor and merges sealed tsfile files.
* Configuration changes
- * Add a parameter tsfile_manage_strategy to iotdb-engine.properties: the tsfile management strategy
- * tsfile_manage_strategy has two built-in strategies: LevelStrategy and NormalStrategy
- * LevelStrategy enables hot compaction with the level merge method; NormalStrategy disables hot compaction
- * Add merge_chunk_point_number to iotdb-engine.properties: the maximum chunk size limit; under LevelStrategy, once it is reached the files are merged to the last level
- * Add a parameter max_level_num to iotdb-engine.properties: the maximum number of levels under LevelStrategy
+ * Add a parameter compaction_strategy to iotdb-engine.properties: the file compaction strategy
+ * compaction_strategy has two built-in strategies: LEVEL_COMPACTION and NO_COMPACTION
+ * LEVEL_COMPACTION enables compaction with the level merge method; NO_COMPACTION disables compaction
+ * Add merge_chunk_point_number to iotdb-engine.properties: the maximum chunk size limit; under LEVEL_COMPACTION, once it is reached the files are merged to the last level
+ * Add a parameter max_level_num to iotdb-engine.properties: the maximum number of levels under LEVEL_COMPACTION
* Code structure changes
- * Create a TsFileManagement class dedicated to managing seqFileList and unSeqFileList in StorageGroupProcessor; the main hot-compaction logic lives here, and it exposes the interfaces needed on seqFileList and unSeqFileList
- * An ordinary merge calls TsFileManagement's getStableTsFileList() to get the list of files ready for merging; for LevelStrategy this returns the files on level {max_level_num - 1}, for NormalStrategy it returns all files
- * Each hot compaction takes the timestamp of the first merged file as the new file's timestamp, i.e. {firstFileTimestamp}-{version}-{mergeVerion + 1}.tsfiles
+ * Create a TsFileManagement class dedicated to managing seqFileList and unSeqFileList in StorageGroupProcessor; the main compaction logic lives here, and it exposes the interfaces needed on seqFileList and unSeqFileList
+ * Each compaction takes the timestamp of the first merged file as the new file's timestamp, i.e. {firstFileTimestamp}-{version}-{mergeVersion + 1}.tsfile
## Interfaces and classes provided by TsFileManagement
@@ -50,33 +49,35 @@
- int size(boolean sequence) length of the corresponding file list
- void forkCurrentFileList(long timePartition) save the current file list of a given time partition
- void recover() invoke the corresponding recovery procedure
-- HotCompactionMergeTask invoke the corresponding asynchronous merge procedure, passing closeUnsealedTsFileProcessorCallBack to notify the outer class when the merge ends
+- CompactionMergeTask invoke the corresponding asynchronous merge procedure, passing closeUnsealedTsFileProcessorCallBack to notify the outer class when the merge ends
-## LevelStrategy merge flow
+## LEVEL_COMPACTION merge flow
* The caller invokes forkCurrentFileList(long timePartition) to save the current file list of a given time partition
* The total chunk point count of the files selected here does not exceed merge_chunk_point_number
* The caller asynchronously creates and submits the merge thread
-* Decide whether to perform a global hot compaction
+* Decide whether to perform an unseq merge
+ * Invoke the original unseq merge logic; see the MergeManager document
+* Decide whether to perform a global compaction
* Generate the target file {first_file_name}-{max_level_num - 1}.tsfile
- * Generate the merge log .hot_compaction.log
+ * Generate the merge log .compaction.log
* Log that this is a global merge
* Log the target file
* Perform the merge (logging device - offset)
* Log that the merge is complete
* Loop over each level and decide whether its files should be merged into the next level
* Generate the target file {first_file_name}-{level + 1}.tsfile
- * Generate the merge log .hot_compaction.log
+ * Generate the merge log .compaction.log
* Log that this is a level merge
* Log the target file
* Perform the merge (logging device - offset)
* Log that the merge is complete
* Take the write lock
* Delete the merged files from disk and remove them from the official file list
-* Delete the merge log .hot_compaction.log
+* Delete the merge log .compaction.log
* Release the write lock
-## LevelStrategy recover flow
+## LEVEL_COMPACTION recover flow
* If the log file exists
* If it is a global merge (merging all small files into the last level)
@@ -90,12 +91,12 @@
* If the log file does not exist
* No recovery is needed
-## LevelStrategy example
+## LEVEL_COMPACTION example
-Set max_file_num_in_each_level = 3, tsfile_manage_strategy = LevelStrategy, max_level_num = 3; the file structure then has level 0, level 1 and level 2, where level 2 is the stable file list that is no longer hot-compacted
+Set max_file_num_in_each_level = 3, tsfile_manage_strategy = LevelStrategy, max_level_num = 3; the file structure then has level 0, level 1 and level 2, where level 2 is the stable file list that is no longer compacted
### Merging purely by level
-Suppose the system currently has 5 files and the last one is not yet closed; their structure and order are as follows
+Suppose the system currently has 5 files and the last one is not yet closed; their structure and order are as follows
level-0: t2-0 t3-0 t4-0
level-1: t0-1 t1-1
When the last file is closed, the merge proceeds as follows (files t2-0, t3-0 and t4-0 on level 0 are merged into file t2-1 on level 1)
@@ -144,7 +145,7 @@ level-2: t0-2 t1-2 t2-2 t3-2 t4-2
* Increasing max_level_num
* Since this does not change any file's original level, on recover the files are still put on their original levels, and files beyond level {max_level_num - 1} are put on the last level (to allow for repeated adjustments)
* That is, existing files continue to merge based on their original levels; the part beyond {max_level_num - 1} causes no out-of-order problems either, because the files on the last level are necessarily the oldest
-Suppose the system has 5 files, the original max_file_num_in_each_level = 2 and the raised max_file_num_in_each_level = 3; the file structure after recovery is:
+Suppose the system has 5 files, the original max_file_num_in_each_level = 2 and the raised max_file_num_in_each_level = 3; the file structure after recovery is:
level-0: t2-0 t3-0 t4-0
level-1: t0-1 t1-1
Assuming {size(t2-0)+size(t3-0)+size(t4-0) < merge_chunk_point_number}, the merge proceeds as follows
@@ -169,7 +170,7 @@ level-2: t0-2
* Decreasing max_level_num
* Since this does not change any file's original level, on recover the files below the new {max_level_num - 1} are still put on their original levels, while the files beyond it are put on the last level
- * That is, some files will continue to be merged, while files beyond {max_level_num - 2} will no longer be hot-compacted
+ * That is, some files will continue to be merged, while files beyond {max_level_num - 2} will no longer be compacted
Suppose the system has 7 files, the original max_file_num_in_each_level = 3 and the lowered max_file_num_in_each_level = 2; the file structure after recovery is:
level-0: t4-0 t5-0 t6-0
level-1: t0-2 t1-1 t2-1 t3-1
@@ -197,9 +198,9 @@ level-0:
level-1: t3-1 t4-1
level-2: t0-2
-* NormalStrategy -> LevelStrategy
+* NO_COMPACTION -> LEVEL_COMPACTION
* Since the {mergeVersion + 1} policy of the original merge has been removed, all files are placed on level 0
- * Each hot compaction takes out at most enough files to satisfy {merge_chunk_point_number} and merges them, until all the extra files have been hot-compacted and the normal hot-compaction flow resumes
+ * Each compaction takes out at most enough files to satisfy {merge_chunk_point_number} and merges them, until all the extra files have been compacted and the normal compaction flow resumes
Suppose the system has 5 files; the file structure after recovery is:
level-2: t0-0 t1-0 t2-0 t3-0 t4-0
Assuming {size(t0-0)+size(t1-0) >= merge_chunk_point_number}, the first merge proceeds as follows
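The level cascade the renamed document describes — a level fills up, its files merge into one file on the next level, and the last level is a stable list that is never compacted further — can be sketched as follows. This is a simplified illustration only; the class and its names are invented, not IoTDB's actual LevelCompactionTsFileManagement.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the LEVEL_COMPACTION cascade. Files are represented
// only by their base names; logging and locking are noted in comments.
public class LevelCompactionSketch {

  static final int FILE_NUM_IN_EACH_LEVEL = 3; // cf. seq_file_num_in_each_level
  static final int MAX_LEVEL_NUM = 3;          // cf. seq_level_num

  // levels.get(i) holds the sealed files currently on level i
  final List<List<String>> levels = new ArrayList<>();

  public LevelCompactionSketch() {
    for (int i = 0; i < MAX_LEVEL_NUM; i++) {
      levels.add(new ArrayList<>());
    }
  }

  // Target file name pattern from the document: {first_file_name}-{level + 1}.tsfile
  static String targetName(String firstFile, int nextLevel) {
    return firstFile + "-" + nextLevel + ".tsfile";
  }

  // Called when a new file is sealed onto level 0; cascades merges upward.
  // The real flow also writes .compaction.log around each merge and takes
  // the write lock before deleting the merged files; both are omitted here.
  public void add(String file) {
    levels.get(0).add(file);
    for (int level = 0; level < MAX_LEVEL_NUM - 1; level++) {
      List<String> current = levels.get(level);
      if (current.size() < FILE_NUM_IN_EACH_LEVEL) {
        break; // this level is not full yet, stop cascading
      }
      String merged = targetName(current.get(0), level + 1);
      current.clear();                  // "delete the merged files"
      levels.get(level + 1).add(merged);
    }
  }
}
```

With the thresholds above, sealing t2, t3 and t4 reproduces the first example in the document: level 0 empties and t2-1 appears on level 1.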
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 681475a..48339d8 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -273,39 +273,42 @@ default_fill_interval=-1
####################
### Merge Configurations
####################
-# TsFile manage strategy, define use which hot compaction strategy
-# now we have NORMAL_STRATEGY, LEVEL_STRATEGY
-tsfile_manage_strategy=NORMAL_STRATEGY
+# LEVEL_COMPACTION, NO_COMPACTION
+compaction_strategy=LEVEL_COMPACTION
-# Work when tsfile_manage_strategy is level_strategy.
-# The max seq file num of each level. When file num exceeds this, the files in one level will merge to one.
-seq_file_num_in_each_level=10
+# Works when the compaction_strategy is LEVEL_COMPACTION.
+# The max seq file num of each level.
+# When the num of files in one level exceeds this,
+# the files in this level will merge to one and put to upper level.
+seq_file_num_in_each_level=6
-# Work when tsfile_manage_strategy is level_strategy.
+# Works when the compaction_strategy is LEVEL_COMPACTION.
# The max num of seq level.
seq_level_num=4
-# Work when tsfile_manage_strategy is level_strategy.
-# The max unseq file num of each level. When file num exceeds this, the files in one level will merge to one.
+# Works when compaction_strategy is LEVEL_COMPACTION.
+# The max unseq file num of each level.
+# When the num of files in one level exceeds this,
+# the files in this level will merge to one and put to upper level.
unseq_file_num_in_each_level=10
-# Work when tsfile_manage_strategy is level_strategy.
+# Works when the compaction_strategy is LEVEL_COMPACTION.
# The max num of unseq level.
-unseq_level_num=2
+unseq_level_num=1
-# Work when tsfile_manage_strategy is level_strategy.
-# When merge point number reaches this, merge the files to the last level.
+# Works when the compaction_strategy is LEVEL_COMPACTION.
+# When the average point number of chunks in the target file reaches this, merge the file to the top level.
+# During a merge, if a chunk has fewer points than this parameter, the chunk will be
+# merged with its succeeding chunks even if it is not overflowed, until the merged chunks reach
+# this threshold and the new chunk will be flushed.
+# When less than 0, this mechanism is disabled.
merge_chunk_point_number=100000
-# Work when tsfile_manage_strategy is level_strategy.
-# When page point number of file reaches this, use append merge instead of deserialize merge.
+# Works when the compaction_strategy is LEVEL_COMPACTION.
+# When point number of a page reaches this, use "append merge" instead of "deserialize merge".
merge_page_point_number=1000
-# How many thread will be set up to perform merge main tasks, 1 by default.
-# Set to 1 when less than or equal to 0.
-merge_thread_num=1
-
-# How many thread will be set up to perform merge chunk sub-tasks, 4 by default.
+# How many threads will be set up to perform unseq merge chunk sub-tasks, 4 by default.
# Set to 1 when less than or equal to 0.
merge_chunk_subthread_num=4
@@ -314,7 +317,7 @@ merge_chunk_subthread_num=4
# When < 0, it means time is unbounded.
merge_fileSelection_time_budget=30000
-# How much memory may be used in ONE merge task (in byte), 20% of maximum JVM memory by default.
+# How much memory may be used in ONE merge task (in byte), 10% of maximum JVM memory by default.
# This is only a rough estimation, starting from a relatively small value to avoid OOM.
# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the
# total memory estimation of merge.
@@ -326,31 +329,17 @@ merge_fileSelection_time_budget=30000
# If you are feeling the rebooting is too slow, set this to false, false by default
continue_merge_after_reboot=false
-# A global merge will be performed each such interval, that is, each storage group will be merged
-# (if proper merge candidates can be found). Unit: second, default: 1hours.
-# When less than or equal to 0, timed merge is disabled.
-merge_interval_sec=0
-
-# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how
+# When set to true, all unseq merges become full merges (the whole SeqFiles are re-written despite how
# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles
# are overflowed.
force_full_merge=false
-# During a merge, if a chunk with less number of points than this parameter, the chunk will be
-# merged with its succeeding chunks even if it is not overflowed, until the merged chunks reach
-# this threshold and the new chunk will be flushed.
-# When less than 0, this mechanism is disabled.
-chunk_merge_point_threshold=20480
-
-# How many thread will be set up to perform hot compaction, 30 by default.
+# How many threads will be set up to perform compaction, 10 by default.
# Set to 1 when less than or equal to 0.
-hot_compaction_thread_num=30
+compaction_thread_num=10
# The limit of write throughput merge can reach per second
-merge_write_throughput_mb_per_sec=16
-
-# The limit of read throughput merge can reach per second
-merge_read_throughput_mb_per_sec=16
+merge_write_throughput_mb_per_sec=8
####################
### Metadata Cache Configuration
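Putting the diff above together, the compaction section of iotdb-engine.properties after this commit looks roughly like this (values are the post-commit defaults shown above; tune per workload):

```properties
# LEVEL_COMPACTION, NO_COMPACTION
compaction_strategy=LEVEL_COMPACTION
seq_file_num_in_each_level=6
seq_level_num=4
unseq_file_num_in_each_level=10
unseq_level_num=1
merge_chunk_point_number=100000
merge_page_point_number=1000
compaction_thread_num=10
merge_write_throughput_mb_per_sec=8
```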
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 1ae4a4a..a901ebb 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -34,7 +34,7 @@ public enum ThreadName {
STAT_MONITOR("StatMonitor-ServerServiceImpl"),
FLUSH_SERVICE("Flush-ServerServiceImpl"),
FLUSH_SUB_TASK_SERVICE("Flush-SubTask-ServerServiceImpl"),
- HOT_COMPACTION_SERVICE("HotCompaction-ServerServiceImpl"),
+ COMPACTION_SERVICE("Compaction-ServerServiceImpl"),
WAL_DAEMON("IoTDB-MultiFileLogNodeManager-Sync-Thread"),
WAL_FORCE_DAEMON("IoTDB-MultiFileLogNodeManager-Force-Thread"),
INDEX_SERVICE("Index-ServerServiceImpl"),
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 671dab6..a7edf68 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -25,7 +25,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagementStrategy;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.exception.LoadConfigurationException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.service.TSServiceImpl;
@@ -289,41 +289,50 @@ public class IoTDBConfig {
/**
* Work when tsfile_manage_strategy is level_strategy. When merge point number reaches this, merge
* the files to the last level.
+ * During a merge, if a chunk has fewer points than this parameter, the chunk will be
+ * merged with its succeeding chunks even if it is not overflowed, until the merged chunks reach
+ * this threshold and the new chunk will be flushed.
*/
private int mergeChunkPointNumberThreshold = 100000;
/**
- * Work when tsfile_manage_strategy is level_strategy. When page point number of file reaches
- * this, use append merge instead of deserialize merge.
+ * Works when the compaction_strategy is LEVEL_COMPACTION.
+ * When point number of a page reaches this, use "append merge" instead of "deserialize merge".
*/
private int mergePagePointNumberThreshold = 1000;
/**
- * TsFile manage strategy, define use which hot compaction strategy
+ * LEVEL_COMPACTION, NO_COMPACTION
*/
- private TsFileManagementStrategy tsFileManagementStrategy = TsFileManagementStrategy.NORMAL_STRATEGY;
+ private CompactionStrategy compactionStrategy = CompactionStrategy.LEVEL_COMPACTION;
/**
- * Work when tsfile_manage_strategy is level_strategy. The max seq file num of each level. When
- * file num exceeds this, the files in one level will merge to one.
+ * Works when the compaction_strategy is LEVEL_COMPACTION.
+ * The max seq file num of each level.
+ * When the num of files in one level exceeds this,
+ * the files in this level will merge to one and put to upper level.
*/
- private int seqFileNumInEachLevel = 10;
+ private int seqFileNumInEachLevel = 6;
/**
- * Work when tsfile_manage_strategy is level_strategy. The max num of seq level.
+ * Works when the compaction_strategy is LEVEL_COMPACTION.
+ * The max num of seq level.
*/
private int seqLevelNum = 4;
/**
- * Work when tsfile_manage_strategy is level_strategy. The max unseq file num of each level. When
- * file num exceeds this, the files in one level will merge to one.
+ * Works when compaction_strategy is LEVEL_COMPACTION.
+ * The max unseq file num of each level.
+ * When the num of files in one level exceeds this,
+ * the files in this level will merge to one and put to upper level.
*/
private int unseqFileNumInEachLevel = 10;
/**
- * Work when tsfile_manage_strategy is level_strategy. The max num of unseq level.
+ * Works when the compaction_strategy is LEVEL_COMPACTION.
+ * The max num of unseq level.
*/
- private int unseqLevelNum = 2;
+ private int unseqLevelNum = 1;
/**
* whether to cache meta data(ChunkMetaData and TsFileMetaData) or not.
@@ -528,7 +537,7 @@ public class IoTDBConfig {
/**
* How much memory (in byte) can be used by a single merge task.
*/
- private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);
+ private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.1);
/**
* How many threads will be set up to perform upgrade tasks.
@@ -541,7 +550,7 @@ public class IoTDBConfig {
private int mergeThreadNum = 1;
/**
- * How many threads will be set up to perform merge chunk sub-tasks.
+ * How many threads will be set up to perform unseq merge chunk sub-tasks.
*/
private int mergeChunkSubThreadNum = 4;
@@ -565,34 +574,22 @@ public class IoTDBConfig {
private long mergeIntervalSec = 0L;
/**
- * When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how
+ * When set to true, all unseq merges become full merges (the whole SeqFiles are re-written despite how
* much they are overflowed). This may increase merge overhead depending on how much the SeqFiles
* are overflowed.
*/
private boolean forceFullMerge = false;
/**
- * During a merge, if a chunk with less number of chunks than this parameter, the chunk will be
- * merged with its succeeding chunks even if it is not overflowed, until the merged chunks reach
- * this threshold and the new chunk will be flushed.
- */
- private int chunkMergePointThreshold = 20480;
-
- /**
- * The limit of hot compaction merge can reach per second
+ * The write throughput limit (MB/s) that compaction merge can reach per second
*/
- private int mergeWriteThroughputMbPerSec = 16;
+ private int mergeWriteThroughputMbPerSec = 8;
/**
- * How many thread will be set up to perform hot compaction, 30 by default. Set to 1 when less
+ * How many threads will be set up to perform compaction, 10 by default. Set to 1 when less
* than or equal to 0.
*/
- private int hotCompactionThreadNum = 30;
-
- /**
- * The limit of read throughput merge can reach per second
- */
- private int mergeReadThroughputMbPerSec = 16;
+ private int compactionThreadNum = 10;
private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.MAX_SERIES_NUM;
@@ -1370,20 +1367,12 @@ public class IoTDBConfig {
this.forceFullMerge = forceFullMerge;
}
- public int getChunkMergePointThreshold() {
- return chunkMergePointThreshold;
- }
-
- public void setChunkMergePointThreshold(int chunkMergePointThreshold) {
- this.chunkMergePointThreshold = chunkMergePointThreshold;
- }
-
- public int getHotCompactionThreadNum() {
- return hotCompactionThreadNum;
+ public int getCompactionThreadNum() {
+ return compactionThreadNum;
}
- public void setHotCompactionThreadNum(int hotCompactionThreadNum) {
- this.hotCompactionThreadNum = hotCompactionThreadNum;
+ public void setCompactionThreadNum(int compactionThreadNum) {
+ this.compactionThreadNum = compactionThreadNum;
}
public int getMergeWriteThroughputMbPerSec() {
@@ -1394,14 +1383,6 @@ public class IoTDBConfig {
this.mergeWriteThroughputMbPerSec = mergeWriteThroughputMbPerSec;
}
- public int getMergeReadThroughputMbPerSec() {
- return mergeReadThroughputMbPerSec;
- }
-
- public void setMergeReadThroughputMbPerSec(int mergeReadThroughputMbPerSec) {
- this.mergeReadThroughputMbPerSec = mergeReadThroughputMbPerSec;
- }
-
public boolean isEnableMemControl() {
return enableMemControl;
}
@@ -1452,13 +1433,13 @@ public class IoTDBConfig {
}
- public TsFileManagementStrategy getTsFileManagementStrategy() {
- return tsFileManagementStrategy;
+ public CompactionStrategy getCompactionStrategy() {
+ return compactionStrategy;
}
- public void setTsFileManagementStrategy(
- TsFileManagementStrategy tsFileManagementStrategy) {
- this.tsFileManagementStrategy = tsFileManagementStrategy;
+ public void setCompactionStrategy(
+ CompactionStrategy compactionStrategy) {
+ this.compactionStrategy = compactionStrategy;
}
public int getSeqFileNumInEachLevel() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 74f4371..de14763 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -32,7 +32,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagementStrategy;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -120,7 +120,7 @@ public class IoTDBDescriptor {
urlString += (File.separatorChar + IoTDBConfig.CONFIG_NAME);
}
- // If the url doesn't start with "file:" or "classpath:", it's provided as a normal path.
+ // If the url doesn't start with "file:" or "classpath:", it's provided as a normal path.
// So we need to add it to make it a real URL.
if (!urlString.startsWith("file:") && !urlString.startsWith("classpath:")) {
urlString = "file:" + urlString;
@@ -303,9 +303,9 @@ public class IoTDBDescriptor {
.getProperty("merge_page_point_number",
Integer.toString(conf.getMergePagePointNumberThreshold()))));
- conf.setTsFileManagementStrategy(TsFileManagementStrategy.valueOf(properties
- .getProperty("tsfile_manage_strategy",
- conf.getTsFileManagementStrategy().toString())));
+ conf.setCompactionStrategy(CompactionStrategy.valueOf(properties
+ .getProperty("compaction_strategy",
+ conf.getCompactionStrategy().toString())));
conf.setSeqLevelNum(Integer.parseInt(properties
.getProperty("seq_level_num",
@@ -388,16 +388,11 @@ public class IoTDBDescriptor {
Long.toString(conf.getMergeIntervalSec()))));
conf.setForceFullMerge(Boolean.parseBoolean(properties.getProperty("force_full_merge",
Boolean.toString(conf.isForceFullMerge()))));
- conf.setChunkMergePointThreshold(Integer.parseInt(properties.getProperty(
- "chunk_merge_point_threshold", Integer.toString(conf.getChunkMergePointThreshold()))));
- conf.setHotCompactionThreadNum(Integer.parseInt(properties.getProperty(
- "hot_compaction_thread_num", Integer.toString(conf.getHotCompactionThreadNum()))));
+ conf.setCompactionThreadNum(Integer.parseInt(properties.getProperty(
+ "compaction_thread_num", Integer.toString(conf.getCompactionThreadNum()))));
conf.setMergeWriteThroughputMbPerSec(Integer.parseInt(properties.getProperty(
"merge_write_throughput_mb_per_sec",
Integer.toString(conf.getMergeWriteThroughputMbPerSec()))));
- conf.setMergeReadThroughputMbPerSec(Integer.parseInt(properties.getProperty(
- "merge_read_throughput_mb_per_sec",
- Integer.toString(conf.getMergeReadThroughputMbPerSec()))));
conf.setEnablePartialInsert(
Boolean.parseBoolean(properties.getProperty("enable_partial_insert",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/HotCompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
similarity index 70%
rename from server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/HotCompactionMergeTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index f910c4b..f030300 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/HotCompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.tsfilemanagement;
+package org.apache.iotdb.db.engine.compaction;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
@@ -25,24 +25,24 @@ import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement.HotCompactionMergeTask;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * HotCompactionMergeTaskPoolManager provides a ThreadPool to queue and run all hot compaction
+ * CompactionMergeTaskPoolManager provides a ThreadPool to queue and run all compaction
* tasks.
*/
-public class HotCompactionMergeTaskPoolManager implements IService {
+public class CompactionMergeTaskPoolManager implements IService {
private static final Logger logger = LoggerFactory
- .getLogger(HotCompactionMergeTaskPoolManager.class);
- private static final HotCompactionMergeTaskPoolManager INSTANCE = new HotCompactionMergeTaskPoolManager();
+ .getLogger(CompactionMergeTaskPoolManager.class);
+ private static final CompactionMergeTaskPoolManager INSTANCE = new CompactionMergeTaskPoolManager();
private ExecutorService pool;
- public static HotCompactionMergeTaskPoolManager getInstance() {
+ public static CompactionMergeTaskPoolManager getInstance() {
return INSTANCE;
}
@@ -51,10 +51,10 @@ public class HotCompactionMergeTaskPoolManager implements IService {
if (pool == null) {
this.pool = IoTDBThreadPoolFactory
.newScheduledThreadPool(
- IoTDBDescriptor.getInstance().getConfig().getHotCompactionThreadNum(),
- ThreadName.HOT_COMPACTION_SERVICE.getName());
+ IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
+ ThreadName.COMPACTION_SERVICE.getName());
}
- logger.info("Hot compaction merge task manager started.");
+ logger.info("Compaction task manager started.");
}
@Override
@@ -81,11 +81,11 @@ public class HotCompactionMergeTaskPoolManager implements IService {
// wait
long time = System.currentTimeMillis() - startTime;
if (time % 60_000 == 0) {
- logger.warn("HotCompactionManager has wait for {} seconds to stop", time / 1000);
+ logger.warn("CompactionManager has waited for {} seconds to stop", time / 1000);
}
}
pool = null;
- logger.info("HotCompactionManager stopped");
+ logger.info("CompactionManager stopped");
}
private void awaitTermination(ExecutorService service, long millseconds) {
@@ -93,7 +93,7 @@ public class HotCompactionMergeTaskPoolManager implements IService {
service.shutdown();
service.awaitTermination(millseconds, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- logger.warn("HotCompactionThreadPool can not be closed in {} ms", millseconds);
+ logger.warn("CompactionThreadPool can not be closed in {} ms", millseconds);
Thread.currentThread().interrupt();
}
service.shutdownNow();
@@ -101,13 +101,13 @@ public class HotCompactionMergeTaskPoolManager implements IService {
@Override
public ServiceType getID() {
- return ServiceType.HOT_COMPACTION_SERVICE;
+ return ServiceType.COMPACTION_SERVICE;
}
- public void submitTask(HotCompactionMergeTask hotCompactionMergeTask)
+ public void submitTask(CompactionMergeTask compactionMergeTask)
throws RejectedExecutionException {
if (pool != null && !pool.isTerminated()) {
- pool.submit(hotCompactionMergeTask);
+ pool.submit(compactionMergeTask);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagementStrategy.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
similarity index 65%
rename from server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagementStrategy.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
index fb0e7c4..96ec9f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagementStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionStrategy.java
@@ -17,22 +17,22 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.tsfilemanagement;
+package org.apache.iotdb.db.engine.compaction;
-import org.apache.iotdb.db.engine.tsfilemanagement.level.LevelTsFileManagement;
-import org.apache.iotdb.db.engine.tsfilemanagement.normal.NormalTsFileManagement;
+import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.compaction.no.NoCompactionTsFileManagement;
-public enum TsFileManagementStrategy {
- LEVEL_STRATEGY,
- NORMAL_STRATEGY;
+public enum CompactionStrategy {
+ LEVEL_COMPACTION,
+ NO_COMPACTION;
public TsFileManagement getTsFileManagement(String storageGroupName, String storageGroupDir) {
switch (this) {
- case LEVEL_STRATEGY:
- return new LevelTsFileManagement(storageGroupName, storageGroupDir);
- case NORMAL_STRATEGY:
+ case LEVEL_COMPACTION:
+ return new LevelCompactionTsFileManagement(storageGroupName, storageGroupDir);
+ case NO_COMPACTION:
default:
- return new NormalTsFileManagement(storageGroupName, storageGroupDir);
+ return new NoCompactionTsFileManagement(storageGroupName, storageGroupDir);
}
}
}
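The renamed enum acts as a small factory. A self-contained sketch of the same pattern follows; the stand-in classes and names are invented for illustration and are not the real TsFileManagement hierarchy.

```java
// Enum-as-factory pattern mirroring CompactionStrategy.getTsFileManagement:
// unknown values fall through to the no-op management via the default branch.
public class StrategyFactorySketch {

  abstract static class Management {
    final String storageGroupName;
    Management(String storageGroupName) {
      this.storageGroupName = storageGroupName;
    }
  }

  static class LevelManagement extends Management {
    LevelManagement(String sg) { super(sg); }
  }

  static class NoOpManagement extends Management {
    NoOpManagement(String sg) { super(sg); }
  }

  enum Strategy {
    LEVEL_COMPACTION,
    NO_COMPACTION;

    Management getManagement(String storageGroupName) {
      switch (this) {
        case LEVEL_COMPACTION:
          return new LevelManagement(storageGroupName);
        case NO_COMPACTION:
        default:
          return new NoOpManagement(storageGroupName);
      }
    }
  }
}
```

Keeping the construction inside the enum means callers such as StorageGroupProcessor never name a concrete management class, which is what makes the NORMAL_STRATEGY-to-NO_COMPACTION rename in this commit a local change.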
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 9f90f7f..5b98725 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.tsfilemanagement;
+package org.apache.iotdb.db.engine.compaction;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.MERGING_MODIFICATION_FILE_NAME;
@@ -43,7 +43,7 @@ import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
import org.apache.iotdb.db.engine.merge.task.MergeTask;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
-import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseHotCompactionMergeCallBack;
+import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.CloseCompactionMergeCallBack;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.slf4j.Logger;
@@ -61,10 +61,9 @@ public abstract class TsFileManagement {
*/
public final ReentrantReadWriteLock mergeLock = new ReentrantReadWriteLock();
/**
- * hotCompactionMergeLock is used to wait for TsFile list change in hot compaction merge
- * processor.
+ * compactionMergeLock is used to wait for TsFile list change in compaction processor.
*/
- private final ReadWriteLock hotCompactionMergeLock = new ReentrantReadWriteLock();
+ private final ReadWriteLock compactionMergeLock = new ReentrantReadWriteLock();
public volatile boolean isUnseqMerging = false;
/**
@@ -145,42 +144,42 @@ public abstract class TsFileManagement {
public abstract void forkCurrentFileList(long timePartition) throws IOException;
public void readLock() {
- hotCompactionMergeLock.readLock().lock();
+ compactionMergeLock.readLock().lock();
}
public void readUnLock() {
- hotCompactionMergeLock.readLock().unlock();
+ compactionMergeLock.readLock().unlock();
}
public void writeLock() {
- hotCompactionMergeLock.writeLock().lock();
+ compactionMergeLock.writeLock().lock();
}
public void writeUnlock() {
- hotCompactionMergeLock.writeLock().unlock();
+ compactionMergeLock.writeLock().unlock();
}
public boolean tryWriteLock() {
- return hotCompactionMergeLock.writeLock().tryLock();
+ return compactionMergeLock.writeLock().tryLock();
}
protected abstract void merge(long timePartition);
- public class HotCompactionMergeTask implements Runnable {
+ public class CompactionMergeTask implements Runnable {
- private CloseHotCompactionMergeCallBack closeHotCompactionMergeCallBack;
+ private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
private long timePartitionId;
- public HotCompactionMergeTask(CloseHotCompactionMergeCallBack closeHotCompactionMergeCallBack,
+ public CompactionMergeTask(CloseCompactionMergeCallBack closeCompactionMergeCallBack,
long timePartitionId) {
- this.closeHotCompactionMergeCallBack = closeHotCompactionMergeCallBack;
+ this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
this.timePartitionId = timePartitionId;
}
@Override
public void run() {
merge(timePartitionId);
- closeHotCompactionMergeCallBack.call();
+ closeCompactionMergeCallBack.call();
}
}
@@ -263,22 +262,22 @@ public abstract class TsFileManagement {
}
/**
- * acquire the write locks of the resource , the merge lock and the hot compaction lock
+ * acquire the write locks of the resource, the merge lock and the compaction lock
*/
private void doubleWriteLock(TsFileResource seqFile) {
boolean fileLockGot;
boolean mergeLockGot;
- boolean hotCompactionLockGot;
+ boolean compactionLockGot;
while (true) {
fileLockGot = seqFile.tryWriteLock();
mergeLockGot = mergeLock.writeLock().tryLock();
- hotCompactionLockGot = tryWriteLock();
+ compactionLockGot = tryWriteLock();
- if (fileLockGot && mergeLockGot && hotCompactionLockGot) {
+ if (fileLockGot && mergeLockGot && compactionLockGot) {
break;
} else {
// did not get all of them, release the gotten one and retry
- if (hotCompactionLockGot) {
+ if (compactionLockGot) {
writeUnlock();
}
if (mergeLockGot) {
@@ -292,7 +291,7 @@ public abstract class TsFileManagement {
}
/**
- * release the write locks of the resource , the merge lock and the hot compaction lock
+ * release the write locks of the resource, the merge lock and the compaction lock
*/
private void doubleWriteUnlock(TsFileResource seqFile) {
writeUnlock();
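The `doubleWriteLock` retry loop in the diff above uses a standard all-or-nothing acquisition pattern to avoid deadlock: try every lock non-blockingly, and on any partial success release everything and retry, so no thread ever holds one lock while blocking on another. A minimal standalone sketch in plain Java (the three `ReentrantLock` fields stand in for the file write lock, `mergeLock` and `compactionMergeLock`; all names here are illustrative, not IoTDB's API):

```java
import java.util.concurrent.locks.ReentrantLock;

public class TripleLock {
    // Stand-ins for the file write lock, mergeLock and compactionMergeLock.
    private final ReentrantLock fileLock = new ReentrantLock();
    private final ReentrantLock mergeLock = new ReentrantLock();
    private final ReentrantLock compactionLock = new ReentrantLock();

    /** Acquire all three locks, or release whatever was obtained and retry. */
    public void lockAll() {
        while (true) {
            boolean fileGot = fileLock.tryLock();
            boolean mergeGot = mergeLock.tryLock();
            boolean compactionGot = compactionLock.tryLock();
            if (fileGot && mergeGot && compactionGot) {
                return; // holding all three; no lock-ordering deadlock possible
            }
            // Partial acquisition: back out completely so other threads can proceed.
            if (compactionGot) {
                compactionLock.unlock();
            }
            if (mergeGot) {
                mergeLock.unlock();
            }
            if (fileGot) {
                fileLock.unlock();
            }
        }
    }

    public void unlockAll() {
        compactionLock.unlock();
        mergeLock.unlock();
        fileLock.unlock();
    }

    /** True when none of the three locks is held (test helper). */
    public boolean allFree() {
        return !fileLock.isLocked() && !mergeLock.isLocked() && !compactionLock.isLocked();
    }
}
```

The busy retry is acceptable here because contention on these locks is short-lived; a production variant could back off between attempts.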
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
similarity index 86%
rename from server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 6c09c1a..979c845 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/level/LevelTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -17,17 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.tsfilemanagement.level;
+package org.apache.iotdb.db.engine.compaction.level;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static org.apache.iotdb.db.engine.tsfilemanagement.normal.NormalTsFileManagement.compareFileName;
-import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.HOT_COMPACTION_LOG_NAME;
-import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.SOURCE_NAME;
-import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.TARGET_NAME;
+import static org.apache.iotdb.db.engine.compaction.no.NoCompactionTsFileManagement.compareFileName;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-import com.clearspring.analytics.stream.cardinality.HyperLogLog;
-import com.clearspring.analytics.stream.cardinality.ICardinality;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@@ -37,7 +35,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -45,26 +42,22 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement;
-import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogAnalyzer;
-import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger;
-import org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionUtils;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The TsFileManagement for LEVEL_STRATEGY, use level struct to manage TsFile list
+ * The TsFileManagement for LEVEL_COMPACTION, use level struct to manage TsFile list
*/
-public class LevelTsFileManagement extends TsFileManagement {
+public class LevelCompactionTsFileManagement extends TsFileManagement {
- private static final Logger logger = LoggerFactory.getLogger(LevelTsFileManagement.class);
+ private static final Logger logger = LoggerFactory.getLogger(LevelCompactionTsFileManagement.class);
private final int seqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum();
private final int seqFileNumInEachLevel = IoTDBDescriptor.getInstance().getConfig()
@@ -82,13 +75,13 @@ public class LevelTsFileManagement extends TsFileManagement {
private final List<List<TsFileResource>> forkedSequenceTsFileResources = new ArrayList<>();
private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = new ArrayList<>();
- public LevelTsFileManagement(String storageGroupName, String storageGroupDir) {
+ public LevelCompactionTsFileManagement(String storageGroupName, String storageGroupDir) {
super(storageGroupName, storageGroupDir);
clear();
}
private void deleteLevelFiles(long timePartitionId, Collection<TsFileResource> mergeTsFiles) {
- logger.debug("{} [hot compaction] merge starts to delete file", storageGroupName);
+ logger.debug("{} [compaction] merge starts to delete file", storageGroupName);
for (TsFileResource mergeTsFile : mergeTsFiles) {
deleteLevelFile(mergeTsFile);
}
@@ -304,10 +297,10 @@ public class LevelTsFileManagement extends TsFileManagement {
@SuppressWarnings("squid:S3776")
public void recover() {
File logFile = FSFactoryProducer.getFSFactory()
- .getFile(storageGroupDir, storageGroupName + HOT_COMPACTION_LOG_NAME);
+ .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
try {
if (logFile.exists()) {
- HotCompactionLogAnalyzer logAnalyzer = new HotCompactionLogAnalyzer(logFile);
+ CompactionLogAnalyzer logAnalyzer = new CompactionLogAnalyzer(logFile);
logAnalyzer.analyze();
Set<String> deviceSet = logAnalyzer.getDeviceSet();
List<File> sourceFileList = logAnalyzer.getSourceFiles();
@@ -326,9 +319,9 @@ public class LevelTsFileManagement extends TsFileManagement {
writer.close();
TsFileResource targetTsFileResource = new TsFileResource(targetFile);
long timePartition = targetTsFileResource.getTimePartition();
- HotCompactionUtils
+ CompactionUtils
.merge(targetTsFileResource, getTsFileList(isSeq), storageGroupName,
- new HotCompactionLogger(storageGroupDir, storageGroupName), deviceSet, isSeq);
+ new CompactionLogger(storageGroupDir, storageGroupName), deviceSet, isSeq);
if (isSeq) {
for (TreeSet<TsFileResource> currMergeFile : sequenceTsFileResources
.get(timePartition)) {
@@ -356,20 +349,20 @@ public class LevelTsFileManagement extends TsFileManagement {
writer.getIOWriterOut().truncate(offset - 1);
writer.close();
if (isSeq) {
- HotCompactionUtils
+ CompactionUtils
.merge(targetResource,
new ArrayList<>(sequenceTsFileResources.get(timePartition).get(level)),
storageGroupName,
- new HotCompactionLogger(storageGroupDir, storageGroupName), deviceSet,
+ new CompactionLogger(storageGroupDir, storageGroupName), deviceSet,
true);
deleteLevelFiles(timePartition,
sequenceTsFileResources.get(timePartition).get(level));
sequenceTsFileResources.get(timePartition).get(level + 1).add(targetResource);
} else {
- HotCompactionUtils
+ CompactionUtils
.merge(targetResource, unSequenceTsFileResources.get(timePartition).get(level),
storageGroupName,
- new HotCompactionLogger(storageGroupDir, storageGroupName), deviceSet,
+ new CompactionLogger(storageGroupDir, storageGroupName), deviceSet,
false);
deleteLevelFiles(timePartition,
unSequenceTsFileResources.get(timePartition).get(level));
@@ -440,8 +433,8 @@ public class LevelTsFileManagement extends TsFileManagement {
long timePartition, int currMaxLevel, int currMaxFileNumInEachLevel) {
long startTimeMillis = System.currentTimeMillis();
try {
- logger.info("{} start to filter hot compaction condition", storageGroupName);
- HotCompactionLogger hotCompactionLogger = new HotCompactionLogger(storageGroupDir,
+ logger.info("{} start to filter compaction condition", storageGroupName);
+ CompactionLogger compactionLogger = new CompactionLogger(storageGroupDir,
storageGroupName);
for (int i = 0; i < currMaxLevel - 1; i++) {
if (currMaxFileNumInEachLevel <= mergeResources.get(i).size()) {
@@ -451,23 +444,23 @@ public class LevelTsFileManagement extends TsFileManagement {
merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE);
} else {
for (TsFileResource mergeResource : mergeResources.get(i)) {
- hotCompactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
+ compactionLogger.logFile(SOURCE_NAME, mergeResource.getTsFile());
}
File newLevelFile = createNewTsFileName(mergeResources.get(i).get(0).getTsFile(),
i + 1);
- hotCompactionLogger.logSequence(sequence);
- hotCompactionLogger.logFile(TARGET_NAME, newLevelFile);
- logger.info("{} [Hot Compaction] merge level-{}'s {} tsfiles to next level",
+ compactionLogger.logSequence(sequence);
+ compactionLogger.logFile(TARGET_NAME, newLevelFile);
+ logger.info("{} [Compaction] merge level-{}'s {} tsfiles to next level",
storageGroupName, i, mergeResources.get(i).size());
TsFileResource newResource = new TsFileResource(newLevelFile);
- HotCompactionUtils
- .merge(newResource, mergeResources.get(i), storageGroupName, hotCompactionLogger,
+ CompactionUtils
+ .merge(newResource, mergeResources.get(i), storageGroupName, compactionLogger,
new HashSet<>(), sequence);
writeLock();
try {
deleteLevelFiles(timePartition, mergeResources.get(i));
- hotCompactionLogger.logMergeFinish();
+ compactionLogger.logMergeFinish();
if (sequence) {
sequenceTsFileResources.get(timePartition).get(i + 1).add(newResource);
} else {
@@ -482,24 +475,24 @@ public class LevelTsFileManagement extends TsFileManagement {
}
}
}
- hotCompactionLogger.close();
+ compactionLogger.close();
File logFile = FSFactoryProducer.getFSFactory()
- .getFile(storageGroupDir, storageGroupName + HOT_COMPACTION_LOG_NAME);
+ .getFile(storageGroupDir, storageGroupName + COMPACTION_LOG_NAME);
if (logFile.exists()) {
Files.delete(logFile.toPath());
}
} catch (Exception e) {
- logger.error("Error occurred in Hot Compaction Merge thread", e);
+ logger.error("Error occurred in Compaction Merge thread", e);
} finally {
// reset the merge working state to false
- logger.info("{} [Hot Compaction] merge end time isSeq = {}, consumption: {} ms",
+ logger.info("{} [Compaction] merge end time isSeq = {}, consumption: {} ms",
storageGroupName, sequence,
System.currentTimeMillis() - startTimeMillis);
}
}
/**
- * if level < maxLevel-1, the file need hot compaction else, the file can be merged later
+ * if level < maxLevel-1, the file needs compaction; otherwise, the file can be merged later
*/
private File createNewTsFileName(File sourceFile, int level) {
String path = sourceFile.getAbsolutePath();
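`createNewTsFileName` derives the target file's name from a source file and the next level number. A hypothetical sketch of that derivation, assuming the `{time}-{version}-{level}.tsfile` naming convention implied by `FILE_NAME_SEPARATOR` and `TSFILE_SUFFIX` (the exact format is not shown in this diff, so treat the layout as an assumption):

```java
public class LevelFileName {

    private static final String SEPARATOR = "-";    // stand-in for FILE_NAME_SEPARATOR
    private static final String SUFFIX = ".tsfile"; // stand-in for TSFILE_SUFFIX

    /** Replace the trailing level field of a TsFile name with targetLevel. */
    public static String nextLevelName(String fileName, int targetLevel) {
        String base = fileName.substring(0, fileName.length() - SUFFIX.length());
        String[] parts = base.split(SEPARATOR);
        parts[parts.length - 1] = String.valueOf(targetLevel); // bump the level field
        return String.join(SEPARATOR, parts) + SUFFIX;
    }
}
```

Keeping the level inside the file name lets recovery reconstruct each file's position in the level structure without extra metadata.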
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/normal/NormalTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/normal/NormalTsFileManagement.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 359a93e..935a5c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/normal/NormalTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.tsfilemanagement.normal;
+package org.apache.iotdb.db.engine.compaction.no;
import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
@@ -28,13 +28,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NormalTsFileManagement extends TsFileManagement {
+public class NoCompactionTsFileManagement extends TsFileManagement {
- private static final Logger logger = LoggerFactory.getLogger(NormalTsFileManagement.class);
+ private static final Logger logger = LoggerFactory.getLogger(NoCompactionTsFileManagement.class);
// includes sealed and unsealed sequence TsFiles
private TreeSet<TsFileResource> sequenceFileTreeSet = new TreeSet<>(
(o1, o2) -> {
@@ -46,7 +46,7 @@ public class NormalTsFileManagement extends TsFileManagement {
// includes sealed and unsealed unSequence TsFiles
private List<TsFileResource> unSequenceFileList = new ArrayList<>();
- public NormalTsFileManagement(String storageGroupName, String storageGroupDir) {
+ public NoCompactionTsFileManagement(String storageGroupName, String storageGroupDir) {
super(storageGroupName, storageGroupDir);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogAnalyzer.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionLogAnalyzer.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogAnalyzer.java
index f8410a3..34314d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogAnalyzer.java
@@ -17,14 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.tsfilemanagement.utils;
+package org.apache.iotdb.db.engine.compaction.utils;
-import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.FULL_MERGE;
-import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.MERGE_FINISHED;
-import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.SEQUENCE_NAME;
-import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.SOURCE_NAME;
-import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.TARGET_NAME;
-import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogger.UNSEQUENCE_NAME;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.FULL_MERGE;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.MERGE_FINISHED;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SEQUENCE_NAME;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.SOURCE_NAME;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.TARGET_NAME;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.UNSEQUENCE_NAME;
import java.io.BufferedReader;
import java.io.File;
@@ -35,7 +35,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-public class HotCompactionLogAnalyzer {
+public class CompactionLogAnalyzer {
public static final String STR_DEVICE_OFFSET_SEPERATOR = " ";
@@ -48,7 +48,7 @@ public class HotCompactionLogAnalyzer {
private boolean isSeq = false;
private boolean fullMerge = false;
- public HotCompactionLogAnalyzer(File logFile) {
+ public CompactionLogAnalyzer(File logFile) {
this.logFile = logFile;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionLogger.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogger.java
similarity index 84%
rename from server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionLogger.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogger.java
index 55c3b5e..692ef24 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionLogger.java
@@ -17,9 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.tsfilemanagement.utils;
+package org.apache.iotdb.db.engine.compaction.utils;
-import static org.apache.iotdb.db.engine.tsfilemanagement.utils.HotCompactionLogAnalyzer.STR_DEVICE_OFFSET_SEPERATOR;
+import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer.STR_DEVICE_OFFSET_SEPERATOR;
import java.io.BufferedWriter;
import java.io.File;
@@ -27,9 +27,9 @@ import java.io.FileWriter;
import java.io.IOException;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
-public class HotCompactionLogger {
+public class CompactionLogger {
- public static final String HOT_COMPACTION_LOG_NAME = ".hot_compaction.log";
+ public static final String COMPACTION_LOG_NAME = ".compaction.log";
public static final String SOURCE_NAME = "source";
public static final String TARGET_NAME = "target";
public static final String SEQUENCE_NAME = "sequence";
@@ -39,10 +39,10 @@ public class HotCompactionLogger {
private BufferedWriter logStream;
- public HotCompactionLogger(String storageGroupDir, String storageGroupName) throws IOException {
+ public CompactionLogger(String storageGroupDir, String storageGroupName) throws IOException {
logStream = new BufferedWriter(
new FileWriter(SystemFileFactory.INSTANCE.getFile(storageGroupDir,
- storageGroupName + HOT_COMPACTION_LOG_NAME), true));
+ storageGroupName + COMPACTION_LOG_NAME), true));
}
public void close() throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
similarity index 85%
rename from server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 0e5a3bf..c2edcbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/tsfilemanagement/utils/HotCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.tsfilemanagement.utils;
+package org.apache.iotdb.db.engine.compaction.utils;
import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair;
@@ -52,17 +52,17 @@ import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class HotCompactionUtils {
+public class CompactionUtils {
- private static final Logger logger = LoggerFactory.getLogger(HotCompactionUtils.class);
+ private static final Logger logger = LoggerFactory.getLogger(CompactionUtils.class);
private static final int MERGE_PAGE_POINT_NUM = IoTDBDescriptor.getInstance().getConfig()
.getMergePagePointNumberThreshold();
- private HotCompactionUtils() {
+ private CompactionUtils() {
throw new IllegalStateException("Utility class");
}
- private static Pair<ChunkMetadata, Chunk> readByAppendMerge(RateLimiter compactionReadRateLimiter,
+ private static Pair<ChunkMetadata, Chunk> readByAppendMerge(
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) throws IOException {
ChunkMetadata newChunkMetadata = null;
Chunk newChunk = null;
@@ -70,8 +70,6 @@ public class HotCompactionUtils {
.entrySet()) {
for (ChunkMetadata chunkMetadata : entry.getValue()) {
Chunk chunk = entry.getKey().readMemChunk(chunkMetadata);
- MergeManager
- .mergeRateLimiterAcquire(compactionReadRateLimiter, chunk.getData().position());
if (newChunkMetadata == null) {
newChunkMetadata = chunkMetadata;
newChunk = chunk;
@@ -84,7 +82,7 @@ public class HotCompactionUtils {
return new Pair<>(newChunkMetadata, newChunk);
}
- private static long readByDeserializeMerge(RateLimiter compactionReadRateLimiter,
+ private static long readByDeserializeMerge(
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap, long maxVersion,
Map<Long, TimeValuePair> timeValuePairMap)
throws IOException {
@@ -96,29 +94,24 @@ public class HotCompactionUtils {
maxVersion = Math.max(chunkMetadata.getVersion(), maxVersion);
IChunkReader chunkReader = new ChunkReaderByTimestamp(
reader.readMemChunk(chunkMetadata));
- long chunkSize = 0;
while (chunkReader.hasNextSatisfiedPage()) {
IPointReader iPointReader = new BatchDataIterator(
chunkReader.nextPageData());
while (iPointReader.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iPointReader.nextTimeValuePair();
- chunkSize += timeValuePair.getSize();
timeValuePairMap.put(timeValuePair.getTimestamp(), timeValuePair);
}
}
- MergeManager
- .mergeRateLimiterAcquire(compactionReadRateLimiter, chunkSize);
}
}
return maxVersion;
}
private static long writeByAppendMerge(long maxVersion, String device,
- RateLimiter compactionWriteRateLimiter, RateLimiter compactionReadRateLimiter,
+ RateLimiter compactionWriteRateLimiter,
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadatasMap,
TsFileResource targetResource, RestorableTsFileIOWriter writer) throws IOException {
- Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(compactionReadRateLimiter,
- readerChunkMetadatasMap);
+ Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(readerChunkMetadatasMap);
ChunkMetadata newChunkMetadata = chunkPair.left;
Chunk newChunk = chunkPair.right;
if (newChunkMetadata != null && newChunk != null) {
@@ -134,12 +127,11 @@ public class HotCompactionUtils {
}
private static long writeByDeserializeMerge(long maxVersion, String device,
- RateLimiter compactionRateLimiter, RateLimiter compactionReadRateLimiter,
+ RateLimiter compactionRateLimiter,
Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
TsFileResource targetResource, RestorableTsFileIOWriter writer) throws IOException {
Map<Long, TimeValuePair> timeValuePairMap = new TreeMap<>();
- maxVersion = readByDeserializeMerge(compactionReadRateLimiter, entry.getValue(), maxVersion,
- timeValuePairMap);
+ maxVersion = readByDeserializeMerge(entry.getValue(), maxVersion, timeValuePairMap);
Iterator<List<ChunkMetadata>> chunkMetadataListIterator = entry.getValue().values()
.iterator();
if (!chunkMetadataListIterator.hasNext()) {
@@ -183,18 +175,17 @@ public class HotCompactionUtils {
* @param targetResource the target resource to be merged to
* @param tsFileResources the source resource to be merged
* @param storageGroup the storage group name
- * @param hotCompactionLogger the logger
+ * @param compactionLogger the logger
* @param devices the devices to be skipped(used by recover)
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public static void merge(TsFileResource targetResource,
List<TsFileResource> tsFileResources, String storageGroup,
- HotCompactionLogger hotCompactionLogger,
+ CompactionLogger compactionLogger,
Set<String> devices, boolean sequence) throws IOException {
RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
- RateLimiter compactionReadRateLimiter = MergeManager.getINSTANCE().getMergeReadRateLimiter();
Set<String> tsFileDevicesMap = getTsFileDevicesSet(tsFileResources, tsFileSequenceReaderMap,
storageGroup);
for (String device : tsFileDevicesMap) {
@@ -209,10 +200,8 @@ public class HotCompactionUtils {
tsFileSequenceReaderMap, storageGroup);
Map<String, List<ChunkMetadata>> chunkMetadataMap = reader
.readChunkMetadataInDevice(device);
- long chunkMetadataSize = 0;
for (Entry<String, List<ChunkMetadata>> entry : chunkMetadataMap.entrySet()) {
for (ChunkMetadata chunkMetadata : entry.getValue()) {
- chunkMetadataSize += chunkMetadata.getStatistics().calculateRamSize();
Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap;
String measurementUid = chunkMetadata.getMeasurementUid();
if (measurementChunkMetadataMap.containsKey(measurementUid)) {
@@ -232,16 +221,12 @@ public class HotCompactionUtils {
.put(chunkMetadata.getMeasurementUid(), readerChunkMetadataMap);
}
}
- // wait for limit read
- MergeManager
- .mergeRateLimiterAcquire(compactionReadRateLimiter, chunkMetadataSize);
}
if (!sequence) {
long maxVersion = Long.MIN_VALUE;
for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap
.entrySet()) {
maxVersion = writeByDeserializeMerge(maxVersion, device, compactionWriteRateLimiter,
- compactionReadRateLimiter,
entry,
targetResource, writer);
}
@@ -262,26 +247,23 @@ public class HotCompactionUtils {
}
}
if (isPageEnoughLarge) {
- logger.debug("{} [Hot Compaction] page enough large, use append merge", storageGroup);
+ logger.info("{} [Compaction] page large enough, use append merge", storageGroup);
// append page in chunks, so we do not have to deserialize a chunk
maxVersion = writeByAppendMerge(maxVersion, device, compactionWriteRateLimiter,
- compactionReadRateLimiter,
readerChunkMetadatasMap, targetResource, writer);
} else {
- logger.debug("{} [Hot Compaction] page enough large, use deserialize merge",
- storageGroup);
+ logger
+ .info("{} [Compaction] page not large enough, use deserialize merge", storageGroup);
// we have to deserialize chunks to merge pages
maxVersion = writeByDeserializeMerge(maxVersion, device, compactionWriteRateLimiter,
- compactionReadRateLimiter,
- entry,
- targetResource, writer);
+ entry, targetResource, writer);
}
}
writer.endChunkGroup();
writer.writeVersion(maxVersion);
}
- if (hotCompactionLogger != null) {
- hotCompactionLogger.logDevice(device, writer.getPos());
+ if (compactionLogger != null) {
+ compactionLogger.logDevice(device, writer.getPos());
}
}
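The deserialize-merge path above (`readByDeserializeMerge`) reads every point out of the overlapping chunks into a timestamp-keyed `TreeMap`, so duplicate timestamps are overwritten by later chunks and the result comes out sorted by time before it is rewritten. A self-contained sketch of that core step (`long[]` pairs stand in for `TimeValuePair`; purely illustrative, not the IoTDB API):

```java
import java.util.List;
import java.util.TreeMap;

public class DeserializeMergeSketch {

    /**
     * Merge the points of several chunks. Later chunks overwrite earlier
     * values at the same timestamp, and the TreeMap keeps the merged result
     * sorted by time -- mirroring timeValuePairMap in readByDeserializeMerge.
     */
    public static TreeMap<Long, Long> merge(List<long[][]> chunks) {
        TreeMap<Long, Long> merged = new TreeMap<>();
        for (long[][] chunk : chunks) {
            for (long[] point : chunk) { // point[0] = timestamp, point[1] = value
                merged.put(point[0], point[1]);
            }
        }
        return merged;
    }
}
```

This is why the append-merge path is preferred when pages are large enough: it concatenates chunks without paying this per-point deserialization cost.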
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index 3ba8bed..158bef6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -61,7 +61,6 @@ public class MergeManager implements IService, MergeManagerMBean {
.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
getID().getJmxName());
private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
- private final RateLimiter mergeReadRateLimiter = RateLimiter.create(Double.MAX_VALUE);
private AtomicInteger threadCnt = new AtomicInteger();
private ThreadPoolExecutor mergeTaskPool;
@@ -80,11 +79,6 @@ public class MergeManager implements IService, MergeManagerMBean {
return mergeWriteRateLimiter;
}
- public RateLimiter getMergeReadRateLimiter() {
- setReadMergeRate(IoTDBDescriptor.getInstance().getConfig().getMergeReadThroughputMbPerSec());
- return mergeReadRateLimiter;
- }
-
/**
* wait by throughoutMbPerSec limit to avoid continuous Write Or Read
*/
@@ -109,17 +103,6 @@ public class MergeManager implements IService, MergeManagerMBean {
}
}
- private void setReadMergeRate(final double throughoutMbPerSec) {
- double throughout = throughoutMbPerSec * 1024.0 * 1024.0;
- // if throughout = 0, disable rate limiting
- if (throughout == 0) {
- throughout = Double.MAX_VALUE;
- }
- if (mergeReadRateLimiter.getRate() != throughout) {
- mergeReadRateLimiter.setRate(throughout);
- }
- }
-
public static MergeManager getINSTANCE() {
return INSTANCE;
}
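The removed `setReadMergeRate` mirrors the write-side logic that survives this commit: convert a configured MB/s value into bytes/s for the rate limiter, and treat 0 as "rate limiting disabled". A dependency-free sketch of just that conversion (Guava's `RateLimiter` is left out; names are illustrative):

```java
public class ThroughputConfig {

    static final double UNLIMITED = Double.MAX_VALUE;

    /**
     * Convert a configured MB/s throughput to bytes/s; 0 disables limiting,
     * as in the setWriteMergeRate / setReadMergeRate logic shown above.
     */
    public static double toBytesPerSecond(double throughputMbPerSec) {
        double bytesPerSec = throughputMbPerSec * 1024.0 * 1024.0;
        return bytesPerSec == 0 ? UNLIMITED : bytesPerSec;
    }
}
```

After this commit only write throughput is limited; compaction reads proceed at full speed, which is the point of the change.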
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 11be896..5d47170 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -62,7 +62,7 @@ public class MergeMultiChunkTask {
private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class);
private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig()
- .getChunkMergePointThreshold();
+ .getMergeChunkPointNumberThreshold();
private MergeLogger mergeLogger;
private List<PartialPath> unmergedSeries;
@@ -305,7 +305,6 @@ public class MergeMultiChunkTask {
}
-
/**
* merge a sequence chunk SK
* <p>
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b5edffd..8673544 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.flush.CloseFileListener;
import org.apache.iotdb.db.engine.flush.FlushListener;
@@ -57,9 +58,8 @@ import org.apache.iotdb.db.engine.merge.task.RecoverMergeTask;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.engine.tsfilemanagement.HotCompactionMergeTaskPoolManager;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagement;
-import org.apache.iotdb.db.engine.tsfilemanagement.level.LevelTsFileManagement;
+import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.BatchInsertionException;
@@ -164,9 +164,9 @@ public class StorageGroupProcessor {
*/
private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
/**
- * hotCompactionMergeWorking is used to wait for last hot compaction to be done.
+ * compactionMergeWorking is used to wait for last compaction to be done.
*/
- private volatile boolean hotCompactionMergeWorking = false;
+ private volatile boolean compactionMergeWorking = false;
// upgrading sequence TsFile resource list
private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();
@@ -269,7 +269,7 @@ public class StorageGroupProcessor {
logger.error("create Storage Group system Directory {} failed",
storageGroupSysDir.getPath());
}
- this.tsFileManagement = IoTDBDescriptor.getInstance().getConfig().getTsFileManagementStrategy()
+ this.tsFileManagement = IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
.getTsFileManagement(storageGroupName, storageGroupSysDir.getAbsolutePath());
recover();
@@ -557,7 +557,7 @@ public class StorageGroupProcessor {
private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq) {
for (int i = 0; i < tsFiles.size(); i++) {
- if (LevelTsFileManagement.getMergeLevel(tsFiles.get(i).getTsFile()) > 0) {
+ if (LevelCompactionTsFileManagement.getMergeLevel(tsFiles.get(i).getTsFile()) > 0) {
continue;
}
TsFileResource tsFileResource = tsFiles.get(i);
@@ -1260,11 +1260,11 @@ public class StorageGroupProcessor {
(System.currentTimeMillis() - startTime) / 1000);
}
}
- while (hotCompactionMergeWorking) {
+ while (compactionMergeWorking) {
closeStorageGroupCondition.wait(100);
if (System.currentTimeMillis() - startTime > 60_000) {
logger
- .warn("{} has spent {}s to wait for closing hot compaction.", this.storageGroupName,
.warn("{} has spent {}s waiting for compaction to close.", this.storageGroupName,
(System.currentTimeMillis() - startTime) / 1000);
}
}
@@ -1652,23 +1652,23 @@ public class StorageGroupProcessor {
} else {
closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
}
- if (!hotCompactionMergeWorking && !HotCompactionMergeTaskPoolManager.getInstance()
+ if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance()
.isTerminated()) {
- hotCompactionMergeWorking = true;
- logger.info("{} submit a hot compaction merge task", storageGroupName);
+ compactionMergeWorking = true;
+ logger.info("{} submits a compaction merge task", storageGroupName);
try {
- // fork and filter current tsfile, then commit then to hot compaction merge
+ // fork and filter current tsfile, then commit them to compaction merge
tsFileManagement.forkCurrentFileList(tsFileProcessor.getTimeRangeId());
- HotCompactionMergeTaskPoolManager.getInstance()
+ CompactionMergeTaskPoolManager.getInstance()
.submitTask(
- tsFileManagement.new HotCompactionMergeTask(this::closeHotCompactionMergeCallBack,
+ tsFileManagement.new CompactionMergeTask(this::closeCompactionMergeCallBack,
tsFileProcessor.getTimeRangeId()));
} catch (IOException | RejectedExecutionException e) {
- this.closeHotCompactionMergeCallBack();
- logger.error("{} hot compaction submit task failed", storageGroupName);
+ this.closeCompactionMergeCallBack();
+ logger.error("{} failed to submit a compaction merge task", storageGroupName);
}
} else {
- logger.info("{} last hot compaction merge task is working, skip current merge",
+ logger.info("{} last compaction merge task is working, skip current merge",
storageGroupName);
}
synchronized (closeStorageGroupCondition) {
@@ -1678,10 +1678,10 @@ public class StorageGroupProcessor {
}
/**
- * close hot compaction merge callback, to release some locks
+ * close compaction merge callback, to release some locks
*/
- private void closeHotCompactionMergeCallBack() {
- this.hotCompactionMergeWorking = false;
+ private void closeCompactionMergeCallBack() {
+ this.compactionMergeWorking = false;
synchronized (closeStorageGroupCondition) {
closeStorageGroupCondition.notifyAll();
}
@@ -2464,7 +2464,7 @@ public class StorageGroupProcessor {
}
@FunctionalInterface
- public interface CloseHotCompactionMergeCallBack {
+ public interface CloseCompactionMergeCallBack {
void call();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 68af06c..f56e7ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -26,9 +26,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
+import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
-import org.apache.iotdb.db.engine.tsfilemanagement.HotCompactionMergeTaskPoolManager;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.StatMonitor;
@@ -139,7 +139,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(SyncServerManager.getInstance());
registerManager.register(UpgradeSevice.getINSTANCE());
registerManager.register(MergeManager.getINSTANCE());
- registerManager.register(HotCompactionMergeTaskPoolManager.getInstance());
+ registerManager.register(CompactionMergeTaskPoolManager.getInstance());
logger.info("Congratulation, IoTDB is set up successfully. Now, enjoy yourself!");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 91ca2a8..0d50afb 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -37,7 +37,7 @@ public enum ServiceType {
SYNC_SERVICE("SYNC ServerService", ""),
UPGRADE_SERVICE("UPGRADE DataService", ""),
MERGE_SERVICE("Merge Manager", "Merge Manager"),
- HOT_COMPACTION_SERVICE("Hot Compaction Manager", "Hot Compaction Manager"),
+ COMPACTION_SERVICE("Compaction Manager", "Compaction Manager"),
PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"),
TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
CACHE_HIT_RATIO_DISPLAY_SERVICE("CACHE_HIT_RATIO_DISPLAY_SERVICE",
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
index 6633ae6..9456343 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeLogTest.java
@@ -59,7 +59,7 @@ public class MergeLogTest extends MergeTest {
@Test
public void testMergeLog() throws Exception {
- IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(Integer.MAX_VALUE);
+ IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(Integer.MAX_VALUE);
MergeTask mergeTask =
new MergeTask(new MergeResource(seqResources.subList(0, 1), unseqResources.subList(0, 1)),
tempSGDir.getPath(), this::testCallBack, "test", false, 1, MERGE_TEST_SG);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
index 3fd59ef..11f5f62 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
@@ -55,7 +55,7 @@ public class MergePerfTest extends MergeTest{
}
public static void main(String[] args) throws Exception {
- IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(-1);
+ IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(-1);
List<Long> timeConsumptions = new ArrayList<>();
MergePerfTest perfTest = new MergePerfTest();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index 8f4f691..53ca7b1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -111,7 +111,7 @@ public class MergeTaskTest extends MergeTest {
@Test
public void testChunkNumThreshold() throws Exception {
- IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(Integer.MAX_VALUE);
+ IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(Integer.MAX_VALUE);
MergeTask mergeTask =
new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(),
(k, v, l) -> {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index ec0f773..2279ccb 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -76,8 +76,8 @@ abstract class MergeTest {
public void setUp() throws IOException, WriteProcessException, MetadataException {
IoTDB.metaManager.init();
prevMergeChunkThreshold =
- IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();
- IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(-1);
+ IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
+ IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(-1);
prepareSeries();
prepareFiles(seqFileNum, unseqFileNum);
MergeManager.getINSTANCE().start();
@@ -88,7 +88,7 @@ abstract class MergeTest {
removeFiles();
seqResources.clear();
unseqResources.clear();
- IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(prevMergeChunkThreshold);
+ IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(prevMergeChunkThreshold);
ChunkCache.getInstance().clear();
ChunkMetadataCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 9d9172a..6e0efbc 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagementStrategy;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -66,7 +66,7 @@ public class StorageGroupProcessorTest {
@Before
public void setUp() throws Exception {
IoTDBDescriptor.getInstance().getConfig()
- .setTsFileManagementStrategy(TsFileManagementStrategy.NORMAL_STRATEGY);
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
MetadataManagerHelper.initMetadata();
EnvironmentUtils.envSetUp();
processor = new DummySGP(systemDir, storageGroup);
@@ -81,7 +81,7 @@ public class StorageGroupProcessorTest {
MergeManager.getINSTANCE().stop();
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig()
- .setTsFileManagementStrategy(TsFileManagementStrategy.LEVEL_STRATEGY);
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
private void insertToStorageGroupProcessor(TSRecord record)
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeleteTimeseriesIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeleteTimeseriesIT.java
index de6e438..6b55775 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeleteTimeseriesIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBDeleteTimeseriesIT.java
@@ -25,7 +25,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagementStrategy;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
@@ -36,7 +36,7 @@ import org.junit.Test;
public class IoTDBDeleteTimeseriesIT {
private long memtableSizeThreshold;
- private TsFileManagementStrategy tsFileManagementStrategy;
+ private CompactionStrategy tsFileManagementStrategy;
@Before
public void setUp() throws Exception {
@@ -45,16 +45,16 @@ public class IoTDBDeleteTimeseriesIT {
memtableSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold();
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(16);
tsFileManagementStrategy = IoTDBDescriptor.getInstance().getConfig()
- .getTsFileManagementStrategy();
+ .getCompactionStrategy();
IoTDBDescriptor.getInstance().getConfig()
- .setTsFileManagementStrategy(TsFileManagementStrategy.NORMAL_STRATEGY);
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
}
@After
public void tearDown() throws Exception {
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(memtableSizeThreshold);
IoTDBDescriptor.getInstance().getConfig()
- .setTsFileManagementStrategy(tsFileManagementStrategy);
+ .setCompactionStrategy(tsFileManagementStrategy);
EnvironmentUtils.cleanEnv();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
index a2576f3..319fdd1 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsfileIT.java
@@ -37,7 +37,7 @@ import java.util.Set;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagementStrategy;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
@@ -126,7 +126,7 @@ public class IoTDBLoadExternalTsfileIT {
@Before
public void setUp() throws Exception {
IoTDBDescriptor.getInstance().getConfig()
- .setTsFileManagementStrategy(TsFileManagementStrategy.NORMAL_STRATEGY);
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
EnvironmentUtils.closeStatMonitor();
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -137,7 +137,7 @@ public class IoTDBLoadExternalTsfileIT {
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig()
- .setTsFileManagementStrategy(TsFileManagementStrategy.LEVEL_STRATEGY);
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index 417eade..9f67377 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -28,7 +28,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagementStrategy;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
@@ -41,17 +41,17 @@ public class IoTDBMergeTest {
private static final Logger logger = LoggerFactory.getLogger(IoTDBMergeTest.class);
private long prevPartitionInterval;
- private TsFileManagementStrategy prevTsFileManagementStrategy;
+ private CompactionStrategy prevTsFileManagementStrategy;
@Before
public void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
prevTsFileManagementStrategy = IoTDBDescriptor.getInstance().getConfig()
- .getTsFileManagementStrategy();
+ .getCompactionStrategy();
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(1);
IoTDBDescriptor.getInstance().getConfig()
- .setTsFileManagementStrategy(TsFileManagementStrategy.NORMAL_STRATEGY);
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
}
@@ -61,7 +61,7 @@ public class IoTDBMergeTest {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
IoTDBDescriptor.getInstance().getConfig()
- .setTsFileManagementStrategy(prevTsFileManagementStrategy);
+ .setCompactionStrategy(prevTsFileManagementStrategy);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
index d03c6fe..860ff45 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRemovePartitionIT.java
@@ -29,9 +29,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagementStrategy;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index a5304df..788b18b 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -35,7 +35,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
-import org.apache.iotdb.db.engine.tsfilemanagement.TsFileManagementStrategy;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -86,10 +86,10 @@ public class IoTDBSeriesReaderIT {
tsFileConfig.setPageSizeInByte(1024 * 1024 * 150);
tsFileConfig.setGroupSizeInByte(1024 * 1024 * 150);
prevChunkMergePointThreshold = IoTDBDescriptor.getInstance().getConfig()
- .getChunkMergePointThreshold();
+ .getMergeChunkPointNumberThreshold();
IoTDBDescriptor.getInstance().getConfig()
- .setTsFileManagementStrategy(TsFileManagementStrategy.NORMAL_STRATEGY);
- IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(Integer.MAX_VALUE);
+ .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(Integer.MAX_VALUE);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);
// test result of IBatchReader should not cross partition
@@ -114,11 +114,11 @@ public class IoTDBSeriesReaderIT {
EnvironmentUtils.cleanEnv();
IoTDBDescriptor.getInstance().getConfig()
- .setTsFileManagementStrategy(TsFileManagementStrategy.LEVEL_STRATEGY);
+ .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
IoTDBDescriptor.getInstance().getConfig()
- .setChunkMergePointThreshold(prevChunkMergePointThreshold);
+ .setMergeChunkPointNumberThreshold(prevChunkMergePointThreshold);
}
private static void insertData() throws ClassNotFoundException {