You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/08/03 23:35:14 UTC

[iotdb] branch master updated: [IOTDB-4037] reduce thread count for new standalone server (#6888)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4251d49630 [IOTDB-4037] reduce thread count for new standalone server (#6888)
4251d49630 is described below

commit 4251d49630f0aa51ac2d7b4679288be91fdcc75a
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Thu Aug 4 07:35:07 2022 +0800

    [IOTDB-4037] reduce thread count for new standalone server (#6888)
    
    Co-authored-by: qiaojialin <64...@qq.com>
---
 .../resources/conf/iotdb-datanode.properties       |  2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  4 +---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 23 +++++++++---------
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  5 ++--
 .../db/mpp/plan/execution/QueryExecution.java      |  3 +--
 .../mpp/plan/execution/config/ConfigExecution.java |  6 +----
 .../scheduler/AbstractFragInsStateTracker.java     |  4 ----
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  3 +--
 .../scheduler/FixedRateFragInsStateTracker.java    |  4 +---
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  |  4 ----
 .../db/mpp/plan/scheduler/StandaloneScheduler.java | 28 +++++-----------------
 .../db/mpp/execution/ConfigExecutionTest.java      | 15 ++++--------
 .../plan/scheduler/StandaloneSchedulerTest.java    |  5 ----
 13 files changed, 30 insertions(+), 76 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 66c8460c5c..212e503163 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -1047,7 +1047,7 @@ timestamp_precision=ms
 
 # thread pool size for read operation in DataNode's coordinator.
 # Datatype: int
-# coordinator_read_executor_size=50
+# coordinator_read_executor_size=20
 
 # thread pool size for write operation in DataNode's coordinator.
 # Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 3df27bacd3..44842dddab 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -736,8 +736,6 @@ public class IoTDBConfig {
   // wait for 60 second by default.
   private int thriftServerAwaitTimeForStopService = 60;
 
-  private int queryCacheSizeInMetric = 50;
-
   // max size for tag and attribute of one time series
   private int tagAttributeTotalSize = 700;
 
@@ -931,7 +929,7 @@ public class IoTDBConfig {
   private int triggerForwardMQTTPoolSize = 4;
 
   /** ThreadPool size for read operation in coordinator */
-  private int coordinatorReadExecutorSize = 50;
+  private int coordinatorReadExecutorSize = 20;
 
   /** ThreadPool size for write operation in coordinator */
   private int coordinatorWriteExecutorSize = 50;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 046cebbdc0..72f48793dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -146,8 +146,6 @@ public class IoTDBDescriptor {
 
       conf.setRpcAddress(properties.getProperty(IoTDBConstant.RPC_ADDRESS, conf.getRpcAddress()));
 
-      loadClusterProps(properties);
-
       // TODO: Use FQDN  to identify our nodes afterwards
       try {
         replaceHostnameWithIP();
@@ -888,6 +886,17 @@ public class IoTDBDescriptor {
               properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
       TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
 
+      conf.setCoordinatorReadExecutorSize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "coordinator_read_executor_size",
+                  Integer.toString(conf.getCoordinatorReadExecutorSize()))));
+      conf.setCoordinatorWriteExecutorSize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "coordinator_write_executor_size",
+                  Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
+
       // commons
       commonDescriptor.loadCommonProps(properties);
       commonDescriptor.initCommonConfigDir(conf.getSystemDir());
@@ -1640,16 +1649,6 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "trigger_forward_mqtt_pool_size",
                 Integer.toString(conf.getTriggerForwardMQTTPoolSize()))));
-    conf.setCoordinatorReadExecutorSize(
-        Integer.parseInt(
-            properties.getProperty(
-                "coordinator_read_executor_size",
-                Integer.toString(conf.getCoordinatorReadExecutorSize()))));
-    conf.setCoordinatorWriteExecutorSize(
-        Integer.parseInt(
-            properties.getProperty(
-                "coordinator_write_executor_size",
-                Integer.toString(conf.getCoordinatorWriteExecutorSize()))));
   }
 
   private void loadCQProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 70ab26f83b..d4f8ac8e9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -172,14 +172,13 @@ public class Coordinator {
   // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
   private ExecutorService getQueryExecutor() {
     int coordinatorReadExecutorSize =
-        IoTDBDescriptor.getInstance().getConfig().getCoordinatorReadExecutorSize();
+        config.isClusterMode() ? config.getCoordinatorReadExecutorSize() : 1;
     return IoTDBThreadPoolFactory.newFixedThreadPool(
         coordinatorReadExecutorSize, COORDINATOR_EXECUTOR_NAME);
   }
 
   private ExecutorService getWriteExecutor() {
-    int coordinatorWriteExecutorSize =
-        IoTDBDescriptor.getInstance().getConfig().getCoordinatorWriteExecutorSize();
+    int coordinatorWriteExecutorSize = config.getCoordinatorWriteExecutorSize();
     return IoTDBThreadPoolFactory.newFixedThreadPool(
         coordinatorWriteExecutorSize, COORDINATOR_WRITE_EXECUTOR_NAME);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 4534b7b740..842352a7b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -98,7 +98,7 @@ public class QueryExecution implements IQueryExecution {
 
   private final List<PlanOptimizer> planOptimizers;
 
-  private Statement rawStatement;
+  private final Statement rawStatement;
   private Analysis analysis;
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
@@ -252,7 +252,6 @@ public class QueryExecution implements IQueryExecution {
                 stateMachine,
                 distributedPlan.getInstances(),
                 context.getQueryType(),
-                executor,
                 scheduledExecutor,
                 internalServiceClientManager);
     this.scheduler.start();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index b807d51565..91260063a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -53,7 +53,6 @@ public class ConfigExecution implements IQueryExecution {
   private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private final MPPQueryContext context;
-  private final Statement statement;
   private final ExecutorService executor;
 
   private final QueryStateMachine stateMachine;
@@ -66,7 +65,6 @@ public class ConfigExecution implements IQueryExecution {
 
   public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) {
     this.context = context;
-    this.statement = statement;
     this.executor = executor;
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.taskFuture = SettableFuture.create();
@@ -80,10 +78,8 @@ public class ConfigExecution implements IQueryExecution {
   }
 
   @TestOnly
-  public ConfigExecution(
-      MPPQueryContext context, Statement statement, ExecutorService executor, IConfigTask task) {
+  public ConfigExecution(MPPQueryContext context, ExecutorService executor, IConfigTask task) {
     this.context = context;
-    this.statement = statement;
     this.executor = executor;
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.taskFuture = SettableFuture.create();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index 1be7c12ab0..50031dc9b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -33,13 +33,11 @@ import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 public abstract class AbstractFragInsStateTracker implements IFragInstanceStateTracker {
 
   protected QueryStateMachine stateMachine;
-  protected ExecutorService executor;
   protected ScheduledExecutorService scheduledExecutor;
   protected List<FragmentInstance> instances;
 
@@ -48,12 +46,10 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
 
   public AbstractFragInsStateTracker(
       QueryStateMachine stateMachine,
-      ExecutorService executor,
       ScheduledExecutorService scheduledExecutor,
       List<FragmentInstance> instances,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.stateMachine = stateMachine;
-    this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
     this.instances = instances;
     this.internalServiceClientManager = internalServiceClientManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index f2d36f093d..e9829cc637 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -80,10 +80,9 @@ public class ClusterScheduler implements IScheduler {
     if (queryType == QueryType.READ) {
       this.stateTracker =
           new FixedRateFragInsStateTracker(
-              stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
+              stateMachine, scheduledExecutor, instances, internalServiceClientManager);
       this.queryTerminator =
           new SimpleQueryTerminator(
-              executor,
               scheduledExecutor,
               queryContext.getQueryId(),
               instances,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 59dbce1208..f000add3ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -54,11 +53,10 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
 
   public FixedRateFragInsStateTracker(
       QueryStateMachine stateMachine,
-      ExecutorService executor,
       ScheduledExecutorService scheduledExecutor,
       List<FragmentInstance> instances,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
-    super(stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
+    super(stateMachine, scheduledExecutor, instances, internalServiceClientManager);
     this.aborted = false;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 0a459f2e0f..32b329d640 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -35,7 +35,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -44,7 +43,6 @@ import java.util.stream.Collectors;
 public class SimpleQueryTerminator implements IQueryTerminator {
   private static final Logger logger = LoggerFactory.getLogger(SimpleQueryTerminator.class);
   private static final long TERMINATION_GRACE_PERIOD_IN_MS = 1000L;
-  private final ExecutorService executor;
   protected ScheduledExecutorService scheduledExecutor;
   private final QueryId queryId;
   private List<TEndPoint> relatedHost;
@@ -54,12 +52,10 @@ public class SimpleQueryTerminator implements IQueryTerminator {
       internalServiceClientManager;
 
   public SimpleQueryTerminator(
-      ExecutorService executor,
       ScheduledExecutorService scheduledExecutor,
       QueryId queryId,
       List<FragmentInstance> fragmentInstances,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
-    this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
     this.queryId = queryId;
     this.internalServiceClientManager = internalServiceClientManager;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index 328cbaa32f..09c7da7b43 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -49,7 +49,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 public class StandaloneScheduler implements IScheduler {
@@ -60,44 +59,29 @@ public class StandaloneScheduler implements IScheduler {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneScheduler.class);
 
-  private MPPQueryContext queryContext;
+  private final MPPQueryContext queryContext;
   // The stateMachine of the QueryExecution owned by this QueryScheduler
-  private QueryStateMachine stateMachine;
-  private QueryType queryType;
+  private final QueryStateMachine stateMachine;
+  private final QueryType queryType;
   // The fragment instances which should be sent to corresponding Nodes.
-  private List<FragmentInstance> instances;
+  private final List<FragmentInstance> instances;
 
-  private ExecutorService executor;
-  private ScheduledExecutorService scheduledExecutor;
-
-  private IFragInstanceDispatcher dispatcher;
-  private IFragInstanceStateTracker stateTracker;
-  private IQueryTerminator queryTerminator;
+  private final IFragInstanceStateTracker stateTracker;
 
   public StandaloneScheduler(
       MPPQueryContext queryContext,
       QueryStateMachine stateMachine,
       List<FragmentInstance> instances,
       QueryType queryType,
-      ExecutorService executor,
       ScheduledExecutorService scheduledExecutor,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.queryContext = queryContext;
     this.instances = instances;
     this.queryType = queryType;
-    this.executor = executor;
-    this.scheduledExecutor = scheduledExecutor;
     this.stateMachine = stateMachine;
     this.stateTracker =
         new FixedRateFragInsStateTracker(
-            stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
-    this.queryTerminator =
-        new SimpleQueryTerminator(
-            executor,
-            scheduledExecutor,
-            queryContext.getQueryId(),
-            instances,
-            internalServiceClientManager);
+            stateMachine, scheduledExecutor, instances, internalServiceClientManager);
   }
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index f43e702772..1fc92886d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -54,8 +54,7 @@ public class ConfigExecutionTest {
   public void normalConfigTaskTest() {
     IConfigTask task =
         (clientManager) -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
-    ConfigExecution execution =
-        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
     execution.start();
     ExecutionResult result = execution.getStatus();
     assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
@@ -74,8 +73,7 @@ public class ConfigExecutionTest {
         (clientManager) ->
             immediateFuture(
                 new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock, datasetHeader));
-    ConfigExecution execution =
-        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
     execution.start();
     ExecutionResult result = execution.getStatus();
     TsBlock tsBlockFromExecution = null;
@@ -94,8 +92,7 @@ public class ConfigExecutionTest {
         (clientManager) -> {
           throw new RuntimeException("task throw exception when executing");
         };
-    ConfigExecution execution =
-        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
     execution.start();
     ExecutionResult result = execution.getStatus();
     assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
@@ -118,8 +115,7 @@ public class ConfigExecutionTest {
       }
     }
     IConfigTask task = new SimpleTask(taskResult);
-    ConfigExecution execution =
-        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
     execution.start();
 
     Thread resultThread =
@@ -139,8 +135,7 @@ public class ConfigExecutionTest {
         (clientManager) -> {
           throw new RuntimeException("task throw exception when executing");
         };
-    ConfigExecution execution =
-        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    ConfigExecution execution = new ConfigExecution(genMPPQueryContext(), getExecutor(), task);
     Thread resultThread =
         new Thread(
             () -> {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
index a5c65def26..34e2a9e85f 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneSchedulerTest.java
@@ -157,7 +157,6 @@ public class StandaloneSchedulerTest {
             stateMachine,
             Collections.singletonList(fragmentInstance),
             QueryType.WRITE,
-            executor,
             null,
             null);
     try {
@@ -261,7 +260,6 @@ public class StandaloneSchedulerTest {
             stateMachine,
             Collections.singletonList(fragmentInstance),
             QueryType.WRITE,
-            executor,
             null,
             null);
     try {
@@ -375,7 +373,6 @@ public class StandaloneSchedulerTest {
             stateMachine,
             Collections.singletonList(fragmentInstance),
             QueryType.WRITE,
-            executor,
             null,
             null);
     try {
@@ -428,7 +425,6 @@ public class StandaloneSchedulerTest {
             stateMachine,
             Collections.singletonList(fragmentInstance),
             QueryType.WRITE,
-            executor,
             null,
             null);
     try {
@@ -510,7 +506,6 @@ public class StandaloneSchedulerTest {
             stateMachine,
             Collections.singletonList(fragmentInstance),
             QueryType.WRITE,
-            executor,
             null,
             null);
     try {