You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2023/05/10 03:03:36 UTC
[iotdb] branch cpu-monitor updated: refactor query thread name
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch cpu-monitor
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cpu-monitor by this push:
new 9e86dce092 refactor query thread name
9e86dce092 is described below
commit 9e86dce09295aaa81ca94f10341941a52bc2dc9c
Author: Liu Xuxin <li...@outlook.com>
AuthorDate: Wed May 10 11:03:22 2023 +0800
refactor query thread name
---
.../iotdb/commons/concurrent/ThreadName.java | 4 --
.../iotdb/commons/constant/QueryThreadName.java | 55 ++++++++++++++++++++++
.../client/mock/MockInternalRPCService.java | 6 +--
.../execution/exchange/MPPDataExchangeService.java | 10 ++--
.../exchange/MPPDataExchangeServiceMetrics.java | 6 +--
.../fragment/FragmentInstanceManager.java | 7 ++-
.../db/mpp/execution/schedule/DriverScheduler.java | 5 +-
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 7 ++-
.../db/service/DataNodeInternalRPCService.java | 6 +--
.../service/DataNodeInternalRPCServiceMetrics.java | 6 +--
.../db/service/basic/QueryFrequencyRecorder.java | 4 +-
11 files changed, 86 insertions(+), 30 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e88b7af0b8..3305f72036 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -25,10 +25,6 @@ public enum ThreadName {
CONFIGNODE_RPC_PROCESSOR("ConfigNodeRPC-Processor"),
IOT_CONSENSUS_RPC_SERVICE("IoTConsensusRPC-Service"),
IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
- MPP_DATA_EXCHANGE_RPC_SERVICE("MPPDataExchangeRPC-Service"),
- MPP_DATA_EXCHANGE_RPC_PROCESSOR("MPPDataExchangeRPC-Processor"),
- DATANODE_INTERNAL_RPC_SERVICE("DataNodeInternalRPC-Service"),
- DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"),
INFLUXDB_RPC_SERVICE("InfluxdbRPC-Service"),
INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"),
STORAGE_ENGINE_CACHED_SERVICE("StorageEngine"),
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/constant/QueryThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/constant/QueryThreadName.java
new file mode 100644
index 0000000000..d95d6c3df3
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/constant/QueryThreadName.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.constant;
+
+public enum QueryThreadName { // Names (or name prefixes) of the query-related threads and thread pools touched by this commit.
+ QUERY_WORKER_THREAD("Query-Worker-Thread"), // DriverScheduler appends "-" + worker index to this
+ QUERY_SENTINEL_THREAD("Query-Sentinel-Thread"), // DriverScheduler sentinel thread
+ DATANODE_INTERNAL_RPC_SERVICE("DataNodeInternalRPC-Service"), // set as the thrift service thread name; also a metric tag value
+ DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"), // passed to the ThriftServiceThread constructor
+ MPP_DATA_EXCHANGE_RPC_SERVICE("MPPDataExchangeRPC-Service"), // set as the thrift service thread name; also a metric tag value
+ MPP_DATA_EXCHANGE_RPC_PROCESSOR("MPPDataExchangeRPC-Processor"), // passed to the ThriftServiceThread constructor
+ MPP_DATA_EXCHANGE_TASK_EXECUTORS("MPPDataExchangeTaskExecutors"), // MPPDataExchangeService task executor pool
+ MPP_COORDINATOR("MPPCoordinator"), // Coordinator fixed pool sized by coordinatorReadExecutorSize
+ MPP_COORDINATOR_SCHEDULED("MPPCoordinatorScheduled"), // Coordinator scheduled executor pool
+ FRAGMENT_INSTANCE_MANAGEMENT("FragmentInstanceManagement"), // FragmentInstanceManager scheduled pool (1 thread)
+ FRAGMENT_INSTANCE_NOTIFICATION("FragmentInstanceNotification"), // FragmentInstanceManager fixed pool (4 threads)
+ TIMED_QUERY_SQL_COUNT("TimedQuerySqlCount"), // QueryFrequencyRecorder single-thread scheduled executor
+ ;
+
+ private final String name; // the thread-name string given to the constructor
+
+ QueryThreadName(String name) { // stores the display name for this constant
+ this.name = name;
+ }
+
+ public String getName() { // returns the thread-name string, not the enum identifier
+ return name;
+ }
+
+ public static boolean contains(String threadName) { // true if threadName has any constant's name as a substring
+ for (QueryThreadName name : QueryThreadName.values()) {
+ if (threadName.contains(name.getName())) { // NOTE(review): a null threadName throws NullPointerException here
+ return true; // first match wins; remaining constants are not checked
+ }
+ }
+ return false; // no constant's name occurs in threadName
+ }
+}
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
index c4a6b4d3cc..53382b420a 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/client/mock/MockInternalRPCService.java
@@ -20,8 +20,8 @@
package org.apache.iotdb.commons.client.mock;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.constant.QueryThreadName;
import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.service.ThriftService;
@@ -67,7 +67,7 @@ public class MockInternalRPCService extends ThriftService implements MockInterna
new ThriftServiceThread(
processor,
getID().getName(),
- ThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName(),
+ QueryThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName(),
getBindIP(),
getBindPort(),
65535,
@@ -77,7 +77,7 @@ public class MockInternalRPCService extends ThriftService implements MockInterna
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
- thriftServiceThread.setName(ThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
+ thriftServiceThread.setName(QueryThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
index fd935ff3c8..8cc70f4a98 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.constant.QueryThreadName;
import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.service.ThriftService;
@@ -60,8 +60,8 @@ public class MPPDataExchangeService extends ThriftService implements MPPDataExch
TimeUnit.MILLISECONDS,
// TODO: Use a priority queue.
new LinkedBlockingQueue<>(),
- new IoTThreadFactory("mpp-data-exchange-task-executors"),
- "mpp-data-exchange-task-executors");
+ new IoTThreadFactory(QueryThreadName.MPP_DATA_EXCHANGE_TASK_EXECUTORS.getName()),
+ QueryThreadName.MPP_DATA_EXCHANGE_TASK_EXECUTORS.getName());
this.mppDataExchangeManager =
new MPPDataExchangeManager(
new LocalMemoryManager(),
@@ -91,7 +91,7 @@ public class MPPDataExchangeService extends ThriftService implements MPPDataExch
new ThriftServiceThread(
processor,
getID().getName(),
- ThreadName.MPP_DATA_EXCHANGE_RPC_PROCESSOR.getName(),
+ QueryThreadName.MPP_DATA_EXCHANGE_RPC_PROCESSOR.getName(),
getBindIP(),
getBindPort(),
config.getRpcMaxConcurrentClientNum(),
@@ -102,7 +102,7 @@ public class MPPDataExchangeService extends ThriftService implements MPPDataExch
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
- thriftServiceThread.setName(ThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
+ thriftServiceThread.setName(QueryThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
MetricService.getInstance()
.addMetricSet(new MPPDataExchangeServiceMetrics(thriftServiceThread));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeServiceMetrics.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeServiceMetrics.java
index 4297a217a9..3047ae0756 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeServiceMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeServiceMetrics.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.mpp.execution.exchange;
-import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.constant.QueryThreadName;
import org.apache.iotdb.commons.service.AbstractThriftServiceThread;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -43,7 +43,7 @@ public class MPPDataExchangeServiceMetrics implements IMetricSet {
thriftServiceThread,
AbstractThriftServiceThread::getActiveThreadCount,
Tag.NAME.toString(),
- ThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
+ QueryThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
}
@Override
@@ -52,7 +52,7 @@ public class MPPDataExchangeServiceMetrics implements IMetricSet {
MetricType.AUTO_GAUGE,
Metric.THRIFT_ACTIVE_THREADS.toString(),
Tag.NAME.toString(),
- ThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
+ QueryThreadName.MPP_DATA_EXCHANGE_RPC_SERVICE.getName());
}
public AbstractThriftServiceThread getThriftServiceThread() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 6e5390fdc1..a41204965a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.constant.QueryThreadName;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
@@ -85,9 +86,11 @@ public class FragmentInstanceManager {
this.instanceContext = new ConcurrentHashMap<>();
this.instanceExecution = new ConcurrentHashMap<>();
this.instanceManagementExecutor =
- IoTDBThreadPoolFactory.newScheduledThreadPool(1, "instance-management");
+ IoTDBThreadPoolFactory.newScheduledThreadPool(
+ 1, QueryThreadName.FRAGMENT_INSTANCE_MANAGEMENT.getName());
this.instanceNotificationExecutor =
- IoTDBThreadPoolFactory.newFixedThreadPool(4, "instance-notification");
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ 4, QueryThreadName.FRAGMENT_INSTANCE_NOTIFICATION.getName());
this.infoCacheTime = new Duration(5, TimeUnit.MINUTES);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
index 50862859e6..4675e083be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.mpp.execution.schedule;
+import org.apache.iotdb.commons.constant.QueryThreadName;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
@@ -106,7 +107,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
public void start() throws StartupException {
for (int i = 0; i < WORKER_THREAD_NUM; i++) {
int index = i;
- String threadName = "Query-Worker-Thread-" + i;
+ String threadName = QueryThreadName.QUERY_WORKER_THREAD.getName() + "-" + i;
ThreadProducer producer =
new ThreadProducer() {
@Override
@@ -127,7 +128,7 @@ public class DriverScheduler implements IDriverScheduler, IService {
t.start();
}
- String threadName = "Query-Sentinel-Thread";
+ String threadName = QueryThreadName.QUERY_SENTINEL_THREAD.getName();
ThreadProducer producer =
new ThreadProducer() {
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 2a2ae6f0d8..aec7cdee4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.constant.QueryThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.DataNodeEndPoints;
@@ -60,9 +61,7 @@ public class Coordinator {
private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class);
- private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
private static final String COORDINATOR_WRITE_EXECUTOR_NAME = "MPPCoordinatorWrite";
- private static final String COORDINATOR_SCHEDULED_EXECUTOR_NAME = "MPPCoordinatorScheduled";
private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
@@ -189,7 +188,7 @@ public class Coordinator {
int coordinatorReadExecutorSize =
CONFIG.isClusterMode() ? CONFIG.getCoordinatorReadExecutorSize() : 1;
return IoTDBThreadPoolFactory.newFixedThreadPool(
- coordinatorReadExecutorSize, COORDINATOR_EXECUTOR_NAME);
+ coordinatorReadExecutorSize, QueryThreadName.MPP_COORDINATOR.getName());
}
private ExecutorService getWriteExecutor() {
@@ -201,7 +200,7 @@ public class Coordinator {
// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ScheduledExecutorService getScheduledExecutor() {
return IoTDBThreadPoolFactory.newScheduledThreadPool(
- COORDINATOR_SCHEDULED_EXECUTOR_SIZE, COORDINATOR_SCHEDULED_EXECUTOR_NAME);
+ COORDINATOR_SCHEDULED_EXECUTOR_SIZE, QueryThreadName.MPP_COORDINATOR_SCHEDULED.getName());
}
public QueryId createQueryId() {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
index 915ba071b9..32f24d25cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.service;
-import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.constant.QueryThreadName;
import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.service.ThriftService;
@@ -60,7 +60,7 @@ public class DataNodeInternalRPCService extends ThriftService
new ThriftServiceThread(
processor,
getID().getName(),
- ThreadName.DATANODE_INTERNAL_RPC_PROCESSOR.getName(),
+ QueryThreadName.DATANODE_INTERNAL_RPC_PROCESSOR.getName(),
getBindIP(),
getBindPort(),
config.getRpcMaxConcurrentClientNum(),
@@ -71,7 +71,7 @@ public class DataNodeInternalRPCService extends ThriftService
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
- thriftServiceThread.setName(ThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
+ thriftServiceThread.setName(QueryThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
MetricService.getInstance()
.addMetricSet(new DataNodeInternalRPCServiceMetrics(thriftServiceThread));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceMetrics.java
index 782e6e59a0..bcaff62956 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceMetrics.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.service;
-import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.constant.QueryThreadName;
import org.apache.iotdb.commons.service.AbstractThriftServiceThread;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -45,7 +45,7 @@ public class DataNodeInternalRPCServiceMetrics implements IMetricSet {
thriftServiceThread,
AbstractThriftServiceThread::getActiveThreadCount,
Tag.NAME.toString(),
- ThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
+ QueryThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
}
@Override
@@ -54,7 +54,7 @@ public class DataNodeInternalRPCServiceMetrics implements IMetricSet {
MetricType.AUTO_GAUGE,
Metric.THRIFT_ACTIVE_THREADS.toString(),
Tag.NAME.toString(),
- ThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
+ QueryThreadName.DATANODE_INTERNAL_RPC_SERVICE.getName());
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
index 5c4290f4c4..ec89a817b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.service.basic;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.constant.QueryThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.slf4j.Logger;
@@ -36,7 +37,8 @@ public class QueryFrequencyRecorder {
public QueryFrequencyRecorder(IoTDBConfig config) {
ScheduledExecutorService timedQuerySqlCountThread =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedQuerySqlCount");
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ QueryThreadName.TIMED_QUERY_SQL_COUNT.getName());
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
timedQuerySqlCountThread,
() -> {