You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/05/10 03:03:36 UTC

[iotdb] branch cpu-monitor updated: refactor query thread name

This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch cpu-monitor
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cpu-monitor by this push:
     new 9e86dce092 refactor query thread name
9e86dce092 is described below

commit 9e86dce09295aaa81ca94f10341941a52bc2dc9c
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Wed May 10 11:03:22 2023 +0800

    refactor query thread name
---
 .../iotdb/commons/concurrent/ThreadName.java       |  4 --
 .../iotdb/commons/constant/QueryThreadName.java    | 55 ++++++++++++++++++++++
 .../client/mock/MockInternalRPCService.java        |  6 +--
 .../execution/exchange/MPPDataExchangeService.java | 10 ++--
 .../exchange/MPPDataExchangeServiceMetrics.java    |  6 +--
 .../fragment/FragmentInstanceManager.java          |  7 ++-
 .../db/mpp/execution/schedule/DriverScheduler.java |  5 +-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  7 ++-
 .../db/service/DataNodeInternalRPCService.java     |  6 +--
 .../service/DataNodeInternalRPCServiceMetrics.java |  6 +--
 .../db/service/basic/QueryFrequencyRecorder.java   |  4 +-
 11 files changed, 86 insertions(+), 30 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e88b7af0b8..3305f72036 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -25,10 +25,6 @@ public enum ThreadName {
   CONFIGNODE_RPC_PROCESSOR("ConfigNodeRPC-Processor"),
   IOT_CONSENSUS_RPC_SERVICE("IoTConsensusRPC-Service"),
   IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
-  MPP_DATA_EXCHANGE_RPC_SERVICE("MPPDataExchangeRPC-Service"),
-  MPP_DATA_EXCHANGE_RPC_PROCESSOR("MPPDataExchangeRPC-Processor"),
-  DATANODE_INTERNAL_RPC_SERVICE("DataNodeInternalRPC-Service"),
-  DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"),
   INFLUXDB_RPC_SERVICE("InfluxdbRPC-Service"),
   INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"),
   STORAGE_ENGINE_CACHED_SERVICE("StorageEngine"),
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/constant/QueryThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/constant/QueryThreadName.java
new file mode 100644
index 0000000000..d95d6c3df3
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/constant/QueryThreadName.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.constant;
+
+public enum QueryThreadName { // Names given to query-related threads and thread pools.
+  QUERY_WORKER_THREAD("Query-Worker-Thread"), // DriverScheduler appends "-" + index
+  QUERY_SENTINEL_THREAD("Query-Sentinel-Thread"),
+  DATANODE_INTERNAL_RPC_SERVICE("DataNodeInternalRPC-Service"),
+  DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"),
+  MPP_DATA_EXCHANGE_RPC_SERVICE("MPPDataExchangeRPC-Service"),
+  MPP_DATA_EXCHANGE_RPC_PROCESSOR("MPPDataExchangeRPC-Processor"),
+  MPP_DATA_EXCHANGE_TASK_EXECUTORS("MPPDataExchangeTaskExecutors"),
+  MPP_COORDINATOR("MPPCoordinator"), // NOTE(review): contains() also matches "MPPCoordinatorWrite"
+  MPP_COORDINATOR_SCHEDULED("MPPCoordinatorScheduled"),
+  FRAGMENT_INSTANCE_MANAGEMENT("FragmentInstanceManagement"),
+  FRAGMENT_INSTANCE_NOTIFICATION("FragmentInstanceNotification"),
+  TIMED_QUERY_SQL_COUNT("TimedQuerySqlCount"),
+  ;
+
+  private final String name; // name string passed to the constructor
+
+  QueryThreadName(String name) {
+    this.name = name;
+  }
+
+  public String getName() { // returns the name string, not the identifier from Enum.name()
+    return name;
+  }
+
+  public static boolean contains(String threadName) { // true if any name string occurs in it
+    for (QueryThreadName name : QueryThreadName.values()) { // local "name" is a constant here
+      if (threadName.contains(name.getName())) { // substring match; NPE if threadName is null
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
index c4a6b4d3cc..53382b420a 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
@@ -20,8 +20,8 @@
 package org.apache.iotdb.commons.client.mock;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.constant.QueryThreadName;
 import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.ThriftService;
@@ -67,7 +67,7 @@ public class MockInternalRPCService extends ThriftService implements MockInterna
           new ThriftServiceThread(
               processor,
               getID().getName(),
-              ThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName(),
+              QueryThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName(),
               getBindIP(),
               getBindPort(),
               65535,
@@ -77,7 +77,7 @@ public class MockInternalRPCService extends ThriftService implements MockInterna
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
-    thriftServiceThread.setName(ThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
+    thriftServiceThread.setName(QueryThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
index fd935ff3c8..8cc70f4a98 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.constant.QueryThreadName;
 import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.ThriftService;
@@ -60,8 +60,8 @@ public class MPPDataExchangeService extends ThriftService implements MPPDataExch
             TimeUnit.MILLISECONDS,
             // TODO: Use a priority queue.
             new LinkedBlockingQueue<>(),
-            new IoTThreadFactory("mpp-data-exchange-task-executors"),
-            "mpp-data-exchange-task-executors");
+            new IoTThreadFactory(QueryThreadName.MPP_DATA_EXCHANGE_TASK_EXECUTORS.getName()),
+            QueryThreadName.MPP_DATA_EXCHANGE_TASK_EXECUTORS.getName());
     this.mppDataExchangeManager =
         new MPPDataExchangeManager(
             new LocalMemoryManager(),
@@ -91,7 +91,7 @@ public class MPPDataExchangeService extends ThriftService implements MPPDataExch
           new ThriftServiceThread(
               processor,
               getID().getName(),
-              ThreadName.MPP_DATA_EXCHANGE_RPC_PROCESSOR.getName(),
+              QueryThreadName.MPP_DATA_EXCHANGE_RPC_PROCESSOR.getName(),
               getBindIP(),
               getBindPort(),
               config.getRpcMaxConcurrentClientNum(),
@@ -102,7 +102,7 @@ public class MPPDataExchangeService extends ThriftService implements MPPDataExch
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
-    thriftServiceThread.setName(ThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
+    thriftServiceThread.setName(QueryThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
     MetricService.getInstance()
         .addMetricSet(new MPPDataExchangeServiceMetrics(thriftServiceThread));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeServiceMetrics.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeServiceMetrics.java
index 4297a217a9..3047ae0756 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeServiceMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeServiceMetrics.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.mpp.execution.exchange;
 
-import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.constant.QueryThreadName;
 import org.apache.iotdb.commons.service.AbstractThriftServiceThread;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -43,7 +43,7 @@ public class MPPDataExchangeServiceMetrics implements IMetricSet {
         thriftServiceThread,
         AbstractThriftServiceThread::getActiveThreadCount,
         Tag.NAME.toString(),
-        ThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
+        QueryThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
   }
 
   @Override
@@ -52,7 +52,7 @@ public class MPPDataExchangeServiceMetrics implements IMetricSet {
         MetricType.AUTO_GAUGE,
         Metric.THRIFT_ACTIVE_THREADS.toString(),
         Tag.NAME.toString(),
-        ThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
+        QueryThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
   }
 
   public AbstractThriftServiceThread getThriftServiceThread() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6e5390fdc1..a41204965a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.constant.QueryThreadName;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -85,9 +86,11 @@ public class FragmentInstanceManager {
     this.instanceContext = new ConcurrentHashMap<>();
     this.instanceExecution = new ConcurrentHashMap<>();
     this.instanceManagementExecutor =
-        IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
+        IoTDBThreadPoolFactory.newScheduledThreadPool(
+            1, QueryThreadName.FRAGMENT_INSTANCE_MANAGEMENT.getName());
     this.instanceNotificationExecutor =
-        IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            4, QueryThreadName.FRAGMENT_INSTANCE_NOTIFICATION.getName());
 
     this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 50862859e6..4675e083be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.execution.schedule;
 
+import org.apache.iotdb.commons.constant.QueryThreadName;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
@@ -106,7 +107,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
   public void start() throws StartupException {
     for (int i = 0; i < WORKER_THREAD_NUM; i++) {
       int index = i;
-      String threadName = "Query-Worker-Thread-" + i;
+      String threadName = QueryThreadName.QUERY_WORKER_THREAD.getName() + "-" + i;
       ThreadProducer producer =
           new ThreadProducer() {
             @Override
@@ -127,7 +128,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
       t.start();
     }
 
-    String threadName = "Query-Sentinel-Thread";
+    String threadName = QueryThreadName.QUERY_SENTINEL_THREAD.getName();
     ThreadProducer producer =
         new ThreadProducer() {
           @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 2a2ae6f0d8..aec7cdee4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.constant.QueryThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.DataNodeEndPoints;
@@ -60,9 +61,7 @@ public class Coordinator {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class);
 
-  private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
   private static final String COORDINATOR_WRITE_EXECUTOR_NAME = "MPPCoordinatorWrite";
-  private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
   private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
   private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
@@ -189,7 +188,7 @@ public class Coordinator {
     int coordinatorReadExecutorSize =
         CONFIG.isClusterMode() ? CONFIG.getCoordinatorReadExecutorSize() : 1;
     return IoTDBThreadPoolFactory.newFixedThreadPool(
-        coordinatorReadExecutorSize, COORDINATOR_EXECUTOR_NAME);
+        coordinatorReadExecutorSize, QueryThreadName.MPP_COORDINATOR.getName());
   }
 
   private ExecutorService getWriteExecutor() {
@@ -201,7 +200,7 @@ public class Coordinator {
   // TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
   private ScheduledExecutorService getScheduledExecutor() {
     return IoTDBThreadPoolFactory.newScheduledThreadPool(
-        COORDINATOR_SCHEDULED_EXECUTOR_SIZE, COORDINATOR_SCHEDULED_EXECUTOR_NAME);
+        COORDINATOR_SCHEDULED_EXECUTOR_SIZE, QueryThreadName.MPP_COORDINATOR_SCHEDULED.getName());
   }
 
   public QueryId createQueryId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
index 915ba071b9..32f24d25cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.service;
 
-import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.constant.QueryThreadName;
 import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.ThriftService;
@@ -60,7 +60,7 @@ public class DataNodeInternalRPCService extends ThriftService
           new ThriftServiceThread(
               processor,
               getID().getName(),
-              ThreadName.DATANODE_INTERNAL_RPC_PROCESSOR.getName(),
+              QueryThreadName.DATANODE_INTERNAL_RPC_PROCESSOR.getName(),
               getBindIP(),
               getBindPort(),
               config.getRpcMaxConcurrentClientNum(),
@@ -71,7 +71,7 @@ public class DataNodeInternalRPCService extends ThriftService
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
-    thriftServiceThread.setName(ThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
+    thriftServiceThread.setName(QueryThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
     MetricService.getInstance()
         .addMetricSet(new DataNodeInternalRPCServiceMetrics(thriftServiceThread));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceMetrics.java
index 782e6e59a0..bcaff62956 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceMetrics.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.service;
 
-import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.constant.QueryThreadName;
 import org.apache.iotdb.commons.service.AbstractThriftServiceThread;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -45,7 +45,7 @@ public class DataNodeInternalRPCServiceMetrics implements IMetricSet {
         thriftServiceThread,
         AbstractThriftServiceThread::getActiveThreadCount,
         Tag.NAME.toString(),
-        ThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
+        QueryThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
   }
 
   @Override
@@ -54,7 +54,7 @@ public class DataNodeInternalRPCServiceMetrics implements IMetricSet {
         MetricType.AUTO_GAUGE,
         Metric.THRIFT_ACTIVE_THREADS.toString(),
         Tag.NAME.toString(),
-        ThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
+        QueryThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
index 5c4290f4c4..ec89a817b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.service.basic;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.constant.QueryThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 
 import org.slf4j.Logger;
@@ -36,7 +37,8 @@ public class QueryFrequencyRecorder {
 
   public QueryFrequencyRecorder(IoTDBConfig config) {
     ScheduledExecutorService timedQuerySqlCountThread =
-        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedQuerySqlCount");
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            QueryThreadName.TIMED_QUERY_SQL_COUNT.getName());
     ScheduledExecutorUtil.safelyScheduleAtFixedRate(
         timedQuerySqlCountThread,
         () -> {