You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/03 13:28:55 UTC
[iotdb] branch clean_thread updated: change executor in coordinator to 1 in standalone
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch clean_thread
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/clean_thread by this push:
new 9453e43743 change executor in coordinator to 1 in standalone
9453e43743 is described below
commit 9453e43743b52ff873226e9c981ebf460cc8149e
Author: qiaojialin <64...@qq.com>
AuthorDate: Wed Aug 3 21:28:34 2022 +0800
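change executor in coordinator to 1 in standalone (only the read executor is pinned to 1 thread when not in cluster mode; the write executor still uses its configured size, and the default coordinator_read_executor_size is lowered from 50 to 20)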
---
.../resources/conf/iotdb-datanode.properties | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 23 +++++++++++-----------
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 5 ++---
.../mpp/plan/execution/config/ConfigExecution.java | 6 +-----
.../db/mpp/execution/ConfigExecutionTest.java | 15 +++++---------
6 files changed, 21 insertions(+), 34 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 66c8460c5c..212e503163 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -1047,7 +1047,7 @@ timestamp_precision=ms
# thread pool size for read operation in DataNode's coordinator.
# Datatype: int
-# coordinator_read_executor_size=50
+# coordinator_read_executor_size=20
# thread pool size for write operation in DataNode's coordinator.
# Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3df27bacd3..44842dddab 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -736,8 +736,6 @@ public class IoTDBConfig {
// wait for 60 second by default.
private int thriftServerAwaitTimeForStopService = 60;
- private int queryCacheSizeInMetric = 50;
-
// max size for tag and attribute of one time series
private int tagAttributeTotalSize = 700;
@@ -931,7 +929,7 @@ public class IoTDBConfig {
private int triggerForwardMQTTPoolSize = 4;
/** ThreadPool size for read operation in coordinator */
- private int coordinatorReadExecutorSize = 50;
+ private int coordinatorReadExecutorSize = 20;
/** ThreadPool size for write operation in coordinator */
private int coordinatorWriteExecutorSize = 50;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 046cebbdc0..72f48793dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -146,8 +146,6 @@ public class IoTDBDescriptor {
conf.setRpcAddress(properties.getProperty(IoTDBConstant.RPC_ADDRESS, conf.getRpcAddress()));
- loadClusterProps(properties);
-
// TODO: Use FQDN to identify our nodes afterwards
try {
replaceHostnameWithIP();
@@ -888,6 +886,17 @@ public class IoTDBDescriptor {
properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
+ conf.setCoordinatorReadExecutorSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "coordinator_read_executor_size",
+ Integer.toString(conf.getCoordinatorReadExecutorSize()))));
+ conf.setCoordinatorWriteExecutorSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "coordinator_write_executor_size",
+ Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
+
// commons
commonDescriptor.loadCommonProps(properties);
commonDescriptor.initCommonConfigDir(conf.getSystemDir());
@@ -1640,16 +1649,6 @@ public class IoTDBDescriptor {
properties.getProperty(
"trigger_forward_mqtt_pool_size",
Integer.toString(conf.getTriggerForwardMQTTPoolSize()))));
- conf.setCoordinatorReadExecutorSize(
- Integer.parseInt(
- properties.getProperty(
- "coordinator_read_executor_size",
- Integer.toString(conf.getCoordinatorReadExecutorSize()))));
- conf.setCoordinatorWriteExecutorSize(
- Integer.parseInt(
- properties.getProperty(
- "coordinator_write_executor_size",
- Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
}
private void loadCQProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 70ab26f83b..d4f8ac8e9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -172,14 +172,13 @@ public class Coordinator {
// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ExecutorService getQueryExecutor() {
int coordinatorReadExecutorSize =
- IoTDBDescriptor.getInstance().getConfig().getCoordinatorReadExecutorSize();
+ config.isClusterMode() ? config.getCoordinatorReadExecutorSize() : 1;
return IoTDBThreadPoolFactory.newFixedThreadPool(
coordinatorReadExecutorSize, COORDINATOR_EXECUTOR_NAME);
}
private ExecutorService getWriteExecutor() {
- int coordinatorWriteExecutorSize =
- IoTDBDescriptor.getInstance().getConfig().getCoordinatorWriteExecutorSize();
+ int coordinatorWriteExecutorSize = config.getCoordinatorWriteExecutorSize();
return IoTDBThreadPoolFactory.newFixedThreadPool(
coordinatorWriteExecutorSize, COORDINATOR_WRITE_EXECUTOR_NAME);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index b807d51565..91260063a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -53,7 +53,6 @@ public class ConfigExecution implements IQueryExecution {
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final MPPQueryContext context;
- private final Statement statement;
private final ExecutorService executor;
private final QueryStateMachine stateMachine;
@@ -66,7 +65,6 @@ public class ConfigExecution implements IQueryExecution {
public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
this.context = context;
- this.statement = statement;
this.executor = executor;
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.taskFuture = SettableFuture.create();
@@ -80,10 +78,8 @@ public class ConfigExecution implements IQueryExecution {
}
@TestOnly
- public ConfigExecution(
- MPPQueryContext context, Statement statement, ExecutorService executor, IConfigTask task) {
+ public ConfigExecution(MPPQueryContext context, ExecutorService executor, IConfigTask task) {
this.context = context;
- this.statement = statement;
this.executor = executor;
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.taskFuture = SettableFuture.create();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index f43e702772..1fc92886d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -54,8 +54,7 @@ public class ConfigExecutionTest {
public void normalConfigTaskTest() {
IConfigTask task =
(clientManager) -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
- ConfigExecution execution =
- new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
execution.start();
ExecutionResult result = execution.getStatus();
assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
@@ -74,8 +73,7 @@ public class ConfigExecutionTest {
(clientManager) ->
immediateFuture(
new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock, datasetHeader));
- ConfigExecution execution =
- new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
execution.start();
ExecutionResult result = execution.getStatus();
TsBlock tsBlockFromExecution = null;
@@ -94,8 +92,7 @@ public class ConfigExecutionTest {
(clientManager) -> {
throw new RuntimeException("task throw exception when executing");
};
- ConfigExecution execution =
- new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
execution.start();
ExecutionResult result = execution.getStatus();
assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
@@ -118,8 +115,7 @@ public class ConfigExecutionTest {
}
}
IConfigTask task = new SimpleTask(taskResult);
- ConfigExecution execution =
- new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
execution.start();
Thread resultThread =
@@ -139,8 +135,7 @@ public class ConfigExecutionTest {
(clientManager) -> {
throw new RuntimeException("task throw exception when executing");
};
- ConfigExecution execution =
- new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
Thread resultThread =
new Thread(
() -> {