You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/21 01:47:02 UTC

[iotdb] branch ISSUE_5792 updated: add config of thread num

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch ISSUE_5792
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ISSUE_5792 by this push:
     new 2d32f6e2de add config of thread num
2d32f6e2de is described below

commit 2d32f6e2de53e6d218298d2b38a3044239c7d650
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri Apr 21 09:49:29 2023 +0800

    add config of thread num
---
 .../assembly/resources/conf/iotdb-common.properties |  7 +++++++
 .../concurrent/dynamic/DynamicThreadGroup.java      |  8 +++++---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java  | 21 +++++++++++++++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java   | 11 +++++++++++
 .../iotdb/db/engine/flush/MemTableFlushTaskV2.java  |  8 ++++----
 5 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 038b1f894d..2e5b787964 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -528,6 +528,13 @@ cluster_name=defaultCluster
 # Datatype: int
 # upgrade_thread_count=1
 
+
+# When flushing a MemTable, the minimum and maximum number of threads available to each pipeline stage.
+# The minimum is set to 1 when it is less than or equal to 0; the maximum is raised to the minimum when it is smaller.
+# Datatype: int
+# flush_min_sub_thread_num = 1
+# flush_max_sub_thread_num = 16
+
 ####################
 ### Compaction Configurations
 ####################
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index a09f831a4a..2266d5c579 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -35,8 +35,8 @@ public class DynamicThreadGroup {
   private String name;
   private Supplier<DynamicThread> threadFactory;
   private AtomicInteger threadCnt = new AtomicInteger();
-  private int minThreadCnt = 1;
-  private int maxThreadCnt = 8;
+  private int minThreadCnt;
+  private int maxThreadCnt;
   private Map<DynamicThread, Future<?>> threadFutureMap = new ConcurrentHashMap<>();
 
   public DynamicThreadGroup(
@@ -50,7 +50,9 @@ public class DynamicThreadGroup {
     this.threadFactory = threadFactory;
     this.minThreadCnt = Math.max(1, minThreadCnt);
     this.maxThreadCnt = Math.max(this.minThreadCnt, maxThreadCnt);
-    for (int i = 0; i < this.maxThreadCnt; i++) {
+
+    int initialThreadNum = (this.minThreadCnt + this.maxThreadCnt) / 2;
+    for (int i = 0; i < initialThreadNum; i++) {
       addThread();
     }
     logger.info(
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4e39691ed0..f30205c707 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1088,6 +1088,11 @@ public class IoTDBConfig {
    */
   private String RateLimiterType = "FixedIntervalRateLimiter";
 
+  /** The minimum/maximum number of subtask threads of each stage when flushing one MemTable. */
+  private int flushMemTableMinSubThread = 1;
+
+  private int flushMemTableMaxSubThread = 16;
+
   IoTDBConfig() {}
 
   public float getUdfMemoryBudgetInMB() {
@@ -3758,4 +3763,20 @@ public class IoTDBConfig {
   public void setRateLimiterType(String rateLimiterType) {
     RateLimiterType = rateLimiterType;
   }
+
+  public int getFlushMemTableMinSubThread() {
+    return flushMemTableMinSubThread;
+  }
+
+  public void setFlushMemTableMinSubThread(int flushMemTableMinSubThread) {
+    this.flushMemTableMinSubThread = flushMemTableMinSubThread;
+  }
+
+  public int getFlushMemTableMaxSubThread() {
+    return flushMemTableMaxSubThread;
+  }
+
+  public void setFlushMemTableMaxSubThread(int flushMemTableMaxSubThread) {
+    this.flushMemTableMaxSubThread = flushMemTableMaxSubThread;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5581bacde9..9228d5cfae 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1009,6 +1009,17 @@ public class IoTDBDescriptor {
                 "coordinator_write_executor_size",
                 Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
 
+    conf.setFlushMemTableMinSubThread(
+        Integer.parseInt(
+            properties.getProperty(
+                "flush_min_sub_thread_num",
+                Integer.toString(conf.getFlushMemTableMinSubThread()))));
+    conf.setFlushMemTableMaxSubThread(
+        Integer.parseInt(
+            properties.getProperty(
+                "flush_max_sub_thread_num",
+                Integer.toString(conf.getFlushMemTableMaxSubThread()))));
+
     // commons
     commonDescriptor.loadCommonProps(properties);
     commonDescriptor.initCommonConfigDir(conf.getSystemDir());
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
index 9b03bc731c..c36587922d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTaskV2.java
@@ -107,15 +107,15 @@ public class MemTableFlushTaskV2 {
             storageGroup + "-" + dataRegionId + "-" + memTable,
             SUB_TASK_POOL_MANAGER::submit,
             this::newSortThread,
-            1,
-            8);
+            config.getFlushMemTableMinSubThread(),
+            config.getFlushMemTableMaxSubThread());
     this.encodingTasks =
         new DynamicThreadGroup(
             storageGroup + "-" + dataRegionId + "-" + memTable,
             SUB_TASK_POOL_MANAGER::submit,
             this::newEncodingThread,
-            1,
-            8);
+            config.getFlushMemTableMinSubThread(),
+            config.getFlushMemTableMaxSubThread());
     this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(newIOThread());
     LOGGER.debug(
         "flush task of database {} memtable is created, flushing to file {}.",