You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/03 13:28:55 UTC

[iotdb] branch clean_thread updated: change executor in coordinator to 1 in standalone

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch clean_thread
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/clean_thread by this push:
     new 9453e43743 change executor in coordinator to 1 in standalone
9453e43743 is described below

commit 9453e43743b52ff873226e9c981ebf460cc8149e
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Aug 3 21:28:34 2022 +0800

    change executor in coordinator to 1 in standalone
---
 .../resources/conf/iotdb-datanode.properties       |  2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  4 +---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 23 +++++++++++-----------
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  5 ++---
 .../mpp/plan/execution/config/ConfigExecution.java |  6 +-----
 .../db/mpp/execution/ConfigExecutionTest.java      | 15 +++++---------
 6 files changed, 21 insertions(+), 34 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 66c8460c5c..212e503163 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -1047,7 +1047,7 @@ timestamp_precision=ms
 
 # thread pool size for read operation in DataNode's coordinator.
 # Datatype: int
-# coordinator_read_executor_size=50
+# coordinator_read_executor_size=20
 
 # thread pool size for write operation in DataNode's coordinator.
 # Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3df27bacd3..44842dddab 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -736,8 +736,6 @@ public class IoTDBConfig {
   // wait for 60 second by default.
   private int thriftServerAwaitTimeForStopService = 60;
 
-  private int queryCacheSizeInMetric = 50;
-
   // max size for tag and attribute of one time series
   private int tagAttributeTotalSize = 700;
 
@@ -931,7 +929,7 @@ public class IoTDBConfig {
   private int triggerForwardMQTTPoolSize = 4;
 
   /** ThreadPool size for read operation in coordinator */
-  private int coordinatorReadExecutorSize = 50;
+  private int coordinatorReadExecutorSize = 20;
 
   /** ThreadPool size for write operation in coordinator */
   private int coordinatorWriteExecutorSize = 50;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 046cebbdc0..72f48793dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -146,8 +146,6 @@ public class IoTDBDescriptor {
 
       conf.setRpcAddress(properties.getProperty(IoTDBConstant.RPC_ADDRESS, conf.getRpcAddress()));
 
-      loadClusterProps(properties);
-
       // TODO: Use FQDN  to identify our nodes afterwards
       try {
         replaceHostnameWithIP();
@@ -888,6 +886,17 @@ public class IoTDBDescriptor {
               properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
       TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
 
+      conf.setCoordinatorReadExecutorSize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "coordinator_read_executor_size",
+                  Integer.toString(conf.getCoordinatorReadExecutorSize()))));
+      conf.setCoordinatorWriteExecutorSize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "coordinator_write_executor_size",
+                  Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
+
       // commons
       commonDescriptor.loadCommonProps(properties);
       commonDescriptor.initCommonConfigDir(conf.getSystemDir());
@@ -1640,16 +1649,6 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "trigger_forward_mqtt_pool_size",
                 Integer.toString(conf.getTriggerForwardMQTTPoolSize()))));
-    conf.setCoordinatorReadExecutorSize(
-        Integer.parseInt(
-            properties.getProperty(
-                "coordinator_read_executor_size",
-                Integer.toString(conf.getCoordinatorReadExecutorSize()))));
-    conf.setCoordinatorWriteExecutorSize(
-        Integer.parseInt(
-            properties.getProperty(
-                "coordinator_write_executor_size",
-                Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
   }
 
   private void loadCQProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 70ab26f83b..d4f8ac8e9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -172,14 +172,13 @@ public class Coordinator {
   // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
   private ExecutorService getQueryExecutor() {
     int coordinatorReadExecutorSize =
-        IoTDBDescriptor.getInstance().getConfig().getCoordinatorReadExecutorSize();
+        config.isClusterMode() ? config.getCoordinatorReadExecutorSize() : 1;
     return IoTDBThreadPoolFactory.newFixedThreadPool(
         coordinatorReadExecutorSize, COORDINATOR_EXECUTOR_NAME);
   }
 
   private ExecutorService getWriteExecutor() {
-    int coordinatorWriteExecutorSize =
-        IoTDBDescriptor.getInstance().getConfig().getCoordinatorWriteExecutorSize();
+    int coordinatorWriteExecutorSize = config.getCoordinatorWriteExecutorSize();
     return IoTDBThreadPoolFactory.newFixedThreadPool(
         coordinatorWriteExecutorSize, COORDINATOR_WRITE_EXECUTOR_NAME);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index b807d51565..91260063a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -53,7 +53,6 @@ public class ConfigExecution implements IQueryExecution {
   private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private final MPPQueryContext context;
-  private final Statement statement;
   private final ExecutorService executor;
 
   private final QueryStateMachine stateMachine;
@@ -66,7 +65,6 @@ public class ConfigExecution implements IQueryExecution {
 
   public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
     this.context = context;
-    this.statement = statement;
     this.executor = executor;
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.taskFuture = SettableFuture.create();
@@ -80,10 +78,8 @@ public class ConfigExecution implements IQueryExecution {
   }
 
   @TestOnly
-  public ConfigExecution(
-      MPPQueryContext context, Statement statement, ExecutorService executor, IConfigTask task) {
+  public ConfigExecution(MPPQueryContext context, ExecutorService executor, IConfigTask task) {
     this.context = context;
-    this.statement = statement;
     this.executor = executor;
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.taskFuture = SettableFuture.create();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index f43e702772..1fc92886d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -54,8 +54,7 @@ public class ConfigExecutionTest {
   public void normalConfigTaskTest() {
     IConfigTask task =
         (clientManager) -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
-    ConfigExecution execution =
-        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
     execution.start();
     ExecutionResult result = execution.getStatus();
     assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
@@ -74,8 +73,7 @@ public class ConfigExecutionTest {
         (clientManager) ->
             immediateFuture(
                 new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock, datasetHeader));
-    ConfigExecution execution =
-        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
     execution.start();
     ExecutionResult result = execution.getStatus();
     TsBlock tsBlockFromExecution = null;
@@ -94,8 +92,7 @@ public class ConfigExecutionTest {
         (clientManager) -> {
           throw new RuntimeException("task throw exception when executing");
         };
-    ConfigExecution execution =
-        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
     execution.start();
     ExecutionResult result = execution.getStatus();
     assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
@@ -118,8 +115,7 @@ public class ConfigExecutionTest {
       }
     }
     IConfigTask task = new SimpleTask(taskResult);
-    ConfigExecution execution =
-        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
     execution.start();
 
     Thread resultThread =
@@ -139,8 +135,7 @@ public class ConfigExecutionTest {
         (clientManager) -> {
           throw new RuntimeException("task throw exception when executing");
         };
-    ConfigExecution execution =
-        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
     Thread resultThread =
         new Thread(
             () -> {