You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/21 01:47:02 UTC
[iotdb] branch ISSUE_5792 updated: add config of thread num
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch ISSUE_5792
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ISSUE_5792 by this push:
new 2d32f6e2de add config of thread num
2d32f6e2de is described below
commit 2d32f6e2de53e6d218298d2b38a3044239c7d650
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri Apr 21 09:49:29 2023 +0800
add config of thread num
---
.../assembly/resources/conf/iotdb-common.properties | 7 +++++++
.../concurrent/dynamic/DynamicThreadGroup.java | 8 +++++---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 21 +++++++++++++++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 +++++++++++
.../iotdb/db/engine/flush/MemTableFlushTaskV2.java | 8 ++++----
5 files changed, 48 insertions(+), 7 deletions(-)
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 038b1f894d..2e5b787964 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -528,6 +528,13 @@ cluster_name=defaultCluster
# Datatype: int
# upgrade_thread_count=1
+
+# When flushing a MemTable, the range of thread number that will be available for each pipeline state.
+# Set to 1 when less than or equal to 0.
+# Datatype: int
+# flush_min_sub_thread_num = 1
+# flush_max_sub_thread_num = 16
+
####################
### Compaction Configurations
####################
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index a09f831a4a..2266d5c579 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -35,8 +35,8 @@ public class DynamicThreadGroup {
private String name;
private Supplier<DynamicThread> threadFactory;
private AtomicInteger threadCnt = new AtomicInteger();
- private int minThreadCnt = 1;
- private int maxThreadCnt = 8;
+ private int minThreadCnt;
+ private int maxThreadCnt;
private Map<DynamicThread, Future<?>> threadFutureMap = new ConcurrentHashMap<>();
public DynamicThreadGroup(
@@ -50,7 +50,9 @@ public class DynamicThreadGroup {
this.threadFactory = threadFactory;
this.minThreadCnt = Math.max(1, minThreadCnt);
this.maxThreadCnt = Math.max(this.minThreadCnt, maxThreadCnt);
- for (int i = 0; i < this.maxThreadCnt; i++) {
+
+ int initialThreadNum = (this.minThreadCnt + this.maxThreadCnt) / 2;
+ for (int i = 0; i < initialThreadNum; i++) {
addThread();
}
logger.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4e39691ed0..f30205c707 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1088,6 +1088,11 @@ public class IoTDBConfig {
*/
private String RateLimiterType = "FixedIntervalRateLimiter";
+ /** The minimum/maximum number of subtask threads of each stage when flushing one MemTable. */
+ private int flushMemTableMinSubThread = 1;
+
+ private int flushMemTableMaxSubThread = 16;
+
IoTDBConfig() {}
public float getUdfMemoryBudgetInMB() {
@@ -3758,4 +3763,20 @@ public class IoTDBConfig {
public void setRateLimiterType(String rateLimiterType) {
RateLimiterType = rateLimiterType;
}
+
+ public int getFlushMemTableMinSubThread() {
+ return flushMemTableMinSubThread;
+ }
+
+ public void setFlushMemTableMinSubThread(int flushMemTableMinSubThread) {
+ this.flushMemTableMinSubThread = flushMemTableMinSubThread;
+ }
+
+ public int getFlushMemTableMaxSubThread() {
+ return flushMemTableMaxSubThread;
+ }
+
+ public void setFlushMemTableMaxSubThread(int flushMemTableMaxSubThread) {
+ this.flushMemTableMaxSubThread = flushMemTableMaxSubThread;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5581bacde9..9228d5cfae 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1009,6 +1009,17 @@ public class IoTDBDescriptor {
"coordinator_write_executor_size",
Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
+ conf.setFlushMemTableMinSubThread(
+ Integer.parseInt(
+ properties.getProperty(
+ "flush_min_sub_thread_num",
+ Integer.toString(conf.getFlushMemTableMinSubThread()))));
+ conf.setFlushMemTableMaxSubThread(
+ Integer.parseInt(
+ properties.getProperty(
+ "flush_max_sub_thread_num",
+ Integer.toString(conf.getFlushMemTableMaxSubThread()))));
+
// commons
commonDescriptor.loadCommonProps(properties);
commonDescriptor.initCommonConfigDir(conf.getSystemDir());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
index 9b03bc731c..c36587922d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
@@ -107,15 +107,15 @@ public class MemTableFlushTaskV2 {
storageGroup + "-" + dataRegionId + "-" + memTable,
SUB_TASK_POOL_MANAGER::submit,
this::newSortThread,
- 1,
- 8);
+ config.getFlushMemTableMinSubThread(),
+ config.getFlushMemTableMaxSubThread());
this.encodingTasks =
new DynamicThreadGroup(
storageGroup + "-" + dataRegionId + "-" + memTable,
SUB_TASK_POOL_MANAGER::submit,
this::newEncodingThread,
- 1,
- 8);
+ config.getFlushMemTableMinSubThread(),
+ config.getFlushMemTableMaxSubThread());
this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(newIOThread());
LOGGER.debug(
"flush task of database {} memtable is created, flushing to file {}.",