You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/06/16 07:09:53 UTC
[iotdb] branch new_compaction updated: [To new_compaction] modify
configuration and add resource list (#3413)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch new_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_compaction by this push:
new d2283b2 [To new_compaction] modify configuration and add resource list (#3413)
d2283b2 is described below
commit d2283b29f058639d5383e9b288ab6e6af6359ad9
Author: liuxuxin <37...@users.noreply.github.com>
AuthorDate: Wed Jun 16 15:09:20 2021 +0800
[To new_compaction] modify configuration and add resource list (#3413)
* Modify Configuration
* TsFileResourceList
---
.../resources/conf/iotdb-engine.properties | 81 +++----
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 191 +++++++----------
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 70 +++----
.../org/apache/iotdb/db/engine/StorageEngine.java | 72 +++----
.../db/engine/compaction/CompactionPriority.java | 5 +
...PoolManager.java => CompactionTaskManager.java} | 12 +-
.../db/engine/compaction/TsFileManagement.java | 2 +-
.../level/LevelCompactionTsFileManagement.java | 17 +-
.../engine/flush/pool/FlushSubTaskPoolManager.java | 1 +
.../db/engine/flush/pool/FlushTaskPoolManager.java | 1 +
.../iotdb/db/engine/merge/manage/MergeManager.java | 2 +-
.../engine/storagegroup/StorageGroupProcessor.java | 32 +--
.../db/engine/storagegroup/TsFileResource.java | 6 +
.../db/engine/storagegroup/TsFileResourceList.java | 232 +++++++++++++++++++++
...eGroupManager.java => StorageGroupManager.java} | 9 +-
.../org/apache/iotdb/db/monitor/StatMonitor.java | 8 +-
.../iotdb/db/query/pool/QueryTaskPoolManager.java | 2 +-
.../flush/pool => rescon}/AbstractPoolManager.java | 2 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 4 +-
.../runtime/WindowEvaluationTaskPoolManager.java | 2 +-
.../iotdb/db/integration/IoTDBRestartIT.java | 4 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 4 +-
22 files changed, 461 insertions(+), 298 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 9dd5434..af5db99 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -357,58 +357,37 @@ timestamp_precision=ms
### Query Configurations
####################
-# the default time period that used in fill query, -1 by default means infinite past time, in ms
-# Datatype: int
+# the default time period that used in fill query, -1 by default means infinite past time
+# Datatype: int, Unit: ms
# default_fill_interval=-1
####################
### Merge Configurations
####################
-# LEVEL_COMPACTION, NO_COMPACTION
-# Datatype: CompactionStrategy
-# compaction_strategy=LEVEL_COMPACTION
-
-# Works when the compaction_strategy is LEVEL_COMPACTION.
-# Whether to merge unseq files into seq files or not.
+# sequence space compaction: only compact the sequence files
# Datatype: boolean
-# enable_unseq_compaction=true
+# enable_seq_space_compaction=true
-# Works when the compaction_strategy is LEVEL_COMPACTION.
-# Whether to start next compaction task automatically after finish one compaction task
-# enable_continuous_compaction=true
-
-# Works when the compaction_strategy is LEVEL_COMPACTION.
-# The max seq file num of each level.
-# When the num of files in one level exceeds this,
-# the files in this level will merge to one and put to upper level.
-# Datatype: int
-# seq_file_num_in_each_level=6
-
-# Works when the compaction_strategy is LEVEL_COMPACTION.
-# The max num of seq level.
-# Datatype: int
-# seq_level_num=3
+# unsequence space compaction: only compact the unsequence files
+# Datatype: boolean
+# enable_unseq_space_compaction=true
-# Works when compaction_strategy is LEVEL_COMPACTION.
-# The max unseq file num of each level.
-# When the num of files in one level exceeds this,
-# the files in this level will merge to one and put to upper level.
-# Datatype: int
-# unseq_file_num_in_each_level=10
+# cross space compaction: compact the unsequence files into the overlapped sequence files
+# Datatype: boolean
+# enable_cross_space_compaction=true
-# Works when the compaction_strategy is LEVEL_COMPACTION.
-# The max num of unseq level.
-# Datatype: int
-# unseq_level_num=1
+# The priority of compaction execution
+# INNER_CROSS: prioritize inner space compaction, reduce the number of files first
+# CROSS_INNER: prioritize cross space compaction, eliminate the unsequence files first
+# BALANCE: alternate two compaction types
+# compaction_priority=INNER_CROSS
-# Works when compaction_strategy is LEVEL_COMPACTION.
-# The max open file num in each unseq compaction task.
+# The max open file num in each cross space compaction task.
# We use the unseq file num as the open file num
# This parameters have to be much smaller than the permitted max open file num of each process controlled by operator system(65535 in most system)
# Datatype: int
-# max_open_file_num_in_each_unseq_compaction=2000
+# max_open_file_num_in_cross_space_compaction=2000
-# Works when the compaction_strategy is LEVEL_COMPACTION.
# When the average point number of chunks in the target file reaches this, merge the file to the top level.
# During a merge, if a chunk with less number of points than this parameter, the chunk will be
# merged with its succeeding chunks even if it is not overflowed, until the merged chunks reach
@@ -417,27 +396,26 @@ timestamp_precision=ms
# Datatype: int
# merge_chunk_point_number=100000
-# Works when the compaction_strategy is LEVEL_COMPACTION.
# When point number of a page reaches this, use "append merge" instead of "deserialize merge".
# Datatype: int
# merge_page_point_number=100
-# How many threads will be set up to perform unseq merge chunk sub-tasks, 4 by default.
+# How many threads will be set up to perform cross space compaction chunk sub-tasks, 4 by default.
# Set to 1 when less than or equal to 0.
# Datatype: int
# merge_chunk_subthread_num=4
# If one merge file selection runs for more than this time, it will be ended and its current
-# selection will be used as final selection. Unit: millis.
+# selection will be used as final selection.
# When < 0, it means time is unbounded.
-# Datatype: long
+# Datatype: long, Unit: ms
# merge_fileSelection_time_budget=30000
-# How much memory may be used in ONE merge task (in byte), 10% of maximum JVM memory by default.
+# How much memory may be used in ONE merge task, 10% of maximum JVM memory by default.
# This is only a rough estimation, starting from a relatively small value to avoid OOM.
# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the
# total memory estimation of merge.
-# Datatype: long
+# Datatype: long, Unit: Byte
# merge_memory_budget=268435456
# When set to true, if some crashed merges are detected during system rebooting, such merges will
@@ -447,23 +425,26 @@ timestamp_precision=ms
# Datatype: boolean
# continue_merge_after_reboot=false
-# When set to true, all unseq merges becomes full merge (the whole SeqFiles are re-written despite how
-# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles
-# are overflowed.
+# When set to true, every cross space compaction becomes a full merge (the whole SeqFiles are re-written regardless of how
+# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles are overflowed.
# Datatype: boolean
# force_full_merge=true
# How many threads will be set up to perform compaction, 10 by default.
# Set to 1 when less than or equal to 0.
# Datatype: int
-# compaction_thread_num=10
+# concurrent_compaction_thread=10
+
+# The interval of compaction task submission
+# Datatype: long, Unit: ms
+# compaction_interval=10000
# The limit of write throughput merge can reach per second
# Datatype: int
# merge_write_throughput_mb_per_sec=8
-# The max executing time of query. unit: ms
-# Datatype: int
+# The max executing time of query.
+# Datatype: int, Unit: ms
# query_timeout_threshold=60000
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 9f11f41..5737e65 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.conf;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.engine.compaction.CompactionPriority;
import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.exception.LoadConfigurationException;
@@ -299,61 +299,52 @@ public class IoTDBConfig {
private int avgSeriesPointNumberThreshold = 10000;
/**
- * Work when tsfile_manage_strategy is level_strategy. When merge point number reaches this, merge
- * the files to the last level. During a merge, if a chunk with less number of chunks than this
- * parameter, the chunk will be merged with its succeeding chunks even if it is not overflowed,
- * until the merged chunks reach this threshold and the new chunk will be flushed.
+ * Only compact the sequence files
*/
- private int mergeChunkPointNumberThreshold = 100000;
+ private boolean enableSeqSpaceCompaction = true;
/**
- * Works when the compaction_strategy is LEVEL_COMPACTION. When point number of a page reaches
- * this, use "append merge" instead of "deserialize merge".
+ * Only compact the unsequence files
*/
- private int mergePagePointNumberThreshold = 100;
-
- /** LEVEL_COMPACTION, NO_COMPACTION */
- private CompactionStrategy compactionStrategy = CompactionStrategy.LEVEL_COMPACTION;
+ private boolean enableUnseqSpaceCompaction = true;
/**
- * Works when the compaction_strategy is LEVEL_COMPACTION. Whether to merge unseq files into seq
- * files or not.
+ * Compact the unsequence files into the overlapped sequence files
*/
- private boolean enableUnseqCompaction = true;
+ private boolean enableCrossSpaceCompaction = true;
/**
- * Works when the compaction_strategy is LEVEL_COMPACTION. Whether to start next compaction task
- * automatically after finish one compaction task
+ * The priority of compaction task execution. There are three priority strategies:
+ * INNER_CROSS: prioritize inner space compaction, reduce the number of files first
+ * CROSS_INNER: prioritize cross space compaction, eliminate the unsequence files first
+ * BALANCE: alternate two compaction types
*/
- private boolean enableContinuousCompaction = true;
+ private CompactionPriority compactionPriority = CompactionPriority.INNER_CROSS;
/**
- * Works when the compaction_strategy is LEVEL_COMPACTION. The max seq file num of each level.
- * When the num of files in one level exceeds this, the files in this level will merge to one and
- * put to upper level.
+ * When the merge point number reaches this, merge the files to the last level. During a merge,
+ * if a chunk has fewer points than this parameter, the chunk will be merged with
+ * its succeeding chunks even if it is not overflowed,
+ * until the merged chunks reach this threshold and the new chunk is flushed.
*/
- private int seqFileNumInEachLevel = 6;
-
- /** Works when the compaction_strategy is LEVEL_COMPACTION. The max num of seq level. */
- private int seqLevelNum = 3;
+ private int mergeChunkPointNumberThreshold = 100000;
/**
- * Works when compaction_strategy is LEVEL_COMPACTION. The max ujseq file num of each level. When
- * the num of files in one level exceeds this, the files in this level will merge to one and put
- * to upper level.
+ * When point number of a page reaches this, use "append merge" instead of "deserialize merge".
*/
- private int unseqFileNumInEachLevel = 10;
+ private int mergePagePointNumberThreshold = 100;
- /** Works when the compaction_strategy is LEVEL_COMPACTION. The max num of unseq level. */
- private int unseqLevelNum = 1;
+ /**
+ * The interval of compaction task submission in each virtual storage group. The unit is ms.
+ */
+ private long compactionInterval = 10_000L;
/**
- * Works when compaction_strategy is LEVEL_COMPACTION. The max open file num in each unseq
- * compaction task. We use the unseq file num as the open file num # This parameters have to be
- * much smaller than the permitted max open file num of each process controlled by operator
- * system(65535 in most system).
+ * The max open file num in each cross space compaction task. We use the unseq file num as the
+ * open file num. This parameter has to be much smaller than the permitted max open file num of
+ * each process controlled by the operating system (65535 in most systems).
*/
- private int maxOpenFileNumInEachUnseqCompaction = 2000;
+ private int maxOpenFileNumInCrossSpaceCompaction = 2000;
/** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
private boolean metaDataCacheEnable = true;
@@ -488,10 +479,7 @@ public class IoTDBConfig {
/** How many threads will be set up to perform upgrade tasks. */
private int upgradeThreadNum = 1;
- /** How many threads will be set up to perform main merge tasks. */
- private int mergeThreadNum = 1;
-
- /** How many threads will be set up to perform unseq merge chunk sub-tasks. */
+ /** How many threads will be set up to perform cross space compaction chunk sub-tasks. */
private int mergeChunkSubThreadNum = 4;
/**
@@ -515,7 +503,7 @@ public class IoTDBConfig {
private long mergeIntervalSec = 0L;
/**
- * When set to true, all unseq merges becomes full merge (the whole SeqFiles are re-written
+ * When set to true, every cross space compaction becomes a full merge (the whole SeqFiles are re-written
* despite how much they are overflowed). This may increase merge overhead depending on how much
* the SeqFiles are overflowed.
*/
@@ -528,7 +516,7 @@ public class IoTDBConfig {
* How many thread will be set up to perform compaction, 10 by default. Set to 1 when less than or
* equal to 0.
*/
- private int compactionThreadNum = 10;
+ private int concurrentCompactionThread = 10;
private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.MAX_SERIES_NUM;
@@ -1260,14 +1248,6 @@ public class IoTDBConfig {
this.mergeMemoryBudget = mergeMemoryBudget;
}
- public int getMergeThreadNum() {
- return mergeThreadNum;
- }
-
- void setMergeThreadNum(int mergeThreadNum) {
- this.mergeThreadNum = mergeThreadNum;
- }
-
public boolean isContinueMergeAfterReboot() {
return continueMergeAfterReboot;
}
@@ -1412,12 +1392,12 @@ public class IoTDBConfig {
this.forceFullMerge = forceFullMerge;
}
- public int getCompactionThreadNum() {
- return compactionThreadNum;
+ public int getConcurrentCompactionThread() {
+ return concurrentCompactionThread;
}
- public void setCompactionThreadNum(int compactionThreadNum) {
- this.compactionThreadNum = compactionThreadNum;
+ public void setConcurrentCompactionThread(int concurrentCompactionThread) {
+ this.concurrentCompactionThread = concurrentCompactionThread;
}
public int getMergeWriteThroughputMbPerSec() {
@@ -1476,68 +1456,13 @@ public class IoTDBConfig {
this.mergeFileStrategy = mergeFileStrategy;
}
- public CompactionStrategy getCompactionStrategy() {
- return compactionStrategy;
- }
-
- public void setCompactionStrategy(CompactionStrategy compactionStrategy) {
- this.compactionStrategy = compactionStrategy;
- }
-
- public boolean isEnableUnseqCompaction() {
- return enableUnseqCompaction;
- }
-
- public void setEnableUnseqCompaction(boolean enableUnseqCompaction) {
- this.enableUnseqCompaction = enableUnseqCompaction;
- }
-
- public boolean isEnableContinuousCompaction() {
- return enableContinuousCompaction;
- }
-
- public void setEnableContinuousCompaction(boolean enableContinuousCompaction) {
- this.enableContinuousCompaction = enableContinuousCompaction;
- }
-
- public int getSeqFileNumInEachLevel() {
- return seqFileNumInEachLevel;
- }
-
- public void setSeqFileNumInEachLevel(int seqFileNumInEachLevel) {
- this.seqFileNumInEachLevel = seqFileNumInEachLevel;
- }
-
- public int getSeqLevelNum() {
- return seqLevelNum;
- }
-
- public void setSeqLevelNum(int seqLevelNum) {
- this.seqLevelNum = seqLevelNum;
- }
-
- public int getUnseqFileNumInEachLevel() {
- return unseqFileNumInEachLevel;
- }
-
- public void setUnseqFileNumInEachLevel(int unseqFileNumInEachLevel) {
- this.unseqFileNumInEachLevel = unseqFileNumInEachLevel;
- }
-
- public int getUnseqLevelNum() {
- return unseqLevelNum;
- }
-
- public void setUnseqLevelNum(int unseqLevelNum) {
- this.unseqLevelNum = unseqLevelNum;
- }
- public int getMaxOpenFileNumInEachUnseqCompaction() {
- return maxOpenFileNumInEachUnseqCompaction;
+ public int getMaxOpenFileNumInCrossSpaceCompaction() {
+ return maxOpenFileNumInCrossSpaceCompaction;
}
- public void setMaxOpenFileNumInEachUnseqCompaction(int maxOpenFileNumInEachUnseqCompaction) {
- this.maxOpenFileNumInEachUnseqCompaction = maxOpenFileNumInEachUnseqCompaction;
+ public void setMaxOpenFileNumInCrossSpaceCompaction(int maxOpenFileNumInCrossSpaceCompaction) {
+ this.maxOpenFileNumInCrossSpaceCompaction = maxOpenFileNumInCrossSpaceCompaction;
}
public int getMergeChunkSubThreadNum() {
@@ -2184,4 +2109,44 @@ public class IoTDBConfig {
public void setAdminPassword(String adminPassword) {
this.adminPassword = adminPassword;
}
+
+ public boolean isEnableSeqSpaceCompaction() {
+ return enableSeqSpaceCompaction;
+ }
+
+ public void setEnableSeqSpaceCompaction(boolean enableSeqSpaceCompaction) {
+ this.enableSeqSpaceCompaction = enableSeqSpaceCompaction;
+ }
+
+ public boolean isEnableUnseqSpaceCompaction() {
+ return enableUnseqSpaceCompaction;
+ }
+
+ public void setEnableUnseqSpaceCompaction(boolean enableUnseqSpaceCompaction) {
+ this.enableUnseqSpaceCompaction = enableUnseqSpaceCompaction;
+ }
+
+ public boolean isEnableCrossSpaceCompaction() {
+ return enableCrossSpaceCompaction;
+ }
+
+ public void setEnableCrossSpaceCompaction(boolean enableCrossSpaceCompaction) {
+ this.enableCrossSpaceCompaction = enableCrossSpaceCompaction;
+ }
+
+ public CompactionPriority getCompactionPriority() {
+ return compactionPriority;
+ }
+
+ public void setCompactionPriority(CompactionPriority compactionPriority) {
+ this.compactionPriority = compactionPriority;
+ }
+
+ public long getCompactionInterval() {
+ return compactionInterval;
+ }
+
+ public void setCompactionInterval(long compactionInterval) {
+ this.compactionInterval = compactionInterval;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 645a534..3231b94 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.conf;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
-import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.engine.compaction.CompactionPriority;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -311,48 +311,46 @@ public class IoTDBDescriptor {
"merge_page_point_number",
Integer.toString(conf.getMergePagePointNumberThreshold()))));
- conf.setCompactionStrategy(
- CompactionStrategy.valueOf(
+ conf.setCompactionInterval(Long.parseLong(
properties.getProperty(
- "compaction_strategy", conf.getCompactionStrategy().toString())));
+ "compaction_interval",
+ Long.toString(conf.getCompactionInterval())
+ )
+ ));
- conf.setEnableUnseqCompaction(
- Boolean.parseBoolean(
+ conf.setEnableCrossSpaceCompaction(Boolean.parseBoolean(
properties.getProperty(
- "enable_unseq_compaction", Boolean.toString(conf.isEnableUnseqCompaction()))));
+ "enable_cross_space_compaction",
+ Boolean.toString(conf.isEnableCrossSpaceCompaction())
+ )
+ ));
- conf.setEnableContinuousCompaction(
- Boolean.parseBoolean(
+ conf.setEnableSeqSpaceCompaction(Boolean.parseBoolean(
properties.getProperty(
- "enable_continuous_compaction",
- Boolean.toString(conf.isEnableContinuousCompaction()))));
-
- conf.setSeqLevelNum(
- Integer.parseInt(
- properties.getProperty("seq_level_num", Integer.toString(conf.getSeqLevelNum()))));
+ "enable_seq_space_compaction",
+ Boolean.toString(conf.isEnableSeqSpaceCompaction())
+ )
+ ));
- conf.setSeqFileNumInEachLevel(
- Integer.parseInt(
+ conf.setEnableUnseqSpaceCompaction(Boolean.parseBoolean(
properties.getProperty(
- "seq_file_num_in_each_level",
- Integer.toString(conf.getSeqFileNumInEachLevel()))));
+ "enable_unseq_space_compaction",
+ Boolean.toString(conf.isEnableUnseqSpaceCompaction())
+ )
+ ));
- conf.setUnseqLevelNum(
- Integer.parseInt(
+ conf.setCompactionPriority(CompactionPriority.valueOf(
properties.getProperty(
- "unseq_level_num", Integer.toString(conf.getUnseqLevelNum()))));
+ "compaction_priority",
+ conf.getCompactionPriority().toString()
+ )
+ ));
- conf.setMaxOpenFileNumInEachUnseqCompaction(
+ conf.setMaxOpenFileNumInCrossSpaceCompaction(
Integer.parseInt(
properties.getProperty(
"max_open_file_num_in_each_unseq_compaction",
- Integer.toString(conf.getMaxOpenFileNumInEachUnseqCompaction()))));
-
- conf.setUnseqFileNumInEachLevel(
- Integer.parseInt(
- properties.getProperty(
- "unseq_file_num_in_each_level",
- Integer.toString(conf.getUnseqFileNumInEachLevel()))));
+ Integer.toString(conf.getMaxOpenFileNumInCrossSpaceCompaction()))));
conf.setQueryTimeoutThreshold(
Integer.parseInt(
@@ -456,10 +454,6 @@ public class IoTDBDescriptor {
Long.parseLong(
properties.getProperty(
"merge_memory_budget", Long.toString(conf.getMergeMemoryBudget()))));
- conf.setMergeThreadNum(
- Integer.parseInt(
- properties.getProperty(
- "merge_thread_num", Integer.toString(conf.getMergeThreadNum()))));
conf.setMergeChunkSubThreadNum(
Integer.parseInt(
properties.getProperty(
@@ -483,10 +477,10 @@ public class IoTDBDescriptor {
Boolean.parseBoolean(
properties.getProperty(
"force_full_merge", Boolean.toString(conf.isForceFullMerge()))));
- conf.setCompactionThreadNum(
+ conf.setConcurrentCompactionThread(
Integer.parseInt(
properties.getProperty(
- "compaction_thread_num", Integer.toString(conf.getCompactionThreadNum()))));
+ "concurrent_compaction_thread", Integer.toString(conf.getConcurrentCompactionThread()))));
conf.setMergeWriteThroughputMbPerSec(
Integer.parseInt(
properties.getProperty(
@@ -1031,10 +1025,6 @@ public class IoTDBDescriptor {
// update slow_query_threshold
conf.setSlowQueryThreshold(Long.parseLong(properties.getProperty("slow_query_threshold")));
- // update enable_continuous_compaction
- conf.setEnableContinuousCompaction(
- Boolean.parseBoolean(properties.getProperty("enable_continuous_compaction")));
-
// update merge_write_throughput_mb_per_sec
conf.setMergeWriteThroughputMbPerSec(
Integer.parseInt(properties.getProperty("merge_write_throughput_mb_per_sec")));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index f5d0bff..43c5118 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -32,7 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroupManager;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.StorageGroupManager;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.ShutdownException;
@@ -116,7 +116,7 @@ public class StorageEngine implements IService {
*/
private final String systemDir;
/** storage group name -> storage group processor */
- private final ConcurrentHashMap<PartialPath, VirtualStorageGroupManager> processorMap =
+ private final ConcurrentHashMap<PartialPath, StorageGroupManager> processorMap =
new ConcurrentHashMap<>();
private AtomicBoolean isAllSgReady = new AtomicBoolean(false);
@@ -273,11 +273,11 @@ public class StorageEngine implements IService {
() -> {
try {
// for recovery in test
- VirtualStorageGroupManager virtualStorageGroupManager =
+ StorageGroupManager storageGroupManager =
processorMap.computeIfAbsent(
- storageGroup.getPartialPath(), id -> new VirtualStorageGroupManager());
+ storageGroup.getPartialPath(), id -> new StorageGroupManager());
- virtualStorageGroupManager.recover(storageGroup);
+ storageGroupManager.recover(storageGroup);
logger.info(
"Storage Group Processor {} is recovered successfully",
@@ -302,7 +302,7 @@ public class StorageEngine implements IService {
private void checkTTL() {
try {
- for (VirtualStorageGroupManager processor : processorMap.values()) {
+ for (StorageGroupManager processor : processorMap.values()) {
processor.checkTTL();
}
} catch (ConcurrentModificationException e) {
@@ -440,17 +440,17 @@ public class StorageEngine implements IService {
private StorageGroupProcessor getStorageGroupProcessorByPath(
PartialPath devicePath, StorageGroupMNode storageGroupMNode)
throws StorageGroupProcessorException, StorageEngineException {
- VirtualStorageGroupManager virtualStorageGroupManager =
+ StorageGroupManager storageGroupManager =
processorMap.get(storageGroupMNode.getPartialPath());
- if (virtualStorageGroupManager == null) {
+ if (storageGroupManager == null) {
// if finish recover
if (isAllSgReady.get()) {
waitAllSgReady(devicePath);
synchronized (storageGroupMNode) {
- virtualStorageGroupManager = processorMap.get(storageGroupMNode.getPartialPath());
- if (virtualStorageGroupManager == null) {
- virtualStorageGroupManager = new VirtualStorageGroupManager();
- processorMap.put(storageGroupMNode.getPartialPath(), virtualStorageGroupManager);
+ storageGroupManager = processorMap.get(storageGroupMNode.getPartialPath());
+ if (storageGroupManager == null) {
+ storageGroupManager = new StorageGroupManager();
+ processorMap.put(storageGroupMNode.getPartialPath(), storageGroupManager);
}
}
} else {
@@ -462,7 +462,7 @@ public class StorageEngine implements IService {
TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
}
}
- return virtualStorageGroupManager.getProcessor(devicePath, storageGroupMNode);
+ return storageGroupManager.getProcessor(devicePath, storageGroupMNode);
}
/**
@@ -521,8 +521,8 @@ public class StorageEngine implements IService {
/** This function is just for unit test. */
public synchronized void reset() {
- for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
- virtualStorageGroupManager.reset();
+ for (StorageGroupManager storageGroupManager : processorMap.values()) {
+ storageGroupManager.reset();
}
}
@@ -616,12 +616,12 @@ public class StorageEngine implements IService {
}
private void updateMonitorStatistics(
- VirtualStorageGroupManager virtualStorageGroupManager, InsertPlan insertPlan) {
+ StorageGroupManager storageGroupManager, InsertPlan insertPlan) {
StatMonitor monitor = StatMonitor.getInstance();
int successPointsNum =
insertPlan.getMeasurements().length - insertPlan.getFailedMeasurementNumber();
// update to storage group statistics
- virtualStorageGroupManager.updateMonitorSeriesValue(successPointsNum);
+ storageGroupManager.updateMonitorSeriesValue(successPointsNum);
// update to global statistics
monitor.updateStatGlobalValue(successPointsNum);
}
@@ -629,14 +629,14 @@ public class StorageEngine implements IService {
/** flush command Sync asyncCloseOneProcessor all file node processors. */
public void syncCloseAllProcessor() {
logger.info("Start closing all storage group processor");
- for (VirtualStorageGroupManager processor : processorMap.values()) {
+ for (StorageGroupManager processor : processorMap.values()) {
processor.syncCloseAllWorkingTsFileProcessors();
}
}
public void forceCloseAllProcessor() throws TsFileProcessorException {
logger.info("Start force closing all storage group processor");
- for (VirtualStorageGroupManager processor : processorMap.values()) {
+ for (StorageGroupManager processor : processorMap.values()) {
processor.forceCloseAllWorkingTsFileProcessors();
}
}
@@ -647,8 +647,8 @@ public class StorageEngine implements IService {
return;
}
- VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupPath);
- virtualStorageGroupManager.closeStorageGroupProcessor(isSeq, isSync);
+ StorageGroupManager storageGroupManager = processorMap.get(storageGroupPath);
+ storageGroupManager.closeStorageGroupProcessor(isSeq, isSync);
}
/**
@@ -665,8 +665,8 @@ public class StorageEngine implements IService {
throw new StorageGroupNotSetException(storageGroupPath.getFullPath());
}
- VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroupPath);
- virtualStorageGroupManager.closeStorageGroupProcessor(partitionId, isSeq, isSync);
+ StorageGroupManager storageGroupManager = processorMap.get(storageGroupPath);
+ storageGroupManager.closeStorageGroupProcessor(partitionId, isSeq, isSync);
}
public void delete(PartialPath path, long startTime, long endTime, long planIndex)
@@ -727,8 +727,8 @@ public class StorageEngine implements IService {
*/
public int countUpgradeFiles() {
int totalUpgradeFileNum = 0;
- for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
- totalUpgradeFileNum += virtualStorageGroupManager.countUpgradeFiles();
+ for (StorageGroupManager storageGroupManager : processorMap.values()) {
+ totalUpgradeFileNum += storageGroupManager.countUpgradeFiles();
}
return totalUpgradeFileNum;
}
@@ -743,8 +743,8 @@ public class StorageEngine implements IService {
throw new StorageEngineException(
"Current system mode is read only, does not support file upgrade");
}
- for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
- virtualStorageGroupManager.upgradeAll();
+ for (StorageGroupManager storageGroupManager : processorMap.values()) {
+ storageGroupManager.upgradeAll();
}
}
@@ -758,8 +758,8 @@ public class StorageEngine implements IService {
throw new StorageEngineException("Current system mode is read only, does not support merge");
}
- for (VirtualStorageGroupManager virtualStorageGroupManager : processorMap.values()) {
- virtualStorageGroupManager.mergeAll(isFullMerge);
+ for (StorageGroupManager storageGroupManager : processorMap.values()) {
+ storageGroupManager.mergeAll(isFullMerge);
}
}
@@ -811,8 +811,8 @@ public class StorageEngine implements IService {
deleteAllDataFilesInOneStorageGroup(storageGroupPath);
releaseWalDirectByteBufferPoolInOneStorageGroup(storageGroupPath);
- VirtualStorageGroupManager virtualStorageGroupManager = processorMap.remove(storageGroupPath);
- virtualStorageGroupManager.deleteStorageGroup(
+ StorageGroupManager storageGroupManager = processorMap.remove(storageGroupPath);
+ storageGroupManager.deleteStorageGroup(
systemDir + File.pathSeparator + storageGroupPath);
}
@@ -866,7 +866,7 @@ public class StorageEngine implements IService {
/** @return TsFiles (seq or unseq) grouped by their storage group and partition number. */
public Map<PartialPath, Map<Long, List<TsFileResource>>> getAllClosedStorageGroupTsFile() {
Map<PartialPath, Map<Long, List<TsFileResource>>> ret = new HashMap<>();
- for (Entry<PartialPath, VirtualStorageGroupManager> entry : processorMap.entrySet()) {
+ for (Entry<PartialPath, StorageGroupManager> entry : processorMap.entrySet()) {
entry.getValue().getAllClosedStorageGroupTsFile(entry.getKey(), ret);
}
return ret;
@@ -878,8 +878,8 @@ public class StorageEngine implements IService {
public boolean isFileAlreadyExist(
TsFileResource tsFileResource, PartialPath storageGroup, long partitionNum) {
- VirtualStorageGroupManager virtualStorageGroupManager = processorMap.get(storageGroup);
- if (virtualStorageGroupManager == null) {
+ StorageGroupManager storageGroupManager = processorMap.get(storageGroup);
+ if (storageGroupManager == null) {
return false;
}
@@ -906,7 +906,7 @@ public class StorageEngine implements IService {
processorMap.get(storageGroupPath).removePartitions(filter);
}
- public Map<PartialPath, VirtualStorageGroupManager> getProcessorMap() {
+ public Map<PartialPath, StorageGroupManager> getProcessorMap() {
return processorMap;
}
@@ -918,7 +918,7 @@ public class StorageEngine implements IService {
*/
public Map<String, List<Pair<Long, Boolean>>> getWorkingStorageGroupPartitions() {
Map<String, List<Pair<Long, Boolean>>> res = new ConcurrentHashMap<>();
- for (Entry<PartialPath, VirtualStorageGroupManager> entry : processorMap.entrySet()) {
+ for (Entry<PartialPath, StorageGroupManager> entry : processorMap.entrySet()) {
entry.getValue().getWorkingStorageGroupPartitions(entry.getKey().getFullPath(), res);
}
return res;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionPriority.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionPriority.java
new file mode 100644
index 0000000..915a91f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionPriority.java
@@ -0,0 +1,5 @@
+package org.apache.iotdb.db.engine.compaction;
+
+/**
+ * Priority options for compaction.
+ *
+ * <p>NOTE(review): the constant names suggest an ordering preference between inner-space and
+ * cross-space compaction (with BALANCE favoring neither), but nothing in this file defines their
+ * meaning - confirm against the code that reads this setting.
+ */
+public enum CompactionPriority {
+ INNER_CROSS, CROSS_INNER, BALANCE
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index cdf6f4a..43fac62 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -47,16 +47,16 @@ import java.util.concurrent.TimeUnit;
import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
/** CompactionMergeTaskPoolManager provides a ThreadPool to queue and run all compaction tasks. */
-public class CompactionMergeTaskPoolManager implements IService {
+public class CompactionTaskManager implements IService {
private static final Logger logger =
- LoggerFactory.getLogger(CompactionMergeTaskPoolManager.class);
- private static final CompactionMergeTaskPoolManager INSTANCE =
- new CompactionMergeTaskPoolManager();
+ LoggerFactory.getLogger(CompactionTaskManager.class);
+ private static final CompactionTaskManager INSTANCE =
+ new CompactionTaskManager();
private ExecutorService pool;
private Map<String, Set<Future<Void>>> storageGroupTasks = new ConcurrentHashMap<>();
- public static CompactionMergeTaskPoolManager getInstance() {
+ public static CompactionTaskManager getInstance() {
return INSTANCE;
}
@@ -65,7 +65,7 @@ public class CompactionMergeTaskPoolManager implements IService {
if (pool == null) {
this.pool =
IoTDBThreadPoolFactory.newScheduledThreadPool(
- IoTDBDescriptor.getInstance().getConfig().getCompactionThreadNum(),
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread(),
ThreadName.COMPACTION_SERVICE.getName());
}
logger.info("Compaction task manager started.");
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 8f17c1f..0fd965e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -77,7 +77,7 @@ public abstract class TsFileManagement {
protected boolean isForceFullMerge = IoTDBDescriptor.getInstance().getConfig().isForceFullMerge();
private final int maxOpenFileNumInEachUnseqCompaction =
- IoTDBDescriptor.getInstance().getConfig().getMaxOpenFileNumInEachUnseqCompaction();
+ IoTDBDescriptor.getInstance().getConfig().getMaxOpenFileNumInCrossSpaceCompaction();
public TsFileManagement(String storageGroupName, String storageGroupDir) {
this.storageGroupName = storageGroupName;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index afefcd5..ce7743a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -64,17 +64,12 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
private static final Logger logger =
LoggerFactory.getLogger(LevelCompactionTsFileManagement.class);
- private final int seqLevelNum =
- Math.max(IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(), 1);
- private final int seqFileNumInEachLevel =
- Math.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
- private final int unseqLevelNum =
- Math.max(IoTDBDescriptor.getInstance().getConfig().getUnseqLevelNum(), 1);
- private final int unseqFileNumInEachLevel =
- Math.max(IoTDBDescriptor.getInstance().getConfig().getUnseqFileNumInEachLevel(), 1);
-
- private final boolean enableUnseqCompaction =
- IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
+ private final int seqLevelNum = 0;
+ private final int seqFileNumInEachLevel = 0;
+ private final int unseqLevelNum = 0;
+ private final int unseqFileNumInEachLevel = 0;
+
+ private final boolean enableUnseqCompaction = false;
// First map is partition list; Second list is level list; Third list is file list in level;
private final Map<Long, List<SortedSet<TsFileResource>>> sequenceTsFileResources =
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
index a46717e..10de051 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.flush.pool;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.rescon.AbstractPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
index 5fc3ccb..a212bf3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushTaskPoolManager.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.rescon.AbstractPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index f9112e1..23669b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -126,7 +126,7 @@ public class MergeManager implements IService, MergeManagerMBean {
public void start() {
JMXService.registerMBean(this, mbeanName);
if (mergeTaskPool == null) {
- int threadNum = IoTDBDescriptor.getInstance().getConfig().getMergeThreadNum();
+ int threadNum = IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
if (threadNum <= 0) {
threadNum = 1;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2b2185d..0b17247 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -400,11 +400,7 @@ public class StorageGroupProcessor {
} else if (!storageGroupSysDir.exists()) {
logger.error("create Storage Group system Directory {} failed", storageGroupSysDir.getPath());
}
- this.tsFileManagement =
- IoTDBDescriptor.getInstance()
- .getConfig()
- .getCompactionStrategy()
- .getTsFileManagement(logicalStorageGroupName, storageGroupSysDir.getAbsolutePath());
+ // TODO: new TsFileManagement
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleWithFixedDelay(
@@ -535,24 +531,17 @@ public class StorageGroupProcessor {
globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
}
- if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
- && seqTsFileResources.size() > 0) {
- for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
- executeCompaction(
- timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
- }
- }
}
private void recoverCompaction() {
- if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
+ if (!CompactionTaskManager.getInstance().isTerminated()) {
compactionMergeWorking = true;
logger.info(
"{} - {} submit a compaction recover merge task",
logicalStorageGroupName,
virtualStorageGroupId);
try {
- CompactionMergeTaskPoolManager.getInstance()
+ CompactionTaskManager.getInstance()
.submitTask(
logicalStorageGroupName,
tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack));
@@ -2047,7 +2036,7 @@ public class StorageGroupProcessor {
}
private void executeCompaction(long timePartition, boolean fullMerge) {
- if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
+ if (!compactionMergeWorking && !CompactionTaskManager.getInstance().isTerminated()) {
compactionMergeWorking = true;
logger.info(
"{} submit a compaction merge task",
@@ -2056,7 +2045,7 @@ public class StorageGroupProcessor {
// fork and filter current tsfile, then commit then to compaction merge
tsFileManagement.forkCurrentFileList(timePartition);
tsFileManagement.setForceFullMerge(fullMerge);
- CompactionMergeTaskPoolManager.getInstance()
+ CompactionTaskManager.getInstance()
.submitTask(
logicalStorageGroupName,
tsFileManagement
@@ -2077,12 +2066,7 @@ public class StorageGroupProcessor {
/** close compaction merge callback, to release some locks */
private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) {
- if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
- executeCompaction(
- timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
- } else {
- this.compactionMergeWorking = false;
- }
+ this.compactionMergeWorking = false;
}
/**
@@ -2930,7 +2914,7 @@ public class StorageGroupProcessor {
writeLock("removePartitions");
try {
// abort ongoing comapctions and merges
- CompactionMergeTaskPoolManager.getInstance().abortCompaction(logicalStorageGroupName);
+ CompactionTaskManager.getInstance().abortCompaction(logicalStorageGroupName);
MergeManager.getINSTANCE().abortMerge(logicalStorageGroupName);
// close all working files that should be removed
removePartitions(filter, workSequenceTsFileProcessors.entrySet());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 9449437..8c06314 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -89,6 +89,12 @@ public class TsFileResource {
/** version number */
public static final byte VERSION_NUMBER = 1;
+ /**
+ * Used in {@link TsFileResourceList FileList}
+ */
+ protected TsFileResource prev;
+ protected TsFileResource next;
+
public TsFileProcessor getProcessor() {
return processor;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceList.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceList.java
new file mode 100644
index 0000000..06627b8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceList.java
@@ -0,0 +1,232 @@
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.tsfile.exception.NotImplementedException;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+/**
+ * An intrusive doubly linked list of {@link TsFileResource}.
+ *
+ * <p>The links are the {@code prev} and {@code next} fields of the resources themselves, so a
+ * resource can be a member of at most one list at a time. Index-based operations, array
+ * conversion and list iterators are not implemented and throw {@link NotImplementedException}.
+ *
+ * <p>This class performs no synchronization of its own.
+ */
+public class TsFileResourceList implements List<TsFileResource> {
+  /** First node of the list, or null when the list is empty. */
+  private TsFileResource header;
+  /** Last node of the list, or null when the list is empty. */
+  private TsFileResource tail;
+  /** Number of linked nodes; kept in sync by every insert, remove and clear. */
+  private int count = 0;
+
+  /**
+   * Insert a new node before an existing node
+   *
+   * @param node the existing node, which must already be linked in this list
+   * @param newNode the new node to insert
+   */
+  public void insertBefore(TsFileResource node, TsFileResource newNode) {
+    newNode.prev = node.prev;
+    newNode.next = node;
+    if (node.prev == null) {
+      header = newNode;
+    } else {
+      node.prev.next = newNode;
+    }
+    node.prev = newNode;
+    count++;
+  }
+
+  /**
+   * Insert a new node after an existing node
+   *
+   * @param node the existing node, which must already be linked in this list
+   * @param newNode the new node to insert
+   */
+  public void insertAfter(TsFileResource node, TsFileResource newNode) {
+    newNode.prev = node;
+    newNode.next = node.next;
+    if (node.next == null) {
+      tail = newNode;
+    } else {
+      node.next.prev = newNode;
+    }
+    node.next = newNode;
+    count++;
+  }
+
+  @Override
+  public int size() {
+    return count;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return count == 0;
+  }
+
+  /**
+   * Linear search using {@code equals}.
+   *
+   * @return false for null or for any object that is not a TsFileResource
+   */
+  @Override
+  public boolean contains(Object o) {
+    if (!(o instanceof TsFileResource)) {
+      return false;
+    }
+    TsFileResource current = header;
+    while (current != null) {
+      if (current.equals(o)) {
+        return true;
+      }
+      current = current.next;
+    }
+    return false;
+  }
+
+  @Override
+  public Iterator<TsFileResource> iterator() {
+    return new TsFileResourceIterator();
+  }
+
+  /** Insert a new tsFileResource node to the end of List */
+  @Override
+  public boolean add(TsFileResource tsFileResource) {
+    if (tail == null) {
+      header = tsFileResource;
+      tail = tsFileResource;
+      tsFileResource.prev = null;
+      tsFileResource.next = null;
+      count++;
+    } else {
+      // insertAfter maintains count itself
+      insertAfter(tail, tsFileResource);
+    }
+    return true;
+  }
+
+  /**
+   * Unlink the given resource in constant time, without searching the list.
+   *
+   * <p>The resource is expected to be a member of this list. A resource that is obviously not
+   * linked here (no neighbour on one side while not being the header / tail) is rejected, but a
+   * resource linked in the middle of another list cannot be detected and would corrupt both lists.
+   *
+   * @return true if the resource was unlinked, false if it is not a linked member of this list
+   */
+  @Override
+  public boolean remove(Object o) {
+    if (!(o instanceof TsFileResource)) {
+      return false;
+    }
+    TsFileResource tsFileResource = (TsFileResource) o;
+    // identity comparison on purpose: we are checking list links, not logical equality
+    boolean detached =
+        (tsFileResource.prev == null && tsFileResource != header)
+            || (tsFileResource.next == null && tsFileResource != tail);
+    if (detached) {
+      return false;
+    }
+    if (tsFileResource.prev == null) {
+      header = tsFileResource.next;
+    } else {
+      tsFileResource.prev.next = tsFileResource.next;
+    }
+    if (tsFileResource.next == null) {
+      tail = tsFileResource.prev;
+    } else {
+      tsFileResource.next.prev = tsFileResource.prev;
+    }
+    // drop the links so the removed resource no longer reaches into this list
+    tsFileResource.prev = null;
+    tsFileResource.next = null;
+    count--;
+    return true;
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    for (Object o : c) {
+      if (!contains(o)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Only List type parameter is legal, because it is in order.
+   *
+   * @return true if at least one resource was appended
+   * @throws NotImplementedException if {@code c} is not a List
+   */
+  @Override
+  public boolean addAll(Collection<? extends TsFileResource> c) {
+    if (c instanceof List) {
+      boolean changed = false;
+      for (TsFileResource resource : c) {
+        add(resource);
+        changed = true;
+      }
+      return changed;
+    }
+    throw new NotImplementedException();
+  }
+
+  /** Remove every resource from the list and reset the links stored on each of them. */
+  @Override
+  public void clear() {
+    TsFileResource current = header;
+    while (current != null) {
+      TsFileResource following = current.next;
+      current.prev = null;
+      current.next = null;
+      current = following;
+    }
+    header = null;
+    tail = null;
+    count = 0;
+  }
+
+  @Override
+  public Object[] toArray() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public <T> T[] toArray(T[] a) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public boolean addAll(int index, Collection<? extends TsFileResource> c) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public TsFileResource get(int index) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public TsFileResource set(int index, TsFileResource element) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void add(int index, TsFileResource element) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public TsFileResource remove(int index) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public int indexOf(Object o) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public int lastIndexOf(Object o) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ListIterator<TsFileResource> listIterator() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ListIterator<TsFileResource> listIterator(int index) {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public List<TsFileResource> subList(int fromIndex, int toIndex) {
+    throw new NotImplementedException();
+  }
+
+  /** Forward iterator that walks the {@code next} links starting at the header. */
+  private class TsFileResourceIterator implements Iterator<TsFileResource> {
+    TsFileResource current;
+
+    public TsFileResourceIterator() {
+      this.current = header;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return current != null;
+    }
+
+    @Override
+    public TsFileResource next() {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      // advance before returning, so the caller may remove the returned resource safely
+      TsFileResource temp = current;
+      current = current.next;
+      return temp;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
index e4faadb..972aefa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
@@ -41,10 +41,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class VirtualStorageGroupManager {
+/**
+ * Each storage group that set by users corresponds to a StorageGroupManager
+ */
+public class StorageGroupManager {
/** logger of this class */
- private static final Logger logger = LoggerFactory.getLogger(VirtualStorageGroupManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(StorageGroupManager.class);
/** virtual storage group partitioner */
VirtualPartitioner partitioner = HashVirtualPartitioner.getInstance();
@@ -55,7 +58,7 @@ public class VirtualStorageGroupManager {
/** value of root.stats."root.sg".TOTAL_POINTS */
private long monitorSeriesValue;
- public VirtualStorageGroupManager() {
+ public StorageGroupManager() {
virtualStorageGroupProcessor = new StorageGroupProcessor[partitioner.getPartitionCount()];
}
diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
index d78d62a..fbeb527 100644
--- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
-import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroupManager;
+import org.apache.iotdb.db.engine.storagegroup.virtualSg.StorageGroupManager;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -231,13 +231,13 @@ public class StatMonitor implements StatMonitorMBean, IService {
@Override
public long getStorageGroupTotalPointsNum(String storageGroupName) {
try {
- VirtualStorageGroupManager virtualStorageGroupManager =
+ StorageGroupManager storageGroupManager =
storageEngine.getProcessorMap().get(new PartialPath(storageGroupName));
- if (virtualStorageGroupManager == null) {
+ if (storageGroupManager == null) {
return 0;
}
- return virtualStorageGroupManager.getMonitorSeriesValue();
+ return storageGroupManager.getMonitorSeriesValue();
} catch (IllegalPathException e) {
logger.error(e.getMessage());
return -1;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
index 0025ef7..fa284b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/QueryTaskPoolManager.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.query.pool;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
+import org.apache.iotdb.db.rescon.AbstractPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/AbstractPoolManager.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/rescon/AbstractPoolManager.java
index c2f9003..820033d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/AbstractPoolManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.engine.flush.pool;
+package org.apache.iotdb.db.rescon;
import org.slf4j.Logger;
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 51a64ac..e4458ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
@@ -108,7 +108,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(TVListAllocator.getInstance());
registerManager.register(CacheHitRatioMonitor.getInstance());
registerManager.register(MergeManager.getINSTANCE());
- registerManager.register(CompactionMergeTaskPoolManager.getInstance());
+ registerManager.register(CompactionTaskManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
registerManager.register(StorageEngine.getInstance());
registerManager.register(TemporaryQueryDataFileService.getInstance());
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/windowing/runtime/WindowEvaluationTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/utils/windowing/runtime/WindowEvaluationTaskPoolManager.java
index c6d7223..afa0289 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/windowing/runtime/WindowEvaluationTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/windowing/runtime/WindowEvaluationTaskPoolManager.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.IoTThreadFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.flush.pool.AbstractPoolManager;
+import org.apache.iotdb.db.rescon.AbstractPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
index d01b9d9..289ce56 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBRestartIT.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.integration;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -379,7 +379,7 @@ public class IoTDBRestartIT {
}
try {
- CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish();
+ CompactionTaskManager.getInstance().waitAllCompactionFinish();
Thread.sleep(10000);
EnvironmentUtils.restartDaemon();
} catch (Exception e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index fc29451..9892780 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
-import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.TriggerManagementException;
@@ -86,7 +86,7 @@ public class EnvironmentUtils {
public static void cleanEnv() throws IOException, StorageEngineException {
// wait all compaction finished
- CompactionMergeTaskPoolManager.getInstance().waitAllCompactionFinish();
+ CompactionTaskManager.getInstance().waitAllCompactionFinish();
// deregister all user defined classes
try {