You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/08/03 23:35:14 UTC
[iotdb] branch master updated: [IOTDB-4037] reduce thread count for new standalone server (#6888)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4251d49630 [IOTDB-4037] reduce thread count for new standalone server (#6888)
4251d49630 is described below
commit 4251d49630f0aa51ac2d7b4679288be91fdcc75a
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Thu Aug 4 07:35:07 2022 +0800
[IOTDB-4037] reduce thread count for new standalone server (#6888)
Co-authored-by: qiaojialin <64...@qq.com>
---
.../resources/conf/iotdb-datanode.properties | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 23 +++++++++---------
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 5 ++--
.../db/mpp/plan/execution/QueryExecution.java | 3 +--
.../mpp/plan/execution/config/ConfigExecution.java | 6 +----
.../scheduler/AbstractFragInsStateTracker.java | 4 ----
.../db/mpp/plan/scheduler/ClusterScheduler.java | 3 +--
.../scheduler/FixedRateFragInsStateTracker.java | 4 +---
.../mpp/plan/scheduler/SimpleQueryTerminator.java | 4 ----
.../db/mpp/plan/scheduler/StandaloneScheduler.java | 28 +++++-----------------
.../db/mpp/execution/ConfigExecutionTest.java | 15 ++++--------
.../plan/scheduler/StandaloneSchedulerTest.java | 5 ----
13 files changed, 30 insertions(+), 76 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 66c8460c5c..212e503163 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -1047,7 +1047,7 @@ timestamp_precision=ms
# thread pool size for read operation in DataNode's coordinator.
# Datatype: int
-# coordinator_read_executor_size=50
+# coordinator_read_executor_size=20
# thread pool size for write operation in DataNode's coordinator.
# Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3df27bacd3..44842dddab 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -736,8 +736,6 @@ public class IoTDBConfig {
// wait for 60 second by default.
private int thriftServerAwaitTimeForStopService = 60;
- private int queryCacheSizeInMetric = 50;
-
// max size for tag and attribute of one time series
private int tagAttributeTotalSize = 700;
@@ -931,7 +929,7 @@ public class IoTDBConfig {
private int triggerForwardMQTTPoolSize = 4;
/** ThreadPool size for read operation in coordinator */
- private int coordinatorReadExecutorSize = 50;
+ private int coordinatorReadExecutorSize = 20;
/** ThreadPool size for write operation in coordinator */
private int coordinatorWriteExecutorSize = 50;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 046cebbdc0..72f48793dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -146,8 +146,6 @@ public class IoTDBDescriptor {
conf.setRpcAddress(properties.getProperty(IoTDBConstant.RPC_ADDRESS, conf.getRpcAddress()));
- loadClusterProps(properties);
-
// TODO: Use FQDN to identify our nodes afterwards
try {
replaceHostnameWithIP();
@@ -888,6 +886,17 @@ public class IoTDBDescriptor {
properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
+ conf.setCoordinatorReadExecutorSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "coordinator_read_executor_size",
+ Integer.toString(conf.getCoordinatorReadExecutorSize()))));
+ conf.setCoordinatorWriteExecutorSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "coordinator_write_executor_size",
+ Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
+
// commons
commonDescriptor.loadCommonProps(properties);
commonDescriptor.initCommonConfigDir(conf.getSystemDir());
@@ -1640,16 +1649,6 @@ public class IoTDBDescriptor {
properties.getProperty(
"trigger_forward_mqtt_pool_size",
Integer.toString(conf.getTriggerForwardMQTTPoolSize()))));
- conf.setCoordinatorReadExecutorSize(
- Integer.parseInt(
- properties.getProperty(
- "coordinator_read_executor_size",
- Integer.toString(conf.getCoordinatorReadExecutorSize()))));
- conf.setCoordinatorWriteExecutorSize(
- Integer.parseInt(
- properties.getProperty(
- "coordinator_write_executor_size",
- Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
}
private void loadCQProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 70ab26f83b..d4f8ac8e9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -172,14 +172,13 @@ public class Coordinator {
// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ExecutorService getQueryExecutor() {
int coordinatorReadExecutorSize =
- IoTDBDescriptor.getInstance().getConfig().getCoordinatorReadExecutorSize();
+ config.isClusterMode() ? config.getCoordinatorReadExecutorSize() : 1;
return IoTDBThreadPoolFactory.newFixedThreadPool(
coordinatorReadExecutorSize, COORDINATOR_EXECUTOR_NAME);
}
private ExecutorService getWriteExecutor() {
- int coordinatorWriteExecutorSize =
- IoTDBDescriptor.getInstance().getConfig().getCoordinatorWriteExecutorSize();
+ int coordinatorWriteExecutorSize = config.getCoordinatorWriteExecutorSize();
return IoTDBThreadPoolFactory.newFixedThreadPool(
coordinatorWriteExecutorSize, COORDINATOR_WRITE_EXECUTOR_NAME);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4534b7b740..842352a7b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -98,7 +98,7 @@ public class QueryExecution implements IQueryExecution {
private final List<PlanOptimizer> planOptimizers;
- private Statement rawStatement;
+ private final Statement rawStatement;
private Analysis analysis;
private LogicalQueryPlan logicalPlan;
private DistributedQueryPlan distributedPlan;
@@ -252,7 +252,6 @@ public class QueryExecution implements IQueryExecution {
stateMachine,
distributedPlan.getInstances(),
context.getQueryType(),
- executor,
scheduledExecutor,
internalServiceClientManager);
this.scheduler.start();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index b807d51565..91260063a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -53,7 +53,6 @@ public class ConfigExecution implements IQueryExecution {
private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final MPPQueryContext context;
- private final Statement statement;
private final ExecutorService executor;
private final QueryStateMachine stateMachine;
@@ -66,7 +65,6 @@ public class ConfigExecution implements IQueryExecution {
public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
this.context = context;
- this.statement = statement;
this.executor = executor;
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.taskFuture = SettableFuture.create();
@@ -80,10 +78,8 @@ public class ConfigExecution implements IQueryExecution {
}
@TestOnly
- public ConfigExecution(
- MPPQueryContext context, Statement statement, ExecutorService executor, IConfigTask task) {
+ public ConfigExecution(MPPQueryContext context, ExecutorService executor, IConfigTask task) {
this.context = context;
- this.statement = statement;
this.executor = executor;
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.taskFuture = SettableFuture.create();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index 1be7c12ab0..50031dc9b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -33,13 +33,11 @@ import org.apache.thrift.TException;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
public abstract class AbstractFragInsStateTracker implements IFragInstanceStateTracker {
protected QueryStateMachine stateMachine;
- protected ExecutorService executor;
protected ScheduledExecutorService scheduledExecutor;
protected List<FragmentInstance> instances;
@@ -48,12 +46,10 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
public AbstractFragInsStateTracker(
QueryStateMachine stateMachine,
- ExecutorService executor,
ScheduledExecutorService scheduledExecutor,
List<FragmentInstance> instances,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.stateMachine = stateMachine;
- this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.instances = instances;
this.internalServiceClientManager = internalServiceClientManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index f2d36f093d..e9829cc637 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -80,10 +80,9 @@ public class ClusterScheduler implements IScheduler {
if (queryType == QueryType.READ) {
this.stateTracker =
new FixedRateFragInsStateTracker(
- stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
+ stateMachine, scheduledExecutor, instances, internalServiceClientManager);
this.queryTerminator =
new SimpleQueryTerminator(
- executor,
scheduledExecutor,
queryContext.getQueryId(),
instances,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 59dbce1208..f000add3ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -54,11 +53,10 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
public FixedRateFragInsStateTracker(
QueryStateMachine stateMachine,
- ExecutorService executor,
ScheduledExecutorService scheduledExecutor,
List<FragmentInstance> instances,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
- super(stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
+ super(stateMachine, scheduledExecutor, instances, internalServiceClientManager);
this.aborted = false;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 0a459f2e0f..32b329d640 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -35,7 +35,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -44,7 +43,6 @@ import java.util.stream.Collectors;
public class SimpleQueryTerminator implements IQueryTerminator {
private static final Logger logger = LoggerFactory.getLogger(SimpleQueryTerminator.class);
private static final long TERMINATION_GRACE_PERIOD_IN_MS = 1000L;
- private final ExecutorService executor;
protected ScheduledExecutorService scheduledExecutor;
private final QueryId queryId;
private List<TEndPoint> relatedHost;
@@ -54,12 +52,10 @@ public class SimpleQueryTerminator implements IQueryTerminator {
internalServiceClientManager;
public SimpleQueryTerminator(
- ExecutorService executor,
ScheduledExecutorService scheduledExecutor,
QueryId queryId,
List<FragmentInstance> fragmentInstances,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
- this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.queryId = queryId;
this.internalServiceClientManager = internalServiceClientManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index 328cbaa32f..09c7da7b43 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -49,7 +49,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
public class StandaloneScheduler implements IScheduler {
@@ -60,44 +59,29 @@ public class StandaloneScheduler implements IScheduler {
private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneScheduler.class);
- private MPPQueryContext queryContext;
+ private final MPPQueryContext queryContext;
// The stateMachine of the QueryExecution owned by this QueryScheduler
- private QueryStateMachine stateMachine;
- private QueryType queryType;
+ private final QueryStateMachine stateMachine;
+ private final QueryType queryType;
// The fragment instances which should be sent to corresponding Nodes.
- private List<FragmentInstance> instances;
+ private final List<FragmentInstance> instances;
- private ExecutorService executor;
- private ScheduledExecutorService scheduledExecutor;
-
- private IFragInstanceDispatcher dispatcher;
- private IFragInstanceStateTracker stateTracker;
- private IQueryTerminator queryTerminator;
+ private final IFragInstanceStateTracker stateTracker;
public StandaloneScheduler(
MPPQueryContext queryContext,
QueryStateMachine stateMachine,
List<FragmentInstance> instances,
QueryType queryType,
- ExecutorService executor,
ScheduledExecutorService scheduledExecutor,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
this.queryContext = queryContext;
this.instances = instances;
this.queryType = queryType;
- this.executor = executor;
- this.scheduledExecutor = scheduledExecutor;
this.stateMachine = stateMachine;
this.stateTracker =
new FixedRateFragInsStateTracker(
- stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
- this.queryTerminator =
- new SimpleQueryTerminator(
- executor,
- scheduledExecutor,
- queryContext.getQueryId(),
- instances,
- internalServiceClientManager);
+ stateMachine, scheduledExecutor, instances, internalServiceClientManager);
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index f43e702772..1fc92886d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -54,8 +54,7 @@ public class ConfigExecutionTest {
public void normalConfigTaskTest() {
IConfigTask task =
(clientManager) -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
- ConfigExecution execution =
- new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
execution.start();
ExecutionResult result = execution.getStatus();
assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
@@ -74,8 +73,7 @@ public class ConfigExecutionTest {
(clientManager) ->
immediateFuture(
new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock, datasetHeader));
- ConfigExecution execution =
- new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
execution.start();
ExecutionResult result = execution.getStatus();
TsBlock tsBlockFromExecution = null;
@@ -94,8 +92,7 @@ public class ConfigExecutionTest {
(clientManager) -> {
throw new RuntimeException("task throw exception when executing");
};
- ConfigExecution execution =
- new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
execution.start();
ExecutionResult result = execution.getStatus();
assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
@@ -118,8 +115,7 @@ public class ConfigExecutionTest {
}
}
IConfigTask task = new SimpleTask(taskResult);
- ConfigExecution execution =
- new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
execution.start();
Thread resultThread =
@@ -139,8 +135,7 @@ public class ConfigExecutionTest {
(clientManager) -> {
throw new RuntimeException("task throw exception when executing");
};
- ConfigExecution execution =
- new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
Thread resultThread =
new Thread(
() -> {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index a5c65def26..34e2a9e85f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -157,7 +157,6 @@ public class StandaloneSchedulerTest {
stateMachine,
Collections.singletonList(fragmentInstance),
QueryType.WRITE,
- executor,
null,
null);
try {
@@ -261,7 +260,6 @@ public class StandaloneSchedulerTest {
stateMachine,
Collections.singletonList(fragmentInstance),
QueryType.WRITE,
- executor,
null,
null);
try {
@@ -375,7 +373,6 @@ public class StandaloneSchedulerTest {
stateMachine,
Collections.singletonList(fragmentInstance),
QueryType.WRITE,
- executor,
null,
null);
try {
@@ -428,7 +425,6 @@ public class StandaloneSchedulerTest {
stateMachine,
Collections.singletonList(fragmentInstance),
QueryType.WRITE,
- executor,
null,
null);
try {
@@ -510,7 +506,6 @@ public class StandaloneSchedulerTest {
stateMachine,
Collections.singletonList(fragmentInstance),
QueryType.WRITE,
- executor,
null,
null);
try {