You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/12/29 06:02:11 UTC

[iotdb] branch jira5312 updated (3af7631083 -> fc8b93d383)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira5312
in repository https://gitbox.apache.org/repos/asf/iotdb.git


 discard 3af7631083 finish
     add b1b17f971a change STARTUP_RETRY_INTERVAL_IN_MS from 30s to 3s (#8629)
     add 8bb400d17a Fix Bug in NullColumn
     add 8ad71667cd [IOTDB-5278] fix connection problem of dbeaver and supports more functions (#8624)
     add dd7291dbf2 Bump wheel from 0.36.2 to 0.38.1 in /client-py (#8617)
     add db38945c16 [IOTDB-5297] Add maxRetryTimes for IoTDBClusterNodeErrorStartUpIT to avoid infinite loop (#8628)
     add 217e3d7747 [IOTDB-5106] Parse PathPattern to Deterministic Finite Automate (#8607)
     add 905a356c8f [IOTDB-5306] Change default port of ConfigNode and DataNode (#8635)
     add 14db9f8784 fix grafana-connector bug (#8636)
     add 48bd8c9b68 [IOTDB-5290] Add retry failed tasks thread for sync and trigger(#8610)
     add 15c9231120 [IOTDB-5300] Added node status check in migrate Region sql (#8641)
     new fc8b93d383 finish

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (3af7631083)
            \
             N -- N -- N   refs/heads/jira5312 (fc8b93d383)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 client-py/requirements_dev.txt                     |   2 +-
 .../resources/conf/iotdb-confignode.properties     |   8 +-
 .../assembly/resources/sbin/start-confignode.bat   |   4 +-
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   6 +-
 .../confignode/conf/SystemPropertiesUtils.java     |   9 +-
 .../statemachine/ConfigNodeRegionStateMachine.java |   4 +-
 .../iotdb/confignode/manager/ConfigManager.java    |   9 +
 .../apache/iotdb/confignode/manager/IManager.java  |   7 +
 .../iotdb/confignode/manager/ProcedureManager.java |  50 ++++--
 .../confignode/manager/RetryFailedTasksThread.java | 195 +++++++++++++++++++++
 .../iotdb/confignode/manager/SyncManager.java      |   7 +-
 .../iotdb/confignode/manager/node/NodeManager.java |  72 --------
 .../OperatePipeProcedureRollbackProcessor.java     | 122 -------------
 .../iotdb/confignode/service/ConfigNode.java       |   2 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |  56 +++---
 .../load/balancer/router/RegionRouteMapTest.java   |   8 +-
 .../router/priority/GreedyPriorityTest.java        |   8 +-
 .../priority/LeaderPriorityBalancerTest.java       |  16 +-
 .../confignode/persistence/PartitionInfoTest.java  |   8 +-
 .../impl/CreateRegionGroupsProcedureTest.java      |  16 +-
 .../impl/node/AddConfigNodeProcedureTest.java      |   2 +-
 .../impl/node/RemoveConfigNodeProcedureTest.java   |   2 +-
 .../confignode1conf/iotdb-confignode.properties    |   6 +-
 .../confignode2conf/iotdb-confignode.properties    |   6 +-
 .../confignode3conf/iotdb-confignode.properties    |   6 +-
 docker/ReadMe.md                                   |   6 +-
 .../DockerCompose/docker-compose-cluster-1c2d.yml  |   6 +-
 .../DockerCompose/docker-compose-cluster-3c3d.yml  |  16 +-
 .../DockerCompose/docker-compose-host-3c3d.yml     |  16 +-
 .../DockerCompose/docker-compose-standalone.yml    |   4 +-
 docker/src/main/Dockerfile-0.12.6-cluster          |   6 +-
 docker/src/main/Dockerfile-0.13.0-cluster          |   6 +-
 docker/src/main/Dockerfile-1.0.0-confignode        |   4 +-
 docker/src/main/Dockerfile-1.0.0-datanode          |   4 +-
 docs/UserGuide/Cluster/Cluster-Maintenance.md      |  50 +++---
 docs/UserGuide/Cluster/Cluster-Setup.md            |  44 ++---
 .../Edge-Cloud-Collaboration/Sync-Tool.md          |   8 +-
 docs/UserGuide/QuickStart/ClusterQuickStart.md     |  92 +++++-----
 docs/UserGuide/QuickStart/WayToGetIoTDB.md         |  26 +--
 .../Reference/ConfigNode-Config-Manual.md          |   6 +-
 docs/UserGuide/Reference/DataNode-Config-Manual.md |  10 +-
 docs/zh/UserGuide/Cluster/Cluster-Maintenance.md   |  50 +++---
 docs/zh/UserGuide/Cluster/Cluster-Setup.md         |  42 ++---
 .../Edge-Cloud-Collaboration/Sync-Tool.md          |   8 +-
 docs/zh/UserGuide/QuickStart/ClusterQuickStart.md  | 102 +++++------
 docs/zh/UserGuide/QuickStart/WayToGetIoTDB.md      |  24 +--
 .../Reference/ConfigNode-Config-Manual.md          |   6 +-
 .../UserGuide/Reference/DataNode-Config-Manual.md  |  10 +-
 .../web/grafana/interceptor/LoginInterceptor.java  |   3 +
 .../org/apache/iotdb/it/env/RemoteServerEnv.java   |   2 +-
 .../it/cluster/IoTDBClusterNodeErrorStartUpIT.java |  48 ++++-
 .../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java   | 166 ++++--------------
 node-commons/pom.xml                               |   4 +
 .../src/assembly/resources/sbin/iotdb-common.sh    |  12 +-
 .../apache/iotdb/commons/path/fa/FAFactory.java    |  55 ++++++
 .../iotdb/commons/path/fa/IFATransition.java       |   4 +-
 .../apache/iotdb/commons/path/fa/IPatternFA.java   |  58 ++++++
 .../apache/iotdb/commons/path/fa/dfa/DFAState.java |  44 +++--
 .../iotdb/commons/path/fa/dfa/PatternDFA.java      | 152 ++++++++++++++++
 .../iotdb/commons/path/fa/dfa/graph/Closure.java   |  46 +++--
 .../iotdb/commons/path/fa/dfa/graph/DFAGraph.java  | 173 ++++++++++++++++++
 .../iotdb/commons/path/fa/dfa/graph/NFAGraph.java  | 126 +++++++++++++
 .../fa/dfa/transition/AbstractDFATransition.java   |  34 ++--
 .../fa/dfa/transition/DFAPreciseTransition.java    |  28 +--
 .../fa/dfa/transition/DFAWildcardTransition.java   |  45 +++--
 .../iotdb/commons/path/fa/{ => nfa}/SimpleNFA.java |  12 +-
 .../commons/schema/tree/AbstractTreeVisitor.java   |  32 +++-
 .../iotdb/commons/client/ClientManagerTest.java    |   2 +-
 .../apache/iotdb/commons/path/PatternDFATest.java  | 153 ++++++++++++++++
 .../iotdb/commons/utils/NodeUrlUtilsTest.java      |   8 +-
 .../commons/utils/ThriftCommonsSerDeUtilsTest.java |  16 +-
 .../utils/ThriftConfigNodeSerDeUtilsTest.java      |   2 +-
 .../resources/conf/iotdb-datanode.properties       |  12 +-
 .../src/assembly/resources/sbin/start-datanode.bat |   8 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   2 +-
 .../org/apache/iotdb/db/conf/IoTDBStartCheck.java  |   3 +-
 .../db/mpp/common/header/ColumnHeaderConstant.java |   4 +-
 .../apache/iotdb/db/mpp/plan/TestRPCClient.java    |  40 ++---
 .../config/metadata/ShowClusterDetailsTask.java    |   4 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |   4 +-
 .../common/schematree/ClusterSchemaTreeTest.java   |  86 +++++++++
 .../mpp/plan/plan/FragmentInstanceSerdeTest.java   |  16 +-
 .../datanode1conf/iotdb-datanode.properties        |  10 +-
 .../datanode2conf/iotdb-datanode.properties        |  12 +-
 .../datanode3conf/iotdb-datanode.properties        |  12 +-
 server/src/test/resources/logback-test.xml         |   1 +
 .../read/common/block/column/NullColumn.java       |  19 +-
 .../iotdb/tsfile/read/common/ColumnTest.java       |  12 ++
 89 files changed, 1747 insertions(+), 910 deletions(-)
 create mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/manager/RetryFailedTasksThread.java
 delete mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/OperatePipeProcedureRollbackProcessor.java
 create mode 100644 node-commons/src/main/java/org/apache/iotdb/commons/path/fa/FAFactory.java
 copy server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowDevicesPlanImpl.java => node-commons/src/main/java/org/apache/iotdb/commons/path/fa/dfa/DFAState.java (58%)
 create mode 100644 node-commons/src/main/java/org/apache/iotdb/commons/path/fa/dfa/PatternDFA.java
 copy server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowDevicesPlanImpl.java => node-commons/src/main/java/org/apache/iotdb/commons/path/fa/dfa/graph/Closure.java (56%)
 create mode 100644 node-commons/src/main/java/org/apache/iotdb/commons/path/fa/dfa/graph/DFAGraph.java
 create mode 100644 node-commons/src/main/java/org/apache/iotdb/commons/path/fa/dfa/graph/NFAGraph.java
 copy server/src/main/java/org/apache/iotdb/db/metadata/plan/schemaregion/impl/read/ShowDevicesPlanImpl.java => node-commons/src/main/java/org/apache/iotdb/commons/path/fa/dfa/transition/AbstractDFATransition.java (58%)
 copy spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/common/Operator.java => node-commons/src/main/java/org/apache/iotdb/commons/path/fa/dfa/transition/DFAPreciseTransition.java (65%)
 mode change 100755 => 100644
 copy server/src/main/java/org/apache/iotdb/db/mpp/common/NodeRef.java => node-commons/src/main/java/org/apache/iotdb/commons/path/fa/dfa/transition/DFAWildcardTransition.java (51%)
 rename node-commons/src/main/java/org/apache/iotdb/commons/path/fa/{ => nfa}/SimpleNFA.java (97%)
 create mode 100644 node-commons/src/test/java/org/apache/iotdb/commons/path/PatternDFATest.java


[iotdb] 01/01: finish

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5312
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fc8b93d3833a6fc2aa75ac373617de00d2e0900a
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Thu Dec 29 13:57:11 2022 +0800

    finish
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../client/sync/SyncConfigNodeClientPool.java      |  18 ++-
 .../client/sync/SyncDataNodeClientPool.java        |  14 +-
 .../service/thrift/ConfigNodeRPCService.java       |   2 +-
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |   6 +-
 .../consensus/iot/IoTConsensusServerImpl.java      |  25 ++--
 .../iot/client/AsyncIoTConsensusServiceClient.java |  73 +++++-----
 .../iot/client/IoTConsensusClientPool.java         |  20 ++-
 .../iot/client/IoTConsensusServiceClient.java      | 116 +++++++++++++++
 .../iot/client/SyncIoTConsensusServiceClient.java  | 144 -------------------
 .../apache/iotdb/consensus/ratis/RatisClient.java  |  15 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |   9 +-
 .../java/org/apache/iotdb/it/env/AbstractEnv.java  |  23 ++-
 .../org/apache/iotdb/it/env/RemoteServerEnv.java   |  11 +-
 .../confignode/it/IoTDBClusterAuthorityIT.java     |   6 +-
 .../confignode/it/IoTDBConfigNodeSnapshotIT.java   |  12 +-
 .../confignode/it/IoTDBSnapshotTransferIT.java     |   6 +-
 .../iotdb/confignode/it/IoTDBStorageGroupIT.java   |  10 +-
 .../it/cluster/IoTDBClusterNodeErrorStartUpIT.java |  14 +-
 .../it/cluster/IoTDBClusterNodeGetterIT.java       |  14 +-
 .../it/cluster/IoTDBClusterRestartIT.java          |   6 +-
 .../load/IoTDBClusterRegionLeaderBalancingIT.java  |  10 +-
 .../it/load/IoTDBConfigNodeSwitchLeaderIT.java     |  10 +-
 .../partition/IoTDBAutoRegionGroupExtensionIT.java |   8 +-
 .../IoTDBCustomRegionGroupExtensionIT.java         |   6 +-
 .../it/partition/IoTDBPartitionDurableIT.java      |  26 ++--
 .../it/partition/IoTDBPartitionGetterIT.java       |  27 ++--
 .../partition/IoTDBPartitionInheritPolicyIT.java   |  15 +-
 .../apache/iotdb/commons/client/ClientManager.java |   3 +
 .../iotdb/commons/client/ClientPoolFactory.java    | 133 +++++++++++++++---
 .../apache/iotdb/commons/client/ThriftClient.java  |  87 ++++++++++++
 .../AsyncConfigNodeHeartbeatServiceClient.java     |  73 +++++-----
 .../async/AsyncConfigNodeIServiceClient.java       |  74 +++++-----
 .../async/AsyncDataNodeHeartbeatServiceClient.java |  73 +++++-----
 .../async/AsyncDataNodeInternalServiceClient.java  |  73 +++++-----
 .../AsyncDataNodeMPPDataExchangeServiceClient.java |  77 +++++-----
 .../client/exception/ClientManagerException.java   |   5 +
 .../AsyncThriftClientFactory.java}                 |  41 +++---
 .../client/{ => factory}/BaseClientFactory.java    |   9 +-
 .../ThriftClientFactory.java}                      |  21 +--
 .../client/{ => property}/ClientPoolProperty.java  |   6 +-
 .../ThriftClientProperty.java}                     |  12 +-
 ...ceClient.java => ConfigNodeIServiceClient.java} |  70 +++++-----
 ...ent.java => DataNodeInternalServiceClient.java} |  87 ++++++------
 ...a => DataNodeMPPDataExchangeServiceClient.java} |  75 +++++-----
 .../sync/SyncThriftClientWithErrorHandler.java     |  62 ++-------
 .../apache/iotdb/commons/conf/CommonConfig.java    |  43 ++++--
 .../iotdb/commons/conf/CommonDescriptor.java       |  51 ++++++-
 .../iotdb/commons/client/ClientManagerTest.java    | 155 ++++++++++++---------
 .../iotdb/db/auth/ClusterAuthorityFetcher.java     |   6 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  31 ++---
 .../iotdb/db/client/DataNodeClientPoolFactory.java | 117 +---------------
 .../metadata/template/ClusterTemplateManager.java  |   6 +-
 .../execution/exchange/MPPDataExchangeManager.java |   6 +-
 .../execution/exchange/MPPDataExchangeService.java |   9 +-
 .../db/mpp/execution/exchange/SinkHandle.java      |  10 +-
 .../db/mpp/execution/exchange/SourceHandle.java    |  10 +-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  16 ++-
 .../apache/iotdb/db/mpp/plan/TestRPCClient.java    |  36 ++---
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  |   5 +-
 .../db/mpp/plan/analyze/cache/PartitionCache.java  |   5 +-
 .../db/mpp/plan/execution/QueryExecution.java      |   6 +-
 .../config/executor/ClusterConfigTaskExecutor.java |   7 +-
 .../scheduler/AbstractFragInsStateTracker.java     |   8 +-
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |   4 +-
 .../scheduler/FixedRateFragInsStateTracker.java    |   4 +-
 .../scheduler/FragmentInstanceDispatcherImpl.java  |   8 +-
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  |   8 +-
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |  10 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   4 +-
 .../db/sync/common/ClusterSyncInfoFetcher.java     |   6 +-
 .../db/trigger/executor/TriggerFireVisitor.java    |  22 ++-
 .../trigger/service/TriggerInformationUpdater.java |   6 +-
 .../exchange/MPPDataExchangeManagerTest.java       |  14 +-
 .../db/mpp/execution/exchange/SinkHandleTest.java  |  26 ++--
 .../mpp/execution/exchange/SourceHandleTest.java   |  32 ++---
 .../iotdb/db/mpp/plan/plan/QueryPlannerTest.java   |  10 +-
 76 files changed, 1187 insertions(+), 1121 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index 0d0f5336f3..c236567523 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -21,14 +21,14 @@ package org.apache.iotdb.confignode.client.sync;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -45,15 +45,14 @@ public class SyncConfigNodeClientPool {
 
   private static final int MAX_RETRY_NUM = 6;
 
-  private final IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
+  private final IClientManager<TEndPoint, ConfigNodeIServiceClient> clientManager;
 
   private TEndPoint configNodeLeader;
 
   private SyncConfigNodeClientPool() {
     clientManager =
-        new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
-            .createClientManager(
-                new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+        new IClientManager.Factory<TEndPoint, ConfigNodeIServiceClient>()
+            .createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
     configNodeLeader = new TEndPoint();
   }
 
@@ -70,7 +69,7 @@ public class SyncConfigNodeClientPool {
 
     Throwable lastException = null;
     for (int retry = 0; retry < MAX_RETRY_NUM; retry++) {
-      try (SyncConfigNodeIServiceClient client = clientManager.borrowClient(endPoint)) {
+      try (ConfigNodeIServiceClient client = clientManager.borrowClient(endPoint)) {
         switch (requestType) {
           case REGISTER_CONFIG_NODE:
             // Only use registerConfigNode when the ConfigNode is first startup.
@@ -117,14 +116,13 @@ public class SyncConfigNodeClientPool {
    * @return SUCCESS_STATUS: remove ConfigNode success, other status remove failed
    */
   public TSStatus removeConfigNode(
-      TConfigNodeLocation configNodeLocation, SyncConfigNodeIServiceClient client)
+      TConfigNodeLocation configNodeLocation, ConfigNodeIServiceClient client)
       throws ClientManagerException, TException, InterruptedException {
     TSStatus status = client.removeConfigNode(configNodeLocation);
     while (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
       TimeUnit.MILLISECONDS.sleep(2000);
       updateConfigNodeLeader(status);
-      try (SyncConfigNodeIServiceClient clientLeader =
-          clientManager.borrowClient(configNodeLeader)) {
+      try (ConfigNodeIServiceClient clientLeader = clientManager.borrowClient(configNodeLeader)) {
         status = clientLeader.removeConfigNode(configNodeLocation);
       }
     }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 3136e05da1..9f79645717 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
@@ -52,11 +52,11 @@ public class SyncDataNodeClientPool {
 
   private static final int DEFAULT_RETRY_NUM = 6;
 
-  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
+  private final IClientManager<TEndPoint, DataNodeInternalServiceClient> clientManager;
 
   private SyncDataNodeClientPool() {
     clientManager =
-        new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+        new IClientManager.Factory<TEndPoint, DataNodeInternalServiceClient>()
             .createClientManager(
                 new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
   }
@@ -65,7 +65,7 @@ public class SyncDataNodeClientPool {
       TEndPoint endPoint, Object req, DataNodeRequestType requestType) {
     Throwable lastException = new TException();
     for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) {
-      try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
+      try (DataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
         return executeSyncRequest(requestType, client, req);
       } catch (ClientManagerException | TException e) {
         lastException = e;
@@ -84,7 +84,7 @@ public class SyncDataNodeClientPool {
       TEndPoint endPoint, Object req, DataNodeRequestType requestType, int retryNum) {
     Throwable lastException = new TException();
     for (int retry = 0; retry < retryNum; retry++) {
-      try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
+      try (DataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
         return executeSyncRequest(requestType, client, req);
       } catch (ClientManagerException | TException e) {
         lastException = e;
@@ -100,7 +100,7 @@ public class SyncDataNodeClientPool {
   }
 
   private TSStatus executeSyncRequest(
-      DataNodeRequestType requestType, SyncDataNodeInternalServiceClient client, Object req)
+      DataNodeRequestType requestType, DataNodeInternalServiceClient client, Object req)
       throws TException {
     switch (requestType) {
       case INVALIDATE_PARTITION_CACHE:
@@ -164,7 +164,7 @@ public class SyncDataNodeClientPool {
       TConsensusGroupId regionId, TEndPoint dataNode, TDataNodeLocation newLeaderNode) {
     LOGGER.info("Send RPC to data node: {} for changing regions leader on it", dataNode);
     TSStatus status;
-    try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(dataNode)) {
+    try (DataNodeInternalServiceClient client = clientManager.borrowClient(dataNode)) {
       TRegionLeaderChangeReq req = new TRegionLeaderChangeReq(regionId, newLeaderNode);
       status = client.changeRegionLeader(req);
     } catch (ClientManagerException e) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
index e467d4b4bb..824b02b3e2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
@@ -74,7 +74,7 @@ public class ConfigNodeRPCService extends ThriftService implements ConfigNodeRPC
               configConf.getCnRpcMaxConcurrentClientNum(),
               configConf.getThriftServerAwaitTimeForStopService(),
               new ConfigNodeRPCServiceHandler(),
-              commonConfig.isCnRpcThriftCompressionEnabled());
+              commonConfig.isRpcThriftCompressionEnabled());
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 5135d384a2..45e6d988ff 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -45,7 +45,7 @@ import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
 import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
 import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.AsyncIoTConsensusServiceClientPoolFactory;
 import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.SyncIoTConsensusServiceClientPoolFactory;
-import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
+import org.apache.iotdb.consensus.iot.client.IoTConsensusServiceClient;
 import org.apache.iotdb.consensus.iot.logdispatcher.IoTConsensusMemoryManager;
 import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCService;
 import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCServiceProcessor;
@@ -80,7 +80,7 @@ public class IoTConsensus implements IConsensus {
   private final RegisterManager registerManager = new RegisterManager();
   private final IoTConsensusConfig config;
   private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
-  private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
+  private final IClientManager<TEndPoint, IoTConsensusServiceClient> syncClientManager;
 
   public IoTConsensus(ConsensusConfig config, Registry registry) {
     this.thisNode = config.getThisNodeEndPoint();
@@ -94,7 +94,7 @@ public class IoTConsensus implements IConsensus {
             .createClientManager(
                 new AsyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig()));
     this.syncClientManager =
-        new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
+        new IClientManager.Factory<TEndPoint, IoTConsensusServiceClient>()
             .createClientManager(
                 new SyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig()));
     // init IoTConsensus memory manager
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 1d668c281a..39d663aa69 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
 import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
-import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
+import org.apache.iotdb.consensus.iot.client.IoTConsensusServiceClient;
 import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
 import org.apache.iotdb.consensus.iot.snapshot.SnapshotFragmentReader;
 import org.apache.iotdb.consensus.iot.thrift.TActivatePeerReq;
@@ -104,7 +104,7 @@ public class IoTConsensusServerImpl {
   private final ConsensusReqReader reader;
   private volatile boolean active;
   private String latestSnapshotId;
-  private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
+  private final IClientManager<TEndPoint, IoTConsensusServiceClient> syncClientManager;
   private final IoTConsensusServerMetrics metrics;
 
   private final String consensusGroupId;
@@ -115,7 +115,7 @@ public class IoTConsensusServerImpl {
       List<Peer> configuration,
       IStateMachine stateMachine,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
-      IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager,
+      IClientManager<TEndPoint, IoTConsensusServiceClient> syncClientManager,
       IoTConsensusConfig config) {
     this.active = true;
     this.storageDir = storageDir;
@@ -315,7 +315,7 @@ public class IoTConsensusServerImpl {
     File snapshotDir = new File(storageDir, latestSnapshotId);
     List<Path> snapshotPaths = stateMachine.getSnapshotFiles(snapshotDir);
     logger.info("transit snapshots: {}", snapshotPaths);
-    try (SyncIoTConsensusServiceClient client =
+    try (IoTConsensusServiceClient client =
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       for (Path path : snapshotPaths) {
         SnapshotFragmentReader reader = new SnapshotFragmentReader(latestSnapshotId, path);
@@ -395,8 +395,7 @@ public class IoTConsensusServerImpl {
   }
 
   public void inactivePeer(Peer peer) throws ConsensusGroupModifyPeerException {
-    try (SyncIoTConsensusServiceClient client =
-        syncClientManager.borrowClient(peer.getEndpoint())) {
+    try (IoTConsensusServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
       TInactivatePeerRes res =
           client.inactivatePeer(
               new TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
@@ -411,8 +410,7 @@ public class IoTConsensusServerImpl {
   }
 
   public void triggerSnapshotLoad(Peer peer) throws ConsensusGroupModifyPeerException {
-    try (SyncIoTConsensusServiceClient client =
-        syncClientManager.borrowClient(peer.getEndpoint())) {
+    try (IoTConsensusServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
       TTriggerSnapshotLoadRes res =
           client.triggerSnapshotLoad(
               new TTriggerSnapshotLoadReq(
@@ -428,8 +426,7 @@ public class IoTConsensusServerImpl {
   }
 
   public void activePeer(Peer peer) throws ConsensusGroupModifyPeerException {
-    try (SyncIoTConsensusServiceClient client =
-        syncClientManager.borrowClient(peer.getEndpoint())) {
+    try (IoTConsensusServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) {
       TActivatePeerRes res =
           client.activatePeer(new TActivatePeerReq(peer.getGroupId().convertToTConsensusGroupId()));
       if (!isSuccess(res.status)) {
@@ -459,7 +456,7 @@ public class IoTConsensusServerImpl {
         buildSyncLogChannel(targetPeer, index.get());
       } else {
         // use RPC to tell other peers to build sync log channel to target peer
-        try (SyncIoTConsensusServiceClient client =
+        try (IoTConsensusServiceClient client =
             syncClientManager.borrowClient(peer.getEndpoint())) {
           TBuildSyncLogChannelRes res =
               client.buildSyncLogChannel(
@@ -502,7 +499,7 @@ public class IoTConsensusServerImpl {
         removeSyncLogChannel(targetPeer);
       } else {
         // use RPC to tell other peers to build sync log channel to target peer
-        try (SyncIoTConsensusServiceClient client =
+        try (IoTConsensusServiceClient client =
             syncClientManager.borrowClient(peer.getEndpoint())) {
           TRemoveSyncLogChannelRes res =
               client.removeSyncLogChannel(
@@ -525,7 +522,7 @@ public class IoTConsensusServerImpl {
   public void waitTargetPeerUntilSyncLogCompleted(Peer targetPeer)
       throws ConsensusGroupModifyPeerException {
     long checkIntervalInMs = 10_000L;
-    try (SyncIoTConsensusServiceClient client =
+    try (IoTConsensusServiceClient client =
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       while (true) {
         TWaitSyncLogCompleteRes res =
@@ -740,7 +737,7 @@ public class IoTConsensusServerImpl {
   }
 
   public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupModifyPeerException {
-    try (SyncIoTConsensusServiceClient client =
+    try (IoTConsensusServiceClient client =
         syncClientManager.borrowClient(targetPeer.getEndpoint())) {
       TCleanupTransferredSnapshotReq req =
           new TCleanupTransferredSnapshotReq(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index 1f08dabbc6..f4c68c47c4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.consensus.iot.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -35,7 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncClient {
+public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncClient
+    implements ThriftClient {
 
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncIoTConsensusServiceClient.class);
@@ -58,41 +60,44 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
     this.clientManager = clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * return self if clientManager is not null, the method doesn't need to call by user, it will
+   * trigger once client transport complete.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
   public boolean isReady() {
     try {
       checkReady();
@@ -109,13 +114,13 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncIoTConsensusServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncIoTConsensusServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -127,21 +132,19 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
     @Override
     public PooledObject<AsyncIoTConsensusServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncIoTConsensusServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              tManagers[clientCnt.incrementAndGet() % tManagers.length],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncIoTConsensusServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index babc9fb4f5..6de9987914 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -20,10 +20,10 @@
 package org.apache.iotdb.consensus.iot.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 
 import org.apache.commons.pool2.KeyedObjectPool;
@@ -34,7 +34,7 @@ public class IoTConsensusClientPool {
   private IoTConsensusClientPool() {}
 
   public static class SyncIoTConsensusServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, SyncIoTConsensusServiceClient> {
+      implements IClientPoolFactory<TEndPoint, IoTConsensusServiceClient> {
 
     private final IoTConsensusConfig config;
 
@@ -43,18 +43,16 @@ public class IoTConsensusClientPool {
     }
 
     @Override
-    public KeyedObjectPool<TEndPoint, SyncIoTConsensusServiceClient> createClientPool(
-        ClientManager<TEndPoint, SyncIoTConsensusServiceClient> manager) {
+    public KeyedObjectPool<TEndPoint, IoTConsensusServiceClient> createClientPool(
+        ClientManager<TEndPoint, IoTConsensusServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
-          new SyncIoTConsensusServiceClient.Factory(
+          new IoTConsensusServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
+              new ThriftClientProperty.Builder()
                   .setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
                   .setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
-                  .setSelectorNumOfAsyncClientManager(
-                      config.getRpc().getSelectorNumOfClientManager())
                   .build()),
-          new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>().build().getConfig());
+          new ClientPoolProperty.Builder<IoTConsensusServiceClient>().build().getConfig());
     }
   }
 
@@ -74,7 +72,7 @@ public class IoTConsensusClientPool {
       return new GenericKeyedObjectPool<>(
           new AsyncIoTConsensusServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
+              new ThriftClientProperty.Builder()
                   .setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
                   .setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
                   .setSelectorNumOfAsyncClientManager(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusServiceClient.java
new file mode 100644
index 0000000000..c482191af6
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusServiceClient.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot.client;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
+import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+public class IoTConsensusServiceClient extends IoTConsensusIService.Client
+    implements ThriftClient, AutoCloseable {
+
+  private final TEndPoint endpoint;
+  private final ClientManager<TEndPoint, IoTConsensusServiceClient> clientManager;
+
+  public IoTConsensusServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      TEndPoint endpoint,
+      ClientManager<TEndPoint, IoTConsensusServiceClient> clientManager)
+      throws TTransportException {
+    super(
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    endpoint.getIp(),
+                    endpoint.getPort(),
+                    connectionTimeout))));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+    getInputProtocol().getTransport().open();
+  }
+
+  @Override
+  public void close() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  @Override
+  public void invalidate() {
+    getInputProtocol().getTransport().close();
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("SyncIoTConsensusServiceClient{%s}", endpoint);
+  }
+
+  public static class Factory extends ThriftClientFactory<TEndPoint, IoTConsensusServiceClient> {
+
+    public Factory(
+        ClientManager<TEndPoint, IoTConsensusServiceClient> clientManager,
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
+    }
+
+    @Override
+    public void destroyObject(
+        TEndPoint endpoint, PooledObject<IoTConsensusServiceClient> pooledObject) {
+      pooledObject.getObject().invalidate();
+    }
+
+    @Override
+    public PooledObject<IoTConsensusServiceClient> makeObject(TEndPoint endpoint) throws Exception {
+      return new DefaultPooledObject<>(
+          SyncThriftClientWithErrorHandler.newErrorHandler(
+              IoTConsensusServiceClient.class,
+              IoTConsensusServiceClient.class.getConstructor(
+                  TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
+              endpoint,
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(
+        TEndPoint endpoint, PooledObject<IoTConsensusServiceClient> pooledObject) {
+      return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+    }
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
deleted file mode 100644
index 06fb777810..0000000000
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.iot.client;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
-import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.sync.SyncThriftClient;
-import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.rpc.TimeoutChangeableTransport;
-
-import org.apache.commons.pool2.PooledObject;
-import org.apache.commons.pool2.impl.DefaultPooledObject;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransportException;
-
-import java.lang.reflect.Constructor;
-import java.net.SocketException;
-
-public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
-    implements SyncThriftClient, AutoCloseable {
-
-  private final TEndPoint endPoint;
-  private final ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager;
-
-  public SyncIoTConsensusServiceClient(
-      TProtocolFactory protocolFactory,
-      int connectionTimeout,
-      TEndPoint endPoint,
-      ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager)
-      throws TTransportException {
-    super(
-        protocolFactory.getProtocol(
-            RpcTransportFactory.INSTANCE.getTransport(
-                new TSocket(
-                    TConfigurationConst.defaultTConfiguration,
-                    endPoint.getIp(),
-                    endPoint.getPort(),
-                    connectionTimeout))));
-    this.endPoint = endPoint;
-    this.clientManager = clientManager;
-    getInputProtocol().getTransport().open();
-  }
-
-  @TestOnly
-  public TEndPoint getTEndpoint() {
-    return endPoint;
-  }
-
-  @TestOnly
-  public ClientManager<TEndPoint, SyncIoTConsensusServiceClient> getClientManager() {
-    return clientManager;
-  }
-
-  public void close() {
-    if (clientManager != null) {
-      clientManager.returnClient(endPoint, this);
-    }
-  }
-
-  public void setTimeout(int timeout) {
-    // the same transport is used in both input and output
-    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
-  }
-
-  public void invalidate() {
-    getInputProtocol().getTransport().close();
-  }
-
-  @Override
-  public void invalidateAll() {
-    clientManager.clear(endPoint);
-  }
-
-  public int getTimeout() throws SocketException {
-    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
-  }
-
-  @Override
-  public String toString() {
-    return String.format("SyncIoTConsensusServiceClient{%s}", endPoint);
-  }
-
-  public static class Factory extends BaseClientFactory<TEndPoint, SyncIoTConsensusServiceClient> {
-
-    public Factory(
-        ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
-    }
-
-    @Override
-    public void destroyObject(
-        TEndPoint endpoint, PooledObject<SyncIoTConsensusServiceClient> pooledObject) {
-      pooledObject.getObject().invalidate();
-    }
-
-    @Override
-    public PooledObject<SyncIoTConsensusServiceClient> makeObject(TEndPoint endpoint)
-        throws Exception {
-      Constructor<SyncIoTConsensusServiceClient> constructor =
-          SyncIoTConsensusServiceClient.class.getConstructor(
-              TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
-      return new DefaultPooledObject<>(
-          SyncThriftClientWithErrorHandler.newErrorHandler(
-              SyncIoTConsensusServiceClient.class,
-              constructor,
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
-              endpoint,
-              clientManager));
-    }
-
-    @Override
-    public boolean validateObject(
-        TEndPoint endpoint, PooledObject<SyncIoTConsensusServiceClient> pooledObject) {
-      return pooledObject.getObject() != null
-          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
-    }
-  }
-}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
index d9884f57d6..273c80ee81 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
@@ -18,9 +18,8 @@
  */
 package org.apache.iotdb.consensus.ratis;
 
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.factory.BaseClientFactory;
 import org.apache.iotdb.consensus.config.RatisConfig;
 
 import org.apache.commons.pool2.PooledObject;
@@ -41,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 public class RatisClient {
+
   private final Logger logger = LoggerFactory.getLogger(RatisClient.class);
   private final RaftGroup serveGroup;
   private final RaftClient raftClient;
@@ -59,7 +59,7 @@ public class RatisClient {
     return raftClient;
   }
 
-  public void close() {
+  private void close() {
     try {
       raftClient.close();
     } catch (IOException e) {
@@ -68,9 +68,7 @@ public class RatisClient {
   }
 
   public void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(serveGroup, this);
-    }
+    clientManager.returnClient(serveGroup, this);
   }
 
   public static class Factory extends BaseClientFactory<RaftGroup, RatisClient> {
@@ -81,11 +79,10 @@ public class RatisClient {
 
     public Factory(
         ClientManager<RaftGroup, RatisClient> clientManager,
-        ClientFactoryProperty clientPoolProperty,
         RaftProperties raftProperties,
         RaftClientRpc clientRpc,
         Supplier<RatisConfig.Impl> config) {
-      super(clientManager, clientPoolProperty);
+      super(clientManager);
       this.raftProperties = raftProperties;
       this.clientRpc = clientRpc;
       this.config = config;
@@ -97,7 +94,7 @@ public class RatisClient {
     }
 
     @Override
-    public PooledObject<RatisClient> makeObject(RaftGroup group) throws Exception {
+    public PooledObject<RatisClient> makeObject(RaftGroup group) {
       return new DefaultPooledObject<>(
           new RatisClient(
               group,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index e00374c347..00d3dd45f4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -21,12 +21,11 @@ package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -814,11 +813,7 @@ class RatisConsensus implements IConsensus {
         ClientManager<RaftGroup, RatisClient> manager) {
       return new GenericKeyedObjectPool<>(
           new RatisClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder().build(),
-              properties,
-              clientRpc,
-              MemoizedSupplier.valueOf(() -> config.getImpl())),
+              manager, properties, clientRpc, MemoizedSupplier.valueOf(config::getImpl)),
           new ClientPoolProperty.Builder<RatisClient>().build().getConfig());
     }
   }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index 98ae650290..4393bd4740 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -19,13 +19,13 @@
 package org.apache.iotdb.it.env;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.it.framework.IoTDBTestLogger;
@@ -76,16 +76,15 @@ public abstract class AbstractEnv implements BaseEnv {
   public static final String templateNodePath =
       System.getProperty("user.dir") + File.separator + "target" + File.separator + "template-node";
 
-  private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
+  private IClientManager<TEndPoint, ConfigNodeIServiceClient> clientManager;
 
   protected void initEnvironment(int configNodesNum, int dataNodesNum) {
     this.configNodeWrapperList = new ArrayList<>();
     this.dataNodeWrapperList = new ArrayList<>();
 
     clientManager =
-        new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
-            .createClientManager(
-                new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+        new IClientManager.Factory<TEndPoint, ConfigNodeIServiceClient>()
+            .createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
 
     final String testClassName = getTestClassName();
     final String testMethodName = getTestMethodName();
@@ -100,8 +99,8 @@ public abstract class AbstractEnv implements BaseEnv {
     this.configNodeWrapperList.add(seedConfigNodeWrapper);
 
     // Check if the Seed-ConfigNode started successfully
-    try (SyncConfigNodeIServiceClient ignored =
-        (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient ignored =
+        (ConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
       // Do nothing
       logger.info("The Seed-ConfigNode started successfully!");
     } catch (Exception e) {
@@ -234,8 +233,8 @@ public abstract class AbstractEnv implements BaseEnv {
     Exception lastException = null;
     boolean flag;
     for (int i = 0; i < 30; i++) {
-      try (SyncConfigNodeIServiceClient client =
-          (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
+      try (ConfigNodeIServiceClient client =
+          (ConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
         flag = true;
         showClusterResp = client.showCluster();
 
@@ -448,7 +447,7 @@ public abstract class AbstractEnv implements BaseEnv {
     for (int i = 0; i < 30; i++) {
       for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
         try {
-          SyncConfigNodeIServiceClient client =
+          ConfigNodeIServiceClient client =
               clientManager.borrowClient(
                   new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()));
           TShowClusterResp resp = client.showCluster();
@@ -484,7 +483,7 @@ public abstract class AbstractEnv implements BaseEnv {
     for (int retry = 0; retry < 30; retry++) {
       for (int configNodeId = 0; configNodeId < configNodeWrapperList.size(); configNodeId++) {
         ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(configNodeId);
-        try (SyncConfigNodeIServiceClient client =
+        try (ConfigNodeIServiceClient client =
             clientManager.borrowClient(
                 new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()))) {
           TShowClusterResp resp = client.showCluster();
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index 9cf6344f9e..9642b6a1b3 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -19,11 +19,11 @@
 package org.apache.iotdb.it.env;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.jdbc.Config;
@@ -47,7 +47,7 @@ public class RemoteServerEnv implements BaseEnv {
   private final String port = System.getProperty("RemotePort", "6667");
   private final String user = System.getProperty("RemoteUser", "root");
   private final String password = System.getProperty("RemotePassword", "root");
-  private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
+  private IClientManager<TEndPoint, ConfigNodeIServiceClient> clientManager;
 
   @Override
   public void initBeforeClass() {
@@ -60,9 +60,8 @@ public class RemoteServerEnv implements BaseEnv {
       fail(e.getMessage());
     }
     clientManager =
-        new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
-            .createClientManager(
-                new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+        new IClientManager.Factory<TEndPoint, ConfigNodeIServiceClient>()
+            .createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
   }
 
   @Override
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterAuthorityIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterAuthorityIT.java
index 7a6dfc6b69..3acc43e525 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterAuthorityIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBClusterAuthorityIT.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.it;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
@@ -129,8 +129,8 @@ public class IoTDBClusterAuthorityIT {
     List<String> paths = new ArrayList<>();
     paths.add("root.ln.**");
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       cleanUserAndRole(client);
 
       // create user
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
index c9d30c6668..bdadba8ae5 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.it;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cq.CQState;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -124,8 +124,8 @@ public class IoTDBConfigNodeSnapshotIT {
     final int seriesPartitionSlotsNum = 10;
     final int timePartitionSlotsNum = 10;
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       List<TCreateTriggerReq> createTriggerReqs = createTrigger(client);
       List<TCreateFunctionReq> createFunctionReqs = createUDF(client);
@@ -210,7 +210,7 @@ public class IoTDBConfigNodeSnapshotIT {
     }
   }
 
-  private List<TCreateTriggerReq> createTrigger(SyncConfigNodeIServiceClient client)
+  private List<TCreateTriggerReq> createTrigger(ConfigNodeIServiceClient client)
       throws IllegalPathException, TException, IOException {
     final String triggerPath =
         System.getProperty("user.dir")
@@ -286,7 +286,7 @@ public class IoTDBConfigNodeSnapshotIT {
     }
   }
 
-  private List<TCreateFunctionReq> createUDF(SyncConfigNodeIServiceClient client)
+  private List<TCreateFunctionReq> createUDF(ConfigNodeIServiceClient client)
       throws TException, IOException {
     final String jarName = "udf-example.jar";
     final String triggerPath =
@@ -338,7 +338,7 @@ public class IoTDBConfigNodeSnapshotIT {
     }
   }
 
-  private Set<TCQEntry> createCQs(SyncConfigNodeIServiceClient client) throws TException {
+  private Set<TCQEntry> createCQs(ConfigNodeIServiceClient client) throws TException {
     String sql1 = "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from root.sg.d1 END";
     String sql2 = "create cq testCq2 BEGIN select s1 into root.backup.d2(s1) from root.sg.d2 END";
     TCreateCQReq req1 =
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java
index 17d9a13565..3e0088cbf0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBSnapshotTransferIT.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.it;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
@@ -78,8 +78,8 @@ public class IoTDBSnapshotTransferIT {
   public void testSnapshotTransfer() throws Exception {
     try (final Connection connection = EnvFactory.getEnv().getConnection();
         final Statement statement = connection.createStatement();
-        final SyncConfigNodeIServiceClient configClient =
-            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        final ConfigNodeIServiceClient configClient =
+            (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       statement.execute("CREATE DATABASE root.emma");
       statement.execute(
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java
index b305c07ae7..702edd8a88 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.it;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
@@ -67,8 +67,8 @@ public class IoTDBStorageGroupIT {
     final String sg0 = "root.sg0";
     final String sg1 = "root.sg1";
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       // set StorageGroup0 by default values
       TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
       status = client.setStorageGroup(setReq0);
@@ -157,8 +157,8 @@ public class IoTDBStorageGroupIT {
     final String sg0 = "root.sg0";
     final String sg1 = "root.sg1";
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       TSetStorageGroupReq setReq0 = new TSetStorageGroupReq(new TStorageGroupSchema(sg0));
       // set StorageGroup0 by default values
       status = client.setStorageGroup(setReq0);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
index b1ccbd6a32..638a958b51 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
@@ -105,8 +105,8 @@ public class IoTDBClusterNodeErrorStartUpIT {
     conflictConfigNodeWrapper.changeConfig(ConfigFactory.getConfig().getConfignodeProperties());
 
     // The registration request should be rejected since there exists conflict port
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       TConfigNodeRegisterReq req =
           ConfigNodeTestUtils.generateTConfigNodeRegisterReq(conflictConfigNodeWrapper);
       TConfigNodeRegisterResp resp = client.registerConfigNode(req);
@@ -131,8 +131,8 @@ public class IoTDBClusterNodeErrorStartUpIT {
     conflictDataNodeWrapper.changeConfig(ConfigFactory.getConfig().getEngineProperties());
 
     // The registration request should be rejected since there exists conflict port
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       TDataNodeRegisterReq req =
           ConfigNodeTestUtils.generateTDataNodeRegisterReq(conflictDataNodeWrapper);
       TDataNodeRegisterResp resp = client.registerDataNode(req);
@@ -155,8 +155,8 @@ public class IoTDBClusterNodeErrorStartUpIT {
     ConfigNodeWrapper registeredConfigNodeWrapper = EnvFactory.getEnv().getConfigNodeWrapper(1);
     DataNodeWrapper registeredDataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       /* Restart with error cluster name */
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java
index 79bbd29c38..7e889369f8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeConfigurationResp;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
@@ -99,8 +99,8 @@ public class IoTDBClusterNodeGetterIT {
 
   @Test
   public void showClusterAndNodesTest() throws Exception {
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       List<ConfigNodeWrapper> configNodeWrappers = EnvFactory.getEnv().getConfigNodeWrapperList();
       List<DataNodeWrapper> dataNodeWrappers = EnvFactory.getEnv().getDataNodeWrapperList();
@@ -183,8 +183,8 @@ public class IoTDBClusterNodeGetterIT {
     TShowClusterResp showClusterResp;
     TSStatus status;
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       // Test remove ConfigNode
       showClusterResp = client.showCluster();
@@ -217,8 +217,8 @@ public class IoTDBClusterNodeGetterIT {
   @Test
   public void queryAndRemoveDataNodeTest() throws Exception {
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       /* Test success re-register */
       TDataNodeRegisterReq dataNodeRegisterReq = new TDataNodeRegisterReq();
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
index 28f2df6bb6..0464833254 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterRestartIT.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
@@ -177,8 +177,8 @@ public class IoTDBClusterRestartIT {
     ((AbstractEnv) EnvFactory.getEnv()).testWorking();
 
     // check nodeInfo in cluster
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       // check the number and status of nodes
       clusterNodes = getClusterNodeInfos(client, testConfigNodeNum, testDataNodeNum);
       assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), clusterNodes.getStatus().getCode());
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
index faf6a6ccdb..7c1352c0e3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.it.load;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
@@ -117,8 +117,8 @@ public class IoTDBClusterRegionLeaderBalancingIT {
 
     TSStatus status;
     final int storageGroupNum = 3;
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       // Set StorageGroups
       for (int i = 0; i < storageGroupNum; i++) {
@@ -171,8 +171,8 @@ public class IoTDBClusterRegionLeaderBalancingIT {
 
     TSStatus status;
     final int storageGroupNum = 6;
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       for (int i = 0; i < storageGroupNum; i++) {
         // Set StorageGroups
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java
index 81abd2087c..8d71c4c42a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.it.load;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -128,8 +128,8 @@ public class IoTDBConfigNodeSwitchLeaderIT {
     TSchemaPartitionTableResp schemaPartitionTableResp0;
     TDataPartitionTableResp dataPartitionTableResp0;
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       // Set StorageGroups
       status = client.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(sg0)));
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
@@ -166,8 +166,8 @@ public class IoTDBConfigNodeSwitchLeaderIT {
     // Switch the current ConfigNode-Leader
     switchLeader();
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       // Check SchemaPartitionTable
       ByteBuffer buffer =
           ConfigNodeTestUtils.generatePatternTreeBuffer(new String[] {d00, d01, d10, d11});
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
index 32282d19ab..8a7dc6a7f2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.it.partition;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
@@ -116,8 +116,8 @@ public class IoTDBAutoRegionGroupExtensionIT {
 
     final int retryNum = 100;
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       setStorageGroupAndCheckRegionGroupDistribution(client);
 
@@ -143,7 +143,7 @@ public class IoTDBAutoRegionGroupExtensionIT {
     }
   }
 
-  private void setStorageGroupAndCheckRegionGroupDistribution(SyncConfigNodeIServiceClient client)
+  private void setStorageGroupAndCheckRegionGroupDistribution(ConfigNodeIServiceClient client)
       throws TException {
     for (int i = 0; i < testSgNum; i++) {
       String curSg = sg + i;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java
index 0d22f3982c..8417c45d1f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.confignode.it.partition;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -129,8 +129,8 @@ public class IoTDBCustomRegionGroupExtensionIT {
 
   @Test
   public void testCustomRegionGroupExtensionPolicy() throws Exception {
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       for (int i = 0; i < testSgNum; i++) {
         String curSg = sg + i;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
index 665155e8ae..4987473fca 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
@@ -127,8 +127,8 @@ public class IoTDBPartitionDurableIT {
   }
 
   private void setStorageGroup() throws Exception {
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       TSetStorageGroupReq setStorageGroupReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg));
       TSStatus status = client.setStorageGroup(setStorageGroupReq);
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
@@ -154,8 +154,8 @@ public class IoTDBPartitionDurableIT {
 
   @Test
   public void testRemovingDataNode() throws Exception {
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       /* Test getOrCreateSchemaPartition, ConfigNode should create SchemaPartition and return */
       TSchemaPartitionReq schemaPartitionReq =
@@ -280,8 +280,8 @@ public class IoTDBPartitionDurableIT {
 
   @Test
   public void testReadOnlyDataNode() throws Exception {
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       /* Test getOrCreateDataPartition, ConfigNode should create DataPartition and return */
       Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
@@ -434,8 +434,8 @@ public class IoTDBPartitionDurableIT {
     // Shutdown a DataNode, the ConfigNode should still be able to create RegionGroup
     EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
       Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
           ConfigNodeTestUtils.constructPartitionSlotsMap(
@@ -449,8 +449,8 @@ public class IoTDBPartitionDurableIT {
       TDataPartitionTableResp dataPartitionTableResp = null;
       for (int retry = 0; retry < 5; retry++) {
         // Build new Client since it's unstable in Win8 environment
-        try (SyncConfigNodeIServiceClient configNodeClient =
-            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        try (ConfigNodeIServiceClient configNodeClient =
+            (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
           dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
           if (dataPartitionTableResp != null) {
             break;
@@ -538,8 +538,8 @@ public class IoTDBPartitionDurableIT {
       dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
       for (int retry = 0; retry < 5; retry++) {
         // Build new Client since it's unstable in Win8 environment
-        try (SyncConfigNodeIServiceClient configNodeClient =
-            (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+        try (ConfigNodeIServiceClient configNodeClient =
+            (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
           dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
           if (dataPartitionTableResp != null) {
             break;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
index edfad728f9..62aad1f793 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
@@ -135,8 +135,8 @@ public class IoTDBPartitionGetterIT {
   }
 
   private static void prepareData() throws Exception {
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       /* Set StorageGroups */
       for (int i = 0; i < storageGroupNum; i++) {
         TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
@@ -190,9 +190,8 @@ public class IoTDBPartitionGetterIT {
             TDataPartitionTableResp dataPartitionTableResp = null;
             for (int retry = 0; retry < 5; retry++) {
               // Build new Client since it's unstable
-              try (SyncConfigNodeIServiceClient configNodeClient =
-                  (SyncConfigNodeIServiceClient)
-                      EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+              try (ConfigNodeIServiceClient configNodeClient =
+                  (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
                 dataPartitionTableResp =
                     configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
                 if (dataPartitionTableResp != null) {
@@ -250,8 +249,8 @@ public class IoTDBPartitionGetterIT {
 
     final String notExistsSg = "root.sg10.**";
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       ByteBuffer buffer;
       TSchemaPartitionReq schemaPartitionReq;
       TSchemaPartitionTableResp schemaPartitionTableResp;
@@ -305,8 +304,8 @@ public class IoTDBPartitionGetterIT {
     final int seriesPartitionBatchSize = 100;
     final int timePartitionBatchSize = 10;
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       TDataPartitionReq dataPartitionReq;
       TDataPartitionTableResp dataPartitionTableResp;
 
@@ -383,8 +382,8 @@ public class IoTDBPartitionGetterIT {
     final String sg0 = "root.sg0";
     final String sg1 = "root.sg1";
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       // Test getRegionId
       TGetRegionIdReq getRegionIdReq = null;
@@ -531,8 +530,8 @@ public class IoTDBPartitionGetterIT {
     TSchemaNodeManagementReq nodeManagementReq;
     TSchemaNodeManagementResp nodeManagementResp;
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
 
       ByteBuffer byteBuffer = generatePatternTreeBuffer(new String[] {"root"});
       nodeManagementReq = new TSchemaNodeManagementReq(byteBuffer);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
index f9dd8af7e9..769ebeb0cd 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.it.partition;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -101,8 +101,8 @@ public class IoTDBPartitionInheritPolicyIT {
     EnvFactory.getEnv().initClusterEnvironment(1, 3);
 
     // Set StorageGroups
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       for (int i = 0; i < storageGroupNum; i++) {
         TSetStorageGroupReq setReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg + i));
         TSStatus status = client.setStorageGroup(setReq);
@@ -127,8 +127,8 @@ public class IoTDBPartitionInheritPolicyIT {
   @Test
   public void testDataPartitionInheritPolicy() throws Exception {
 
-    try (SyncConfigNodeIServiceClient client =
-        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+    try (ConfigNodeIServiceClient client =
+        (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
       TDataPartitionReq dataPartitionReq = new TDataPartitionReq();
       TDataPartitionTableResp dataPartitionTableResp;
       Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap;
@@ -150,9 +150,8 @@ public class IoTDBPartitionInheritPolicyIT {
             dataPartitionReq.setPartitionSlotsMap(partitionSlotsMap);
             for (int retry = 0; retry < 5; retry++) {
               // Build new Client since it's unstable
-              try (SyncConfigNodeIServiceClient configNodeClient =
-                  (SyncConfigNodeIServiceClient)
-                      EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+              try (ConfigNodeIServiceClient configNodeClient =
+                  (ConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
                 dataPartitionTableResp =
                     configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
                 if (dataPartitionTableResp != null) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 66836967de..ab737ac1c8 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -45,6 +45,9 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
 
   @Override
   public V borrowClient(K node) throws ClientManagerException {
+    if (node == null) {
+      throw new ClientManagerException("Can not borrow client for node null");
+    }
     try {
       return pool.borrowObject(node);
     } catch (Exception e) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 094470deb6..7c03777a22 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -21,9 +21,15 @@ package org.apache.iotdb.commons.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.async.AsyncConfigNodeHeartbeatServiceClient;
+import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeHeartbeatServiceClient;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.client.sync.ConfigNodeIServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -37,35 +43,76 @@ public class ClientPoolFactory {
 
   private ClientPoolFactory() {}
 
+  public static class SyncConfigNodeIServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, ConfigNodeIServiceClient> {
+
+    @Override
+    public KeyedObjectPool<TEndPoint, ConfigNodeIServiceClient> createClientPool(
+        ClientManager<TEndPoint, ConfigNodeIServiceClient> manager) {
+      return new GenericKeyedObjectPool<>(
+          new ConfigNodeIServiceClient.Factory(
+              manager,
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .build()),
+          new ClientPoolProperty.Builder<ConfigNodeIServiceClient>().build().getConfig());
+    }
+  }
+
+  public static class AsyncConfigNodeIServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
+
+    @Override
+    public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
+        ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
+      return new GenericKeyedObjectPool<>(
+          new AsyncConfigNodeIServiceClient.Factory(
+              manager,
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+                  .build(),
+              ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
+          new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
+              .setMaxIdleClientForEachNode(conf.getMaxIdleClientForEachNode())
+              .setMaxTotalClientForEachNode(conf.getMaxTotalClientForEachNode())
+              .build()
+              .getConfig());
+    }
+  }
+
   public static class SyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
+      implements IClientPoolFactory<TEndPoint, DataNodeInternalServiceClient> {
+
     @Override
-    public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
+    public KeyedObjectPool<TEndPoint, DataNodeInternalServiceClient> createClientPool(
+        ClientManager<TEndPoint, DataNodeInternalServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
-          new SyncDataNodeInternalServiceClient.Factory(
+          new DataNodeInternalServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
-                  .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
                   .build()),
-          new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig());
+          new ClientPoolProperty.Builder<DataNodeInternalServiceClient>().build().getConfig());
     }
   }
 
   public static class AsyncDataNodeInternalServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
+
     @Override
     public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
         ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new AsyncDataNodeInternalServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
-                  .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
                   .build(),
               ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
           new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
@@ -74,16 +121,17 @@ public class ClientPoolFactory {
 
   public static class AsyncConfigNodeHeartbeatServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> {
+
     @Override
     public KeyedObjectPool<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> createClientPool(
         ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new AsyncConfigNodeHeartbeatServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
-                  .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
                   .build(),
               ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
           new ClientPoolProperty.Builder<AsyncConfigNodeHeartbeatServiceClient>()
@@ -94,16 +142,17 @@ public class ClientPoolFactory {
 
   public static class AsyncDataNodeHeartbeatServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncDataNodeHeartbeatServiceClient> {
+
     @Override
     public KeyedObjectPool<TEndPoint, AsyncDataNodeHeartbeatServiceClient> createClientPool(
         ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new AsyncDataNodeHeartbeatServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
-                  .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
                   .build(),
               ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
           new ClientPoolProperty.Builder<AsyncDataNodeHeartbeatServiceClient>()
@@ -111,4 +160,44 @@ public class ClientPoolFactory {
               .getConfig());
     }
   }
+
+  public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, DataNodeMPPDataExchangeServiceClient> {
+
+    @Override
+    public KeyedObjectPool<TEndPoint, DataNodeMPPDataExchangeServiceClient> createClientPool(
+        ClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> manager) {
+      return new GenericKeyedObjectPool<>(
+          new DataNodeMPPDataExchangeServiceClient.Factory(
+              manager,
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .build()),
+          new ClientPoolProperty.Builder<DataNodeMPPDataExchangeServiceClient>()
+              .build()
+              .getConfig());
+    }
+  }
+
+  public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
+
+    @Override
+    public KeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> createClientPool(
+        ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> manager) {
+      return new GenericKeyedObjectPool<>(
+          new AsyncDataNodeMPPDataExchangeServiceClient.Factory(
+              manager,
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+                  .build(),
+              ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
+          new ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
+              .build()
+              .getConfig());
+    }
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
new file mode 100644
index 0000000000..4209944b77
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.client;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.net.SocketException;
+
+/**
+ * Failure-handling contract for Thrift clients: an implementation exposes hooks that let it
+ * discard itself, or every pooled client of its endpoint, when an RPC fails.
+ */
+public interface ThriftClient {
+
+  Logger LOGGER = LoggerFactory.getLogger(ThriftClient.class);
+
+  /** Close this connection. */
+  void invalidate();
+
+  /** Removes all pooled instances corresponding to the current instance's endpoint. */
+  void invalidateAll();
+
+  /**
+   * Inspects a failure raised while using {@code o} and triggers the matching clean-up.
+   *
+   * <p>An {@link InvocationTargetException} is unwrapped first. If the resulting exception is a
+   * {@link TException}, its cause chain is logged at debug level and {@link #invalidate()} is
+   * called on {@code o}. Independently, if the root cause satisfies {@link
+   * #isConnectionBroken(Throwable)}, {@link #invalidateAll()} is called on {@code o}.
+   *
+   * @param t the failure to inspect
+   * @param o the client on which the failure occurred
+   */
+  static void resolveException(Throwable t, ThriftClient o) {
+    Throwable origin = t;
+    if (t instanceof InvocationTargetException) {
+      origin = ((InvocationTargetException) t).getTargetException();
+    }
+    Throwable cur = origin;
+    if (cur instanceof TException) {
+      int level = 0;
+      while (cur != null) {
+        LOGGER.debug(
+            "level-{} Exception class {}, message {}",
+            level,
+            cur.getClass().getName(),
+            cur.getMessage());
+        cur = cur.getCause();
+        level++;
+      }
+      o.invalidate();
+    }
+
+    Throwable rootCause = ExceptionUtils.getRootCause(origin);
+    if (rootCause != null) {
+      LOGGER.debug(
+          "root cause message {}, LocalizedMessage {}, ",
+          rootCause.getMessage(),
+          rootCause.getLocalizedMessage(),
+          rootCause);
+      // if the exception is SocketException and its error message is Broken pipe, it means that
+      // the remote node may restart and all the connection we cached before should be cleared.
+      if (isConnectionBroken(rootCause)) {
+        LOGGER.debug(
+            "Broken pipe error happened in sending RPC, we need to clear all previous cached connection",
+            t);
+        o.invalidateAll();
+      }
+    }
+  }
+
+  /**
+   * Tells whether {@code cause} indicates that the connection to the peer is broken: a {@link
+   * SocketException} whose message contains "Broken pipe", or a {@link TTransportException} whose
+   * message contains "Socket is closed by peer".
+   *
+   * @param cause the exception to classify; {@code null}, or an exception without a detail
+   *     message, is classified as not broken instead of raising a NullPointerException
+   * @return {@code true} if one of the two patterns above matches
+   */
+  static boolean isConnectionBroken(Throwable cause) {
+    String message = cause == null ? null : cause.getMessage();
+    if (message == null) {
+      // Exceptions created without a detail message cannot match either pattern.
+      return false;
+    }
+    return (cause instanceof SocketException && message.contains("Broken pipe"))
+        || (cause instanceof TTransportException && message.contains("Socket is closed by peer"));
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
index 38b12d2370..62e6e40720 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -33,7 +34,8 @@ import org.apache.thrift.protocol.TProtocolFactory;
 
 import java.io.IOException;
 
-public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService.AsyncClient {
+public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService.AsyncClient
+    implements ThriftClient {
 
   private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> clientManager;
@@ -53,41 +55,44 @@ public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService
     this.clientManager = clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * Returns this client to its clientManager. Callers do not need to invoke it themselves: it is
+   * called from onComplete() and onError(Exception) of this client.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
   public boolean isReady() {
     try {
       checkReady();
@@ -103,13 +108,13 @@ public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -121,21 +126,19 @@ public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService
     @Override
     public PooledObject<AsyncConfigNodeHeartbeatServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncConfigNodeHeartbeatServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              tManagers[clientCnt.incrementAndGet() % tManagers.length],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncConfigNodeHeartbeatServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
index 32219dcbfb..467553ccea 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -35,7 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncClient {
+public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncClient
+    implements ThriftClient {
 
   private static final Logger logger = LoggerFactory.getLogger(AsyncConfigNodeIServiceClient.class);
 
@@ -57,47 +59,49 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
     this.clientManager = clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * Returns this client to its clientManager. Callers do not need to invoke it themselves: it is
+   * called from onComplete() and onError(Exception) of this client.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
   public boolean isReady() {
     try {
       checkReady();
       return true;
     } catch (Exception e) {
-      logger.info("Unexpected exception occurs in {} :", this, e);
       return false;
     }
   }
@@ -108,13 +112,13 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -126,21 +130,19 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
     @Override
     public PooledObject<AsyncConfigNodeIServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncConfigNodeIServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              tManagers[clientCnt.incrementAndGet() % tManagers.length],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncConfigNodeIServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
index 2812886336..efd0c81faf 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -33,7 +34,8 @@ import org.apache.thrift.protocol.TProtocolFactory;
 
 import java.io.IOException;
 
-public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.AsyncClient {
+public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.AsyncClient
+    implements ThriftClient {
 
   private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> clientManager;
@@ -53,41 +55,44 @@ public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.Asy
     this.clientManager = clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * Returns this client to its clientManager. Callers do not need to invoke it themselves: it is
+   * called from onComplete() and onError(Exception) of this client.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
   public boolean isReady() {
     try {
       checkReady();
@@ -103,13 +108,13 @@ public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.Asy
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncDataNodeHeartbeatServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncDataNodeHeartbeatServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -121,21 +126,19 @@ public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.Asy
     @Override
     public PooledObject<AsyncDataNodeHeartbeatServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncDataNodeHeartbeatServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              tManagers[clientCnt.incrementAndGet() % tManagers.length],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncDataNodeHeartbeatServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 5329a4d9f2..4c182b1627 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
@@ -36,7 +37,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.AsyncClient {
+public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.AsyncClient
+    implements ThriftClient {
 
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncDataNodeInternalServiceClient.class);
@@ -69,41 +71,44 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
     return clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * Returns this client to its clientManager. Users do not need to call this method; it is
+   * triggered once the client's transport completes or fails.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
   public boolean isReady() {
     try {
       checkReady();
@@ -120,13 +125,13 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -138,21 +143,19 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
     @Override
     public PooledObject<AsyncDataNodeInternalServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncDataNodeInternalServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              tManagers[clientCnt.incrementAndGet() % tManagers.length],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncDataNodeInternalServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 667f3189b1..c1ef72cd64 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -35,7 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeService.AsyncClient {
+public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeService.AsyncClient
+    implements ThriftClient {
 
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncDataNodeMPPDataExchangeServiceClient.class);
@@ -58,47 +60,50 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
     this.clientManager = clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
-  public boolean isReady() {
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * Returns this client to its clientManager. Users do not need to call this method; it is
+   * triggered once the client's transport completes or fails.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
+  private boolean isReady() {
     try {
       checkReady();
       return true;
     } catch (Exception e) {
-      logger.info("Unexpected exception occurs in {} :", this, e);
+      logger.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
       return false;
     }
   }
@@ -109,13 +114,13 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -127,21 +132,19 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
     @Override
     public PooledObject<AsyncDataNodeMPPDataExchangeServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncDataNodeMPPDataExchangeServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              tManagers[clientCnt.incrementAndGet() % tManagers.length],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncDataNodeMPPDataExchangeServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
index 439f25b655..7da6297305 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
@@ -20,7 +20,12 @@
 package org.apache.iotdb.commons.client.exception;
 
 public class ClientManagerException extends Exception {
+
   public ClientManagerException(Exception exception) {
     super(exception);
   }
+
+  public ClientManagerException(String message) {
+    super(message);
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
similarity index 52%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
index 73fd0dca22..756f99def9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
@@ -17,41 +17,40 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.factory;
+
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 
 import org.apache.thrift.async.TAsyncClientManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-public abstract class AsyncBaseClientFactory<K, V> extends BaseClientFactory<K, V> {
+public abstract class AsyncThriftClientFactory<K, V> extends ThriftClientFactory<K, V> {
 
-  private static final Logger logger = LoggerFactory.getLogger(AsyncBaseClientFactory.class);
-  protected TAsyncClientManager[] tManagers;
-  protected AtomicInteger clientCnt = new AtomicInteger();
+  protected final TAsyncClientManager[] tManagers;
+  protected final AtomicInteger clientCnt = new AtomicInteger();
   private static final String THRIFT_THREAD_NAME = "TAsyncClientManager#SelectorThread";
 
-  protected AsyncBaseClientFactory(
+  protected AsyncThriftClientFactory(
       ClientManager<K, V> clientManager,
-      ClientFactoryProperty clientFactoryProperty,
+      ThriftClientProperty thriftClientProperty,
       String threadName) {
-    super(clientManager, clientFactoryProperty);
-    synchronized (this) {
-      tManagers = new TAsyncClientManager[clientFactoryProperty.getSelectorNumOfAsyncClientPool()];
+    super(clientManager, thriftClientProperty);
+    try {
+      tManagers = new TAsyncClientManager[thriftClientProperty.getSelectorNumOfAsyncClientPool()];
       for (int i = 0; i < tManagers.length; i++) {
-        try {
-          tManagers[i] = new TAsyncClientManager();
-        } catch (IOException e) {
-          logger.error("Cannot create Async client factory", e);
-        }
+        tManagers[i] = new TAsyncClientManager();
       }
-      Thread.getAllStackTraces().keySet().stream()
-          .filter(thread -> thread.getName().contains(THRIFT_THREAD_NAME))
-          .collect(Collectors.toList())
-          .forEach(thread -> thread.setName(threadName + "-selector" + "-" + thread.getId()));
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Cannot create Async thrift client factory %s", threadName), e);
     }
+    Thread.getAllStackTraces().keySet().stream()
+        .filter(thread -> thread.getName().contains(THRIFT_THREAD_NAME))
+        .collect(Collectors.toList())
+        .forEach(thread -> thread.setName(threadName + "-selector-" + thread.getId()));
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/BaseClientFactory.java
similarity index 82%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/factory/BaseClientFactory.java
index 510d495cae..680aa2efcd 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/BaseClientFactory.java
@@ -17,7 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.factory;
+
+import org.apache.iotdb.commons.client.ClientManager;
 
 import org.apache.commons.pool2.KeyedPooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
@@ -25,12 +27,9 @@ import org.apache.commons.pool2.PooledObject;
 public abstract class BaseClientFactory<K, V> implements KeyedPooledObjectFactory<K, V> {
 
   protected ClientManager<K, V> clientManager;
-  protected ClientFactoryProperty clientFactoryProperty;
 
-  protected BaseClientFactory(
-      ClientManager<K, V> clientManager, ClientFactoryProperty clientFactoryProperty) {
+  protected BaseClientFactory(ClientManager<K, V> clientManager) {
     this.clientManager = clientManager;
-    this.clientFactoryProperty = clientFactoryProperty;
   }
 
   @Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/ThriftClientFactory.java
similarity index 61%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/factory/ThriftClientFactory.java
index ff3862f879..8c6c178279 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/ThriftClientFactory.java
@@ -16,16 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.client.sync;
 
-public interface SyncThriftClient {
+package org.apache.iotdb.commons.client.factory;
 
-  /** close the connection */
-  void invalidate();
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 
-  /**
-   * Clears the specified pool, removing all pooled instances corresponding to current instance's
-   * endPoint.
-   */
-  void invalidateAll();
+public abstract class ThriftClientFactory<K, V> extends BaseClientFactory<K, V> {
+
+  protected ThriftClientProperty thriftClientProperty;
+
+  protected ThriftClientFactory(
+      ClientManager<K, V> clientManager, ThriftClientProperty thriftClientProperty) {
+    super(clientManager);
+    this.thriftClientProperty = thriftClientProperty;
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
similarity index 94%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
index 45870e9ba6..fc9ae843f9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.property;
 
 import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
 
@@ -84,7 +84,7 @@ public class ClientPoolProperty<V> {
     private DefaultProperty() {}
 
     public static final long WAIT_CLIENT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
-    public static final int MAX_TOTAL_CLIENT_FOR_EACH_NODE = 100;
-    public static final int MAX_IDLE_CLIENT_FOR_EACH_NODE = 100;
+    public static final int MAX_TOTAL_CLIENT_FOR_EACH_NODE = 300;
+    public static final int MAX_IDLE_CLIENT_FOR_EACH_NODE = 200;
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
similarity index 93%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
index 363decbf2d..c54c23fbd7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.property;
 
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -25,13 +25,13 @@ import org.apache.thrift.protocol.TProtocolFactory;
 
 import java.util.concurrent.TimeUnit;
 
-public class ClientFactoryProperty {
+public class ThriftClientProperty {
 
   private final TProtocolFactory protocolFactory;
   private final int connectionTimeoutMs;
   private final int selectorNumOfAsyncClientPool;
 
-  public ClientFactoryProperty(
+  public ThriftClientProperty(
       TProtocolFactory protocolFactory, int connectionTimeoutMs, int selectorNumOfAsyncClientPool) {
     this.protocolFactory = protocolFactory;
     this.connectionTimeoutMs = connectionTimeoutMs;
@@ -75,8 +75,8 @@ public class ClientFactoryProperty {
       return this;
     }
 
-    public ClientFactoryProperty build() {
-      return new ClientFactoryProperty(
+    public ThriftClientProperty build() {
+      return new ThriftClientProperty(
           rpcThriftCompressionEnabled
               ? new TCompactProtocol.Factory()
               : new TBinaryProtocol.Factory(),
@@ -90,7 +90,7 @@ public class ClientFactoryProperty {
     private DefaultProperty() {}
 
     public static final boolean RPC_THRIFT_COMPRESSED_ENABLED = false;
-    public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(30);
+    public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(20);
     public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = 1;
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/ConfigNodeIServiceClient.java
similarity index 62%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/sync/ConfigNodeIServiceClient.java
index 522470d8be..249727f1ff 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/ConfigNodeIServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
@@ -34,20 +35,19 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
-import java.lang.reflect.Constructor;
 import java.net.SocketException;
 
-public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
-    implements SyncThriftClient, AutoCloseable {
+public class ConfigNodeIServiceClient extends IConfigNodeRPCService.Client
+    implements ThriftClient, AutoCloseable {
 
-  private final TEndPoint endPoint;
-  private final ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
+  private final TEndPoint endpoint;
+  private final ClientManager<TEndPoint, ConfigNodeIServiceClient> clientManager;
 
-  public SyncConfigNodeIServiceClient(
+  public ConfigNodeIServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       TEndPoint endPoint,
-      ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager)
+      ClientManager<TEndPoint, ConfigNodeIServiceClient> clientManager)
       throws TTransportException {
     super(
         protocolFactory.getProtocol(
@@ -57,15 +57,13 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
                     endPoint.getIp(),
                     endPoint.getPort(),
                     connectionTimeout))));
-    this.endPoint = endPoint;
+    this.endpoint = endPoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
-  public void close() {
-    if (clientManager != null) {
-      clientManager.returnClient(endPoint, this);
-    }
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   public void setTimeout(int timeout) {
@@ -73,59 +71,57 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
     ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
   }
 
+  @Override
+  public void close() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  @Override
   public void invalidate() {
     getInputProtocol().getTransport().close();
   }
 
   @Override
   public void invalidateAll() {
-    clientManager.clear(endPoint);
-  }
-
-  public int getTimeout() throws SocketException {
-    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+    clientManager.clear(endpoint);
   }
 
   @Override
   public String toString() {
-    return String.format("SyncConfigNodeIServiceClient{%s}", endPoint);
+    return String.format("SyncConfigNodeIServiceClient{%s}", endpoint);
   }
 
-  public static class Factory extends BaseClientFactory<TEndPoint, SyncConfigNodeIServiceClient> {
+  public static class Factory extends ThriftClientFactory<TEndPoint, ConfigNodeIServiceClient> {
 
     public Factory(
-        ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ClientManager<TEndPoint, ConfigNodeIServiceClient> clientManager,
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
     }
 
     @Override
     public void destroyObject(
-        TEndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient> pooledObject) {
+        TEndPoint endpoint, PooledObject<ConfigNodeIServiceClient> pooledObject) {
       pooledObject.getObject().invalidate();
     }
 
     @Override
-    public PooledObject<SyncConfigNodeIServiceClient> makeObject(TEndPoint endpoint)
-        throws Exception {
-      Constructor<SyncConfigNodeIServiceClient> constructor =
-          SyncConfigNodeIServiceClient.class.getConstructor(
-              TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
+    public PooledObject<ConfigNodeIServiceClient> makeObject(TEndPoint endpoint) throws Exception {
       return new DefaultPooledObject<>(
           SyncThriftClientWithErrorHandler.newErrorHandler(
-              SyncConfigNodeIServiceClient.class,
-              constructor,
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              ConfigNodeIServiceClient.class,
+              ConfigNodeIServiceClient.class.getConstructor(
+                  TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endpoint,
               clientManager));
     }
 
     @Override
     public boolean validateObject(
-        TEndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient> pooledObject) {
-      return pooledObject.getObject() != null
-          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+        TEndPoint endpoint, PooledObject<ConfigNodeIServiceClient> pooledObject) {
+      return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/DataNodeInternalServiceClient.java
similarity index 60%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/sync/DataNodeInternalServiceClient.java
index 3d992c5b48..5cfbc0aa01 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/DataNodeInternalServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
@@ -35,109 +36,105 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
-import java.lang.reflect.Constructor;
 import java.net.SocketException;
 
-public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Client
-    implements SyncThriftClient, AutoCloseable {
+public class DataNodeInternalServiceClient extends IDataNodeRPCService.Client
+    implements ThriftClient, AutoCloseable {
 
-  private final TEndPoint endPoint;
-  private final ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
+  private final TEndPoint endpoint;
+  private final ClientManager<TEndPoint, DataNodeInternalServiceClient> clientManager;
 
-  public SyncDataNodeInternalServiceClient(
+  public DataNodeInternalServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
-      TEndPoint endPoint,
-      ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager)
+      TEndPoint endpoint,
+      ClientManager<TEndPoint, DataNodeInternalServiceClient> clientManager)
       throws TTransportException {
     super(
         protocolFactory.getProtocol(
             RpcTransportFactory.INSTANCE.getTransport(
                 new TSocket(
                     TConfigurationConst.defaultTConfiguration,
-                    endPoint.getIp(),
-                    endPoint.getPort(),
+                    endpoint.getIp(),
+                    endpoint.getPort(),
                     connectionTimeout))));
-    this.endPoint = endPoint;
+    this.endpoint = endpoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+  }
+
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
   @TestOnly
   public TEndPoint getTEndpoint() {
-    return endPoint;
+    return endpoint;
   }
 
   @TestOnly
-  public ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> getClientManager() {
+  public ClientManager<TEndPoint, DataNodeInternalServiceClient> getClientManager() {
     return clientManager;
   }
 
+  @Override
   public void close() {
-    if (clientManager != null) {
-      clientManager.returnClient(endPoint, this);
-    }
-  }
-
-  public void setTimeout(int timeout) {
-    // the same transport is used in both input and output
-    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+    clientManager.returnClient(endpoint, this);
   }
 
+  @Override
   public void invalidate() {
     getInputProtocol().getTransport().close();
   }
 
   @Override
   public void invalidateAll() {
-    clientManager.clear(endPoint);
-  }
-
-  public int getTimeout() throws SocketException {
-    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+    clientManager.clear(endpoint);
   }
 
   @Override
   public String toString() {
-    return String.format("SyncDataNodeInternalServiceClient{%s}", endPoint);
+    return String.format("SyncDataNodeInternalServiceClient{%s}", endpoint);
   }
 
   public static class Factory
-      extends BaseClientFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
+      extends ThriftClientFactory<TEndPoint, DataNodeInternalServiceClient> {
 
     public Factory(
-        ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ClientManager<TEndPoint, DataNodeInternalServiceClient> clientManager,
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
     }
 
     @Override
     public void destroyObject(
-        TEndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient> pooledObject) {
+        TEndPoint endpoint, PooledObject<DataNodeInternalServiceClient> pooledObject) {
       pooledObject.getObject().invalidate();
     }
 
     @Override
-    public PooledObject<SyncDataNodeInternalServiceClient> makeObject(TEndPoint endpoint)
+    public PooledObject<DataNodeInternalServiceClient> makeObject(TEndPoint endpoint)
         throws Exception {
-      Constructor<SyncDataNodeInternalServiceClient> constructor =
-          SyncDataNodeInternalServiceClient.class.getConstructor(
-              TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
       return new DefaultPooledObject<>(
           SyncThriftClientWithErrorHandler.newErrorHandler(
-              SyncDataNodeInternalServiceClient.class,
-              constructor,
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              DataNodeInternalServiceClient.class,
+              DataNodeInternalServiceClient.class.getConstructor(
+                  TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endpoint,
               clientManager));
     }
 
     @Override
     public boolean validateObject(
-        TEndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient> pooledObject) {
-      return pooledObject.getObject() != null
-          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+        TEndPoint endpoint, PooledObject<DataNodeInternalServiceClient> pooledObject) {
+      return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/DataNodeMPPDataExchangeServiceClient.java
similarity index 59%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/sync/DataNodeMPPDataExchangeServiceClient.java
index 7bcbd05071..59642a019b 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/DataNodeMPPDataExchangeServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
@@ -34,38 +35,35 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
-import java.lang.reflect.Constructor;
 import java.net.SocketException;
 
-public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeService.Client
-    implements SyncThriftClient, AutoCloseable {
+public class DataNodeMPPDataExchangeServiceClient extends MPPDataExchangeService.Client
+    implements ThriftClient, AutoCloseable {
 
-  private final TEndPoint endPoint;
-  private final ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientManager;
+  private final TEndPoint endpoint;
+  private final ClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> clientManager;
 
-  public SyncDataNodeMPPDataExchangeServiceClient(
+  public DataNodeMPPDataExchangeServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
-      TEndPoint endPoint,
-      ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientManager)
+      TEndPoint endpoint,
+      ClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> clientManager)
       throws TTransportException {
     super(
         protocolFactory.getProtocol(
             RpcTransportFactory.INSTANCE.getTransport(
                 new TSocket(
                     TConfigurationConst.defaultTConfiguration,
-                    endPoint.getIp(),
-                    endPoint.getPort(),
+                    endpoint.getIp(),
+                    endpoint.getPort(),
                     connectionTimeout))));
-    this.endPoint = endPoint;
+    this.endpoint = endpoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
-  public void close() {
-    if (clientManager != null) {
-      clientManager.returnClient(endPoint, this);
-    }
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   public void setTimeout(int timeout) {
@@ -73,60 +71,59 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSer
     ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
   }
 
+  @Override
+  public void close() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  @Override
   public void invalidate() {
     getInputProtocol().getTransport().close();
   }
 
   @Override
   public void invalidateAll() {
-    clientManager.clear(endPoint);
-  }
-
-  public int getTimeout() throws SocketException {
-    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+    clientManager.clear(endpoint);
   }
 
   @Override
   public String toString() {
-    return String.format("SyncDataNodeMPPDataExchangeServiceClient{%s}", endPoint);
+    return String.format("SyncDataNodeMPPDataExchangeServiceClient{%s}", endpoint);
   }
 
   public static class Factory
-      extends BaseClientFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
+      extends ThriftClientFactory<TEndPoint, DataNodeMPPDataExchangeServiceClient> {
 
     public Factory(
-        ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> clientManager,
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
     }
 
     @Override
     public void destroyObject(
-        TEndPoint endpoint, PooledObject<SyncDataNodeMPPDataExchangeServiceClient> pooledObject) {
+        TEndPoint endpoint, PooledObject<DataNodeMPPDataExchangeServiceClient> pooledObject) {
       pooledObject.getObject().invalidate();
     }
 
     @Override
-    public PooledObject<SyncDataNodeMPPDataExchangeServiceClient> makeObject(TEndPoint endpoint)
+    public PooledObject<DataNodeMPPDataExchangeServiceClient> makeObject(TEndPoint endpoint)
         throws Exception {
-      Constructor<SyncDataNodeMPPDataExchangeServiceClient> constructor =
-          SyncDataNodeMPPDataExchangeServiceClient.class.getConstructor(
-              TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
       return new DefaultPooledObject<>(
           SyncThriftClientWithErrorHandler.newErrorHandler(
-              SyncDataNodeMPPDataExchangeServiceClient.class,
-              constructor,
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              DataNodeMPPDataExchangeServiceClient.class,
+              DataNodeMPPDataExchangeServiceClient.class.getConstructor(
+                  TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endpoint,
               clientManager));
     }
 
     @Override
     public boolean validateObject(
-        TEndPoint endpoint, PooledObject<SyncDataNodeMPPDataExchangeServiceClient> pooledObject) {
-      return pooledObject.getObject() != null
-          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+        TEndPoint endpoint, PooledObject<DataNodeMPPDataExchangeServiceClient> pooledObject) {
+      return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
index d864984bc9..39eb6f713a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
@@ -18,26 +18,24 @@
  */
 package org.apache.iotdb.commons.client.sync;
 
+import org.apache.iotdb.commons.client.ThriftClient;
+
 import net.sf.cglib.proxy.Enhancer;
 import net.sf.cglib.proxy.MethodInterceptor;
 import net.sf.cglib.proxy.MethodProxy;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.net.SocketException;
 
 public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(SyncThriftClientWithErrorHandler.class);
-
-  public static <V extends SyncThriftClient> V newErrorHandler(
+  /**
+   * Note: the caller must ensure that the constructor corresponds to the class; otherwise the cast
+   * might fail.
+   */
+  @SuppressWarnings("unchecked")
+  public static <V extends ThriftClient> V newErrorHandler(
       Class<V> targetClass, Constructor<V> constructor, Object... args) {
     Enhancer enhancer = new Enhancer();
     enhancer.setSuperclass(targetClass);
@@ -54,51 +52,9 @@ public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
     try {
       return methodProxy.invokeSuper(o, objects);
     } catch (Throwable t) {
-      Throwable origin = t;
-      if (t instanceof InvocationTargetException) {
-        origin = ((InvocationTargetException) t).getTargetException();
-      }
-      Throwable cur = origin;
-      if (cur instanceof TException) {
-        int level = 0;
-        while (cur != null) {
-          LOGGER.debug(
-              "level-{} Exception class {}, message {}",
-              level,
-              cur.getClass().getName(),
-              cur.getMessage());
-          cur = cur.getCause();
-          level++;
-        }
-        ((SyncThriftClient) o).invalidate();
-      }
-
-      Throwable rootCause = ExceptionUtils.getRootCause(origin);
-      if (rootCause != null) {
-        // if the exception is SocketException and its error message is Broken pipe, it means that
-        // the remote node may restart and all the connection we cached before should be cleared.
-        LOGGER.debug(
-            "root cause message {}, LocalizedMessage {}, ",
-            rootCause.getMessage(),
-            rootCause.getLocalizedMessage(),
-            rootCause);
-        if (isConnectionBroken(rootCause)) {
-          LOGGER.debug(
-              "Broken pipe error happened in calling method {}, we need to clear all previous cached connection, err: {}",
-              method.getName(),
-              t);
-          ((SyncThriftClient) o).invalidate();
-          ((SyncThriftClient) o).invalidateAll();
-        }
-      }
+      ThriftClient.resolveException(t, (ThriftClient) o);
       throw new TException(
           "Error in calling method " + method.getName() + ", because: " + t.getMessage(), t);
     }
   }
-
-  private boolean isConnectionBroken(Throwable cause) {
-    return (cause instanceof SocketException && cause.getMessage().contains("Broken pipe"))
-        || (cause instanceof TTransportException
-            && cause.getMessage().contains("Socket is closed by peer"));
-  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3b3b3359f1..b930ea22e3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -99,13 +99,14 @@ public class CommonConfig {
    * ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
    * clients.
    */
-  private int selectorNumOfClientManager =
-      Runtime.getRuntime().availableProcessors() / 4 > 0
-          ? Runtime.getRuntime().availableProcessors() / 4
-          : 1;
+  private int selectorNumOfClientManager = 1;
 
   /** whether to use thrift compression. */
-  private boolean isCnRpcThriftCompressionEnabled = false;
+  private boolean isRpcThriftCompressionEnabled = false;
+
+  private int maxTotalClientForEachNode = 300;
+
+  private int maxIdleClientForEachNode = 200;
 
   /** What will the system do when unrecoverable error occurs. */
   private HandleSystemErrorStrategy handleSystemErrorStrategy =
@@ -246,28 +247,44 @@ public class CommonConfig {
     this.defaultTTLInMs = defaultTTLInMs;
   }
 
-  public int getCnConnectionTimeoutInMS() {
+  public int getConnectionTimeoutInMS() {
     return connectionTimeoutInMS;
   }
 
-  public void setCnConnectionTimeoutInMS(int connectionTimeoutInMS) {
+  public void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
     this.connectionTimeoutInMS = connectionTimeoutInMS;
   }
 
-  public int getCnSelectorNumOfClientManager() {
+  public int getSelectorNumOfClientManager() {
     return selectorNumOfClientManager;
   }
 
-  public void setCnSelectorNumOfClientManager(int selectorNumOfClientManager) {
+  public void setSelectorNumOfClientManager(int selectorNumOfClientManager) {
     this.selectorNumOfClientManager = selectorNumOfClientManager;
   }
 
-  public boolean isCnRpcThriftCompressionEnabled() {
-    return isCnRpcThriftCompressionEnabled;
+  public boolean isRpcThriftCompressionEnabled() {
+    return isRpcThriftCompressionEnabled;
+  }
+
+  public void setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
+    isRpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
+  }
+
+  public int getMaxTotalClientForEachNode() {
+    return maxTotalClientForEachNode;
+  }
+
+  public void setMaxTotalClientForEachNode(int maxTotalClientForEachNode) {
+    this.maxTotalClientForEachNode = maxTotalClientForEachNode;
+  }
+
+  public int getMaxIdleClientForEachNode() {
+    return maxIdleClientForEachNode;
   }
 
-  public void setCnRpcThriftCompressionEnabled(boolean cnRpcThriftCompressionEnabled) {
-    isCnRpcThriftCompressionEnabled = cnRpcThriftCompressionEnabled;
+  public void setMaxIdleClientForEachNode(int maxIdleClientForEachNode) {
+    this.maxIdleClientForEachNode = maxIdleClientForEachNode;
   }
 
   HandleSystemErrorStrategy getHandleSystemErrorStrategy() {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 6ef3e0cc01..f53521f77b 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -93,27 +93,66 @@ public class CommonDescriptor {
             .trim()
             .split(","));
 
-    config.setCnRpcThriftCompressionEnabled(
+    config.setRpcThriftCompressionEnabled(
         Boolean.parseBoolean(
             properties
                 .getProperty(
                     "cn_rpc_thrift_compression_enable",
-                    String.valueOf(config.isCnRpcThriftCompressionEnabled()))
+                    String.valueOf(config.isRpcThriftCompressionEnabled()))
                 .trim()));
 
-    config.setCnConnectionTimeoutInMS(
+    config.setConnectionTimeoutInMS(
         Integer.parseInt(
             properties
                 .getProperty(
-                    "cn_connection_timeout_ms", String.valueOf(config.getCnConnectionTimeoutInMS()))
+                    "cn_connection_timeout_ms", String.valueOf(config.getConnectionTimeoutInMS()))
                 .trim()));
 
-    config.setCnSelectorNumOfClientManager(
+    config.setSelectorNumOfClientManager(
         Integer.parseInt(
             properties
                 .getProperty(
                     "cn_selector_thread_nums_of_client_manager",
-                    String.valueOf(config.getCnSelectorNumOfClientManager()))
+                    String.valueOf(config.getSelectorNumOfClientManager()))
+                .trim()));
+
+    config.setConnectionTimeoutInMS(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "dn_connection_timeout_ms", String.valueOf(config.getConnectionTimeoutInMS()))
+                .trim()));
+
+    config.setRpcThriftCompressionEnabled(
+        Boolean.parseBoolean(
+            properties
+                .getProperty(
+                    "dn_rpc_thrift_compression_enable",
+                    String.valueOf(config.isRpcThriftCompressionEnabled()))
+                .trim()));
+
+    config.setSelectorNumOfClientManager(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "dn_selector_thread_nums_of_client_manager",
+                    String.valueOf(config.getSelectorNumOfClientManager()))
+                .trim()));
+
+    config.setMaxTotalClientForEachNode(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "dn_max_connection_for_internal_service",
+                    String.valueOf(config.getMaxTotalClientForEachNode()))
+                .trim()));
+
+    config.setMaxIdleClientForEachNode(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "dn_core_connection_for_internal_service",
+                    String.valueOf(config.getMaxIdleClientForEachNode()))
                 .trim()));
 
     config.setHandleSystemErrorStrategy(
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 1375423486..5c47ace7a6 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -23,7 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.mock.MockInternalRPCService;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
+import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 
@@ -61,28 +64,30 @@ public class ClientManagerTest {
   /**
    * We put all tests together to avoid frequent restarts of thrift Servers, which can cause "bind
    * address already used" problems in macOS CI environments. The reason for this may be about this
-   * [blog](https://stackoverflow.com/questions/51998042/macos-so-reuseaddr-so-reuseport-not-consistent-with-linux)
+   * <a
+   * href="https://stackoverflow.com/questions/51998042/macos-so-reuseaddr-so-reuseport-not-consistent-with-linux">blog</a>
    */
   @Test
   public void allTest() throws Exception {
-    normalSyncClientManagersTest();
-    normalAsyncClientManagersTest();
-    MaxIdleClientManagersTest();
-    MaxTotalClientManagersTest();
-    MaxWaitClientTimeoutClientManagersTest();
-    InvalidSyncClientReturnClientManagersTest();
-    InvalidAsyncClientReturnClientManagersTest();
+    normalSyncTest();
+    normalAsyncTest();
+    MaxIdleTest();
+    MaxTotalTest();
+    MaxWaitClientTimeoutTest();
+    InvalidSyncClientReturnTest();
+    InvalidAsyncClientReturnTest();
+    BorrowNullTest();
   }
 
-  public void normalSyncClientManagersTest() throws Exception {
+  public void normalSyncTest() throws Exception {
     // init syncClientManager
-    ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncClusterManager =
-        (ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
-            new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+    ClientManager<TEndPoint, DataNodeInternalServiceClient> syncClusterManager =
+        (ClientManager<TEndPoint, DataNodeInternalServiceClient>)
+            new IClientManager.Factory<TEndPoint, DataNodeInternalServiceClient>()
                 .createClientManager(new TestSyncDataNodeInternalServiceClientPoolFactory());
 
     // get one sync client
-    SyncDataNodeInternalServiceClient syncClient1 = syncClusterManager.borrowClient(endPoint);
+    DataNodeInternalServiceClient syncClient1 = syncClusterManager.borrowClient(endPoint);
     Assert.assertNotNull(syncClient1);
     Assert.assertEquals(syncClient1.getTEndpoint(), endPoint);
     Assert.assertEquals(syncClient1.getClientManager(), syncClusterManager);
@@ -91,7 +96,7 @@ public class ClientManagerTest {
     Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
 
     // get another sync client
-    SyncDataNodeInternalServiceClient syncClient2 = syncClusterManager.borrowClient(endPoint);
+    DataNodeInternalServiceClient syncClient2 = syncClusterManager.borrowClient(endPoint);
     Assert.assertNotNull(syncClient2);
     Assert.assertEquals(syncClient2.getTEndpoint(), endPoint);
     Assert.assertEquals(syncClient2.getClientManager(), syncClusterManager);
@@ -117,7 +122,7 @@ public class ClientManagerTest {
     Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
   }
 
-  public void normalAsyncClientManagersTest() throws Exception {
+  public void normalAsyncTest() throws Exception {
     // init asyncClientManager
     ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncClusterManager =
         (ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
@@ -158,23 +163,23 @@ public class ClientManagerTest {
     Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
   }
 
-  public void MaxIdleClientManagersTest() throws Exception {
+  public void MaxIdleTest() throws Exception {
     int maxIdleClientForEachNode = 1;
 
     // init syncClientManager and set maxIdleClientForEachNode to 1
-    ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncClusterManager =
-        (ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
-            new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+    ClientManager<TEndPoint, DataNodeInternalServiceClient> syncClusterManager =
+        (ClientManager<TEndPoint, DataNodeInternalServiceClient>)
+            new IClientManager.Factory<TEndPoint, DataNodeInternalServiceClient>()
                 .createClientManager(
                     new TestSyncDataNodeInternalServiceClientPoolFactory() {
                       @Override
-                      public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient>
+                      public KeyedObjectPool<TEndPoint, DataNodeInternalServiceClient>
                           createClientPool(
-                              ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
+                              ClientManager<TEndPoint, DataNodeInternalServiceClient> manager) {
                         return new GenericKeyedObjectPool<>(
-                            new SyncDataNodeInternalServiceClient.Factory(
-                                manager, new ClientFactoryProperty.Builder().build()),
-                            new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
+                            new DataNodeInternalServiceClient.Factory(
+                                manager, new ThriftClientProperty.Builder().build()),
+                            new ClientPoolProperty.Builder<DataNodeInternalServiceClient>()
                                 .setMaxIdleClientForEachNode(maxIdleClientForEachNode)
                                 .build()
                                 .getConfig());
@@ -182,7 +187,7 @@ public class ClientManagerTest {
                     });
 
     // get one sync client
-    SyncDataNodeInternalServiceClient syncClient1 = syncClusterManager.borrowClient(endPoint);
+    DataNodeInternalServiceClient syncClient1 = syncClusterManager.borrowClient(endPoint);
     Assert.assertNotNull(syncClient1);
     Assert.assertEquals(syncClient1.getTEndpoint(), endPoint);
     Assert.assertEquals(syncClient1.getClientManager(), syncClusterManager);
@@ -191,7 +196,7 @@ public class ClientManagerTest {
     Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
 
     // get another sync client
-    SyncDataNodeInternalServiceClient syncClient2 = syncClusterManager.borrowClient(endPoint);
+    DataNodeInternalServiceClient syncClient2 = syncClusterManager.borrowClient(endPoint);
     Assert.assertNotNull(syncClient2);
     Assert.assertEquals(syncClient2.getTEndpoint(), endPoint);
     Assert.assertEquals(syncClient2.getClientManager(), syncClusterManager);
@@ -217,24 +222,24 @@ public class ClientManagerTest {
     Assert.assertFalse(syncClient1.getInputProtocol().getTransport().isOpen());
   }
 
-  public void MaxTotalClientManagersTest() throws Exception {
+  public void MaxTotalTest() throws Exception {
     int maxTotalClientForEachNode = 1;
     long waitClientTimeoutMs = TimeUnit.SECONDS.toMillis(1);
 
     // init syncClientManager and set maxTotalClientForEachNode to 1
-    ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncClusterManager =
-        (ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
-            new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+    ClientManager<TEndPoint, DataNodeInternalServiceClient> syncClusterManager =
+        (ClientManager<TEndPoint, DataNodeInternalServiceClient>)
+            new IClientManager.Factory<TEndPoint, DataNodeInternalServiceClient>()
                 .createClientManager(
                     new TestSyncDataNodeInternalServiceClientPoolFactory() {
                       @Override
-                      public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient>
+                      public KeyedObjectPool<TEndPoint, DataNodeInternalServiceClient>
                           createClientPool(
-                              ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
+                              ClientManager<TEndPoint, DataNodeInternalServiceClient> manager) {
                         return new GenericKeyedObjectPool<>(
-                            new SyncDataNodeInternalServiceClient.Factory(
-                                manager, new ClientFactoryProperty.Builder().build()),
-                            new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
+                            new DataNodeInternalServiceClient.Factory(
+                                manager, new ThriftClientProperty.Builder().build()),
+                            new ClientPoolProperty.Builder<DataNodeInternalServiceClient>()
                                 .setMaxTotalClientForEachNode(maxTotalClientForEachNode)
                                 .setWaitClientTimeoutMS(waitClientTimeoutMs)
                                 .build()
@@ -243,7 +248,7 @@ public class ClientManagerTest {
                     });
 
     // get one sync client
-    SyncDataNodeInternalServiceClient syncClient1 = syncClusterManager.borrowClient(endPoint);
+    DataNodeInternalServiceClient syncClient1 = syncClusterManager.borrowClient(endPoint);
     Assert.assertNotNull(syncClient1);
     Assert.assertEquals(syncClient1.getTEndpoint(), endPoint);
     Assert.assertEquals(syncClient1.getClientManager(), syncClusterManager);
@@ -252,11 +257,12 @@ public class ClientManagerTest {
     Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
 
     // get another sync client, should wait waitClientTimeoutMS ms, throw error
-    SyncDataNodeInternalServiceClient syncClient2 = null;
+    DataNodeInternalServiceClient syncClient2 = null;
     long start = 0;
     try {
       start = System.nanoTime();
       syncClient2 = syncClusterManager.borrowClient(endPoint);
+      Assert.fail();
     } catch (ClientManagerException e) {
       long end = System.nanoTime();
       Assert.assertTrue(end - start >= waitClientTimeoutMs * 1_000_000);
@@ -289,25 +295,25 @@ public class ClientManagerTest {
     Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
   }
 
-  public void MaxWaitClientTimeoutClientManagersTest() throws Exception {
+  public void MaxWaitClientTimeoutTest() throws Exception {
     long waitClientTimeoutMS = TimeUnit.SECONDS.toMillis(2);
     int maxTotalClientForEachNode = 1;
 
     // init syncClientManager and set maxTotalClientForEachNode to 1, set waitClientTimeoutMS to
     // DefaultProperty.WAIT_CLIENT_TIMEOUT_MS * 2
-    ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncClusterManager =
-        (ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
-            new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+    ClientManager<TEndPoint, DataNodeInternalServiceClient> syncClusterManager =
+        (ClientManager<TEndPoint, DataNodeInternalServiceClient>)
+            new IClientManager.Factory<TEndPoint, DataNodeInternalServiceClient>()
                 .createClientManager(
                     new TestSyncDataNodeInternalServiceClientPoolFactory() {
                       @Override
-                      public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient>
+                      public KeyedObjectPool<TEndPoint, DataNodeInternalServiceClient>
                           createClientPool(
-                              ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
+                              ClientManager<TEndPoint, DataNodeInternalServiceClient> manager) {
                         return new GenericKeyedObjectPool<>(
-                            new SyncDataNodeInternalServiceClient.Factory(
-                                manager, new ClientFactoryProperty.Builder().build()),
-                            new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
+                            new DataNodeInternalServiceClient.Factory(
+                                manager, new ThriftClientProperty.Builder().build()),
+                            new ClientPoolProperty.Builder<DataNodeInternalServiceClient>()
                                 .setWaitClientTimeoutMS(waitClientTimeoutMS)
                                 .setMaxTotalClientForEachNode(maxTotalClientForEachNode)
                                 .build()
@@ -316,7 +322,7 @@ public class ClientManagerTest {
                     });
 
     // get one sync client
-    SyncDataNodeInternalServiceClient syncClient1 = syncClusterManager.borrowClient(endPoint);
+    DataNodeInternalServiceClient syncClient1 = syncClusterManager.borrowClient(endPoint);
     Assert.assertNotNull(syncClient1);
     Assert.assertEquals(syncClient1.getTEndpoint(), endPoint);
     Assert.assertEquals(syncClient1.getClientManager(), syncClusterManager);
@@ -329,6 +335,7 @@ public class ClientManagerTest {
     try {
       start = System.nanoTime();
       syncClient1 = syncClusterManager.borrowClient(endPoint);
+      Assert.fail();
     } catch (ClientManagerException e) {
       long end = System.nanoTime();
       Assert.assertTrue(end - start >= waitClientTimeoutMS * 1_000_000);
@@ -348,15 +355,15 @@ public class ClientManagerTest {
     Assert.assertFalse(syncClient1.getInputProtocol().getTransport().isOpen());
   }
 
-  public void InvalidSyncClientReturnClientManagersTest() throws Exception {
+  public void InvalidSyncClientReturnTest() throws Exception {
     // init syncClientManager
-    ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncClusterManager =
-        (ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
-            new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+    ClientManager<TEndPoint, DataNodeInternalServiceClient> syncClusterManager =
+        (ClientManager<TEndPoint, DataNodeInternalServiceClient>)
+            new IClientManager.Factory<TEndPoint, DataNodeInternalServiceClient>()
                 .createClientManager(new TestSyncDataNodeInternalServiceClientPoolFactory());
 
     // get one sync client
-    SyncDataNodeInternalServiceClient syncClient1 = syncClusterManager.borrowClient(endPoint);
+    DataNodeInternalServiceClient syncClient1 = syncClusterManager.borrowClient(endPoint);
     Assert.assertNotNull(syncClient1);
     Assert.assertEquals(syncClient1.getTEndpoint(), endPoint);
     Assert.assertEquals(syncClient1.getClientManager(), syncClusterManager);
@@ -365,7 +372,7 @@ public class ClientManagerTest {
     Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
 
     // get another sync client
-    SyncDataNodeInternalServiceClient syncClient2 = syncClusterManager.borrowClient(endPoint);
+    DataNodeInternalServiceClient syncClient2 = syncClusterManager.borrowClient(endPoint);
     Assert.assertNotNull(syncClient2);
     Assert.assertEquals(syncClient2.getTEndpoint(), endPoint);
     Assert.assertEquals(syncClient2.getClientManager(), syncClusterManager);
@@ -391,7 +398,7 @@ public class ClientManagerTest {
     Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
   }
 
-  public void InvalidAsyncClientReturnClientManagersTest() throws Exception {
+  public void InvalidAsyncClientReturnTest() throws Exception {
     // init asyncClientManager
     ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncClusterManager =
         (ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
@@ -432,28 +439,50 @@ public class ClientManagerTest {
     Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
   }
 
+  public void BorrowNullTest() {
+    // init asyncClientManager
+    ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncClusterManager =
+        (ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
+            new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
+                .createClientManager(new TestAsyncDataNodeInternalServiceClientPoolFactory());
+
+    try {
+      asyncClusterManager.borrowClient(null);
+      Assert.fail();
+    } catch (ClientManagerException e) {
+      Assert.assertTrue(e.getMessage().contains("Can not borrow client for node null"));
+    }
+
+    // close asyncClientManager, asyncClientManager should destroy all client
+    asyncClusterManager.close();
+    Assert.assertEquals(0, asyncClusterManager.getPool().getNumActive(endPoint));
+    Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
+  }
+
   public static class TestSyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
+      implements IClientPoolFactory<TEndPoint, DataNodeInternalServiceClient> {
+
     @Override
-    public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
+    public KeyedObjectPool<TEndPoint, DataNodeInternalServiceClient> createClientPool(
+        ClientManager<TEndPoint, DataNodeInternalServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
-          new SyncDataNodeInternalServiceClient.Factory(
-              manager, new ClientFactoryProperty.Builder().build()),
-          new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig());
+          new DataNodeInternalServiceClient.Factory(
+              manager, new ThriftClientProperty.Builder().build()),
+          new ClientPoolProperty.Builder<DataNodeInternalServiceClient>().build().getConfig());
     }
   }
 
   public static class TestAsyncDataNodeInternalServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
+
     @Override
     public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
         ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new AsyncDataNodeInternalServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder().build(),
-              "AsyncDataNodeInternalServiceClientPool"),
+              new ThriftClientProperty.Builder().build(),
+              ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
           new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
index 56acc74894..8608138613 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
@@ -38,8 +38,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.statement.AuthorType;
 import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
@@ -66,9 +66,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher {
   private IAuthorizer authorizer;
 
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
   public ClusterAuthorityFetcher(IAuthorCache iAuthorCache) {
     this.iAuthorCache = iAuthorCache;
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 7c5d0fef2c..bbbf672d3f 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -24,11 +24,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
-import org.apache.iotdb.commons.client.sync.SyncThriftClient;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
@@ -127,12 +127,11 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.List;
 
-public class ConfigNodeClient
-    implements IConfigNodeRPCService.Iface, SyncThriftClient, AutoCloseable {
+public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClient, AutoCloseable {
+
   private static final Logger logger = LoggerFactory.getLogger(ConfigNodeClient.class);
 
   private static final int RETRY_NUM = 5;
@@ -168,7 +167,7 @@ public class ConfigNodeClient
     // Read config nodes from configuration
     configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
     protocolFactory =
-        CommonDescriptor.getInstance().getConfig().isCnRpcThriftCompressionEnabled()
+        CommonDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
             ? new TCompactProtocol.Factory()
             : new TBinaryProtocol.Factory();
 
@@ -1860,12 +1859,12 @@ public class ConfigNodeClient
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
-  public static class Factory extends BaseClientFactory<ConfigNodeRegionId, ConfigNodeClient> {
+  public static class Factory extends ThriftClientFactory<ConfigNodeRegionId, ConfigNodeClient> {
 
     public Factory(
         ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
     }
 
     @Override
@@ -1877,15 +1876,13 @@ public class ConfigNodeClient
     @Override
     public PooledObject<ConfigNodeClient> makeObject(ConfigNodeRegionId configNodeRegionId)
         throws Exception {
-      Constructor<ConfigNodeClient> constructor =
-          ConfigNodeClient.class.getConstructor(
-              TProtocolFactory.class, long.class, clientManager.getClass());
       return new DefaultPooledObject<>(
           SyncThriftClientWithErrorHandler.newErrorHandler(
               ConfigNodeClient.class,
-              constructor,
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              ConfigNodeClient.class.getConstructor(
+                  TProtocolFactory.class, long.class, clientManager.getClass()),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               clientManager));
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
index b5fb12c209..b1451c9337 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
@@ -19,17 +19,10 @@
 
 package org.apache.iotdb.db.client;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
-import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
-import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -43,117 +36,18 @@ public class DataNodeClientPoolFactory {
 
   private DataNodeClientPoolFactory() {}
 
-  public static class SyncConfigNodeIServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, SyncConfigNodeIServiceClient> {
-    @Override
-    public KeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> createClientPool(
-        ClientManager<TEndPoint, SyncConfigNodeIServiceClient> manager) {
-      return new GenericKeyedObjectPool<>(
-          new SyncConfigNodeIServiceClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
-          new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>().build().getConfig());
-    }
-  }
-
-  public static class AsyncConfigNodeIServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
-    @Override
-    public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
-        ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
-      return new GenericKeyedObjectPool<>(
-          new AsyncConfigNodeIServiceClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build(),
-              ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
-          new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
-              .setMaxIdleClientForEachNode(conf.getCoreConnectionForInternalService())
-              .setMaxTotalClientForEachNode(conf.getMaxConnectionForInternalService())
-              .build()
-              .getConfig());
-    }
-  }
-
-  public static class SyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
-    @Override
-    public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
-      return new GenericKeyedObjectPool<>(
-          new SyncDataNodeInternalServiceClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
-          new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
-              .setMaxIdleClientForEachNode(conf.getCoreConnectionForInternalService())
-              .setMaxTotalClientForEachNode(conf.getMaxConnectionForInternalService())
-              .build()
-              .getConfig());
-    }
-  }
-
-  public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
-    @Override
-    public KeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> createClientPool(
-        ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> manager) {
-      return new GenericKeyedObjectPool<>(
-          new SyncDataNodeMPPDataExchangeServiceClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
-          new ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
-              .build()
-              .getConfig());
-    }
-  }
-
-  public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
-    @Override
-    public KeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> createClientPool(
-        ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> manager) {
-      return new GenericKeyedObjectPool<>(
-          new AsyncDataNodeMPPDataExchangeServiceClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build(),
-              ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
-          new ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
-              .build()
-              .getConfig());
-    }
-  }
-
   public static class ConfigNodeClientPoolFactory
       implements IClientPoolFactory<ConfigNodeRegionId, ConfigNodeClient> {
+
     @Override
     public KeyedObjectPool<ConfigNodeRegionId, ConfigNodeClient> createClientPool(
         ClientManager<ConfigNodeRegionId, ConfigNodeClient> manager) {
       return new GenericKeyedObjectPool<>(
           new ConfigNodeClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
+              new ThriftClientProperty.Builder()
                   .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
                   .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
                   .build()),
           new ClientPoolProperty.Builder<ConfigNodeClient>()
               .setMaxIdleClientForEachNode(conf.getCoreConnectionForInternalService())
@@ -165,13 +59,14 @@ public class DataNodeClientPoolFactory {
 
   public static class ClusterDeletionConfigNodeClientPoolFactory
       implements IClientPoolFactory<ConfigNodeRegionId, ConfigNodeClient> {
+
     @Override
     public KeyedObjectPool<ConfigNodeRegionId, ConfigNodeClient> createClientPool(
         ClientManager<ConfigNodeRegionId, ConfigNodeClient> manager) {
       return new GenericKeyedObjectPool<>(
           new ConfigNodeClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
+              new ThriftClientProperty.Builder()
                   .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS() * 10)
                   .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
                   .setSelectorNumOfAsyncClientManager(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
index 0e21320667..9c7012abc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
@@ -32,8 +32,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -79,9 +79,7 @@ public class ClusterTemplateManager implements ITemplateManager {
   }
 
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
   @Override
   public TSStatus createSchemaTemplate(CreateSchemaTemplateStatement statement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
index 28146749a0..8e67daad52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManager.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -309,7 +309,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
   private final LocalMemoryManager localMemoryManager;
   private final Supplier<TsBlockSerde> tsBlockSerdeFactory;
   private final ExecutorService executorService;
-  private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+  private final IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient>
       mppDataExchangeServiceClientManager;
   private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles;
   private final Map<TFragmentInstanceId, ISinkHandle> sinkHandles;
@@ -320,7 +320,7 @@ public class MPPDataExchangeManager implements IMPPDataExchangeManager {
       LocalMemoryManager localMemoryManager,
       Supplier<TsBlockSerde> tsBlockSerdeFactory,
       ExecutorService executorService,
-      IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+      IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient>
           mppDataExchangeServiceClientManager) {
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
index f1ee4b6438..21bc211ea5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
@@ -20,8 +20,9 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -30,7 +31,6 @@ import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.ThriftService;
 import org.apache.iotdb.commons.service.ThriftServiceThread;
 import org.apache.iotdb.commons.service.metric.MetricService;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
@@ -67,10 +67,9 @@ public class MPPDataExchangeService extends ThriftService implements MPPDataExch
             new LocalMemoryManager(),
             new TsBlockSerdeFactory(),
             executorService,
-            new IClientManager.Factory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>()
+            new IClientManager.Factory<TEndPoint, DataNodeMPPDataExchangeServiceClient>()
                 .createClientManager(
-                    new DataNodeClientPoolFactory
-                        .SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
+                    new ClientPoolFactory.SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
     LOGGER.info("MPPDataExchangeManager init successfully");
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 50a3f2e1e8..bb93bc7aaa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
@@ -81,7 +81,7 @@ public class SinkHandle implements ISinkHandle {
   // size for current TsBlock to reserve and free
   private long currentTsBlockSize;
 
-  private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+  private final IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient>
       mppDataExchangeServiceClientManager;
 
   private volatile ListenableFuture<Void> blocked;
@@ -109,7 +109,7 @@ public class SinkHandle implements ISinkHandle {
       ExecutorService executorService,
       TsBlockSerde serde,
       SinkHandleListener sinkHandleListener,
-      IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+      IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient>
           mppDataExchangeServiceClientManager) {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
@@ -398,7 +398,7 @@ public class SinkHandle implements ISinkHandle {
                 blockSizes);
         while (attempt < MAX_ATTEMPT_TIMES) {
           attempt += 1;
-          try (SyncDataNodeMPPDataExchangeServiceClient client =
+          try (DataNodeMPPDataExchangeServiceClient client =
               mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
             client.onNewDataBlockEvent(newDataBlockEvent);
             break;
@@ -438,7 +438,7 @@ public class SinkHandle implements ISinkHandle {
                 nextSequenceId - 1);
         while (attempt < MAX_ATTEMPT_TIMES) {
           attempt += 1;
-          try (SyncDataNodeMPPDataExchangeServiceClient client =
+          try (DataNodeMPPDataExchangeServiceClient client =
               mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
             client.onEndOfDataBlockEvent(endOfDataBlockEvent);
             break;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index cda072ff0f..b1f1776a91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
@@ -73,7 +73,7 @@ public class SourceHandle implements ISourceHandle {
   private final String threadName;
   private long retryIntervalInMs;
 
-  private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+  private final IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient>
       mppDataExchangeServiceClientManager;
 
   private SettableFuture<Void> blocked = SettableFuture.create();
@@ -109,7 +109,7 @@ public class SourceHandle implements ISourceHandle {
       ExecutorService executorService,
       TsBlockSerde serde,
       SourceHandleListener sourceHandleListener,
-      IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
+      IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient>
           mppDataExchangeServiceClientManager) {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
@@ -439,7 +439,7 @@ public class SourceHandle implements ISourceHandle {
         int attempt = 0;
         while (attempt < MAX_ATTEMPT_TIMES) {
           attempt += 1;
-          try (SyncDataNodeMPPDataExchangeServiceClient client =
+          try (DataNodeMPPDataExchangeServiceClient client =
               mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
             TGetDataBlockResponse resp = client.getDataBlock(req);
             List<ByteBuffer> tsBlocks = new ArrayList<>(resp.getTsBlocks().size());
@@ -528,7 +528,7 @@ public class SourceHandle implements ISourceHandle {
                 remoteFragmentInstanceId, startSequenceId, endSequenceId);
         while (attempt < MAX_ATTEMPT_TIMES) {
           attempt += 1;
-          try (SyncDataNodeMPPDataExchangeServiceClient client =
+          try (DataNodeMPPDataExchangeServiceClient client =
               mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
             client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent);
             break;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index a9cacb3e1f..1a9157c083 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -19,11 +19,11 @@
 package org.apache.iotdb.db.mpp.plan;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.DataNodeEndPoints;
@@ -54,6 +54,7 @@ import java.util.concurrent.ScheduledExecutorService;
  * QueryExecution.
  */
 public class Coordinator {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class);
 
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
@@ -65,11 +66,11 @@ public class Coordinator {
   private static final Logger SLOW_SQL_LOGGER =
       LoggerFactory.getLogger(IoTDBConstant.SLOW_SQL_LOGGER_NAME);
 
-  private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+  private static final IClientManager<TEndPoint, DataNodeInternalServiceClient>
       INTERNAL_SERVICE_CLIENT_MANAGER =
-          new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+          new IClientManager.Factory<TEndPoint, DataNodeInternalServiceClient>()
               .createClientManager(
-                  new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+                  new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
 
   private final ExecutorService executor;
   private final ExecutorService writeOperationExecutor;
@@ -213,6 +214,11 @@ public class Coordinator {
     }
   }
 
+  public IClientManager<TEndPoint, DataNodeInternalServiceClient>
+      getInternalServiceClientManager() {
+    return INTERNAL_SERVICE_CLIENT_MANAGER;
+  }
+
   public static Coordinator getInstance() {
     return INSTANCE;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
index 872e1eb66d..16d016e0fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
@@ -23,17 +23,17 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool;
-import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
+import org.apache.iotdb.consensus.iot.client.IoTConsensusServiceClient;
 import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerReq;
 import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerRes;
 import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
 import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
 
@@ -41,17 +41,17 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class TestRPCClient {
-  private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+  private static final IClientManager<TEndPoint, DataNodeInternalServiceClient>
       INTERNAL_SERVICE_CLIENT_MANAGER =
-          new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+          new IClientManager.Factory<TEndPoint, DataNodeInternalServiceClient>()
               .createClientManager(
-                  new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+                  new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
 
-  private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
+  private final IClientManager<TEndPoint, IoTConsensusServiceClient> syncClientManager;
 
   public TestRPCClient() {
     syncClientManager =
-        new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
+        new IClientManager.Factory<TEndPoint, IoTConsensusServiceClient>()
             .createClientManager(
                 new IoTConsensusClientPool.SyncIoTConsensusServiceClientPoolFactory(
                     new IoTConsensusConfig.Builder().build()));
@@ -65,8 +65,8 @@ public class TestRPCClient {
   }
 
   private void loadSnapshot() {
-    try (SyncIoTConsensusServiceClient client =
-        syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 10761))) {
+    try (IoTConsensusServiceClient client =
+        syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40011))) {
       TTriggerSnapshotLoadRes res =
           client.triggerSnapshotLoad(
               new TTriggerSnapshotLoadReq(
@@ -78,8 +78,8 @@ public class TestRPCClient {
   }
 
   private void testAddPeer() {
-    try (SyncIoTConsensusServiceClient client =
-        syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 10762))) {
+    try (IoTConsensusServiceClient client =
+        syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40012))) {
       TInactivatePeerRes res =
           client.inactivatePeer(
               new TInactivatePeerReq(new DataRegionId(1).convertToTConsensusGroupId()));
@@ -90,8 +90,8 @@ public class TestRPCClient {
   }
 
   private void removeRegionPeer() {
-    try (SyncDataNodeInternalServiceClient client =
-        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 10730))) {
+    try (DataNodeInternalServiceClient client =
+        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
       client.removeRegionPeer(
           new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
     } catch (Exception e) {
@@ -100,8 +100,8 @@ public class TestRPCClient {
   }
 
   private void addPeer() {
-    try (SyncDataNodeInternalServiceClient client =
-        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 10730))) {
+    try (DataNodeInternalServiceClient client =
+        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
       client.addRegionPeer(
           new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
     } catch (Exception e) {
@@ -130,8 +130,8 @@ public class TestRPCClient {
   }
 
   private void createDataRegion() {
-    try (SyncDataNodeInternalServiceClient client =
-        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 10732))) {
+    try (DataNodeInternalServiceClient client =
+        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9005))) {
       TCreateDataRegionReq req = new TCreateDataRegionReq();
       req.setStorageGroup("root.test.g_0");
       TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 2c70aaf3d0..e3e937d04d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -40,8 +40,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -72,8 +72,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
   private final PartitionCache partitionCache;
 
   private final IClientManager<ConfigNodeRegionId, ConfigNodeClient> configNodeClientManager =
-      new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-          .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      ConfigNodeClientManager.getInstance();
 
   private static final class ClusterPartitionFetcherHolder {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
index 040c568b40..249be0db20 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
@@ -42,8 +42,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -106,8 +106,7 @@ public class PartitionCache {
   private final ReentrantReadWriteLock regionReplicaSetLock = new ReentrantReadWriteLock();
 
   private final IClientManager<ConfigNodeRegionId, ConfigNodeClient> configNodeClientManager =
-      new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-          .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      ConfigNodeClientManager.getInstance();
 
   public PartitionCache() {
     this.schemaPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index d4a40f0df0..bd29c7299a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.plan.execution;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -118,7 +118,7 @@ public class QueryExecution implements IQueryExecution {
   // We use this SourceHandle to fetch the TsBlock from it.
   private ISourceHandle resultHandle;
 
-  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+  private final IClientManager<TEndPoint, DataNodeInternalServiceClient>
       internalServiceClientManager;
 
   private AtomicBoolean stopped;
@@ -131,7 +131,7 @@ public class QueryExecution implements IQueryExecution {
       ScheduledExecutorService scheduledExecutor,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, DataNodeInternalServiceClient> internalServiceClientManager) {
     this.rawStatement = statement;
     this.executor = executor;
     this.writeOperationExecutor = writeOperationExecutor;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index ced8aa4dfd..c15504346e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -73,6 +73,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
@@ -175,10 +176,9 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigTaskExecutor.class);
 
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
+  /** TODO: consolidate this client manager with CONFIG_NODE_CLIENT_MANAGER above. */
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
       CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER =
           new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
@@ -186,6 +186,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
                   new DataNodeClientPoolFactory.ClusterDeletionConfigNodeClientPoolFactory());
 
   private static final class ClusterConfigTaskExecutorHolder {
+
     private static final ClusterConfigTaskExecutor INSTANCE = new ClusterConfigTaskExecutor();
 
     private ClusterConfigTaskExecutorHolder() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index bebb38f2b6..1b68e91402 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceFailureInfo;
@@ -49,14 +49,14 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   protected final String localhostIpAddr;
   protected final int localhostInternalPort;
 
-  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+  private final IClientManager<TEndPoint, DataNodeInternalServiceClient>
       internalServiceClientManager;
 
   public AbstractFragInsStateTracker(
       QueryStateMachine stateMachine,
       ScheduledExecutorService scheduledExecutor,
       List<FragmentInstance> instances,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, DataNodeInternalServiceClient> internalServiceClientManager) {
     this.stateMachine = stateMachine;
     this.scheduledExecutor = scheduledExecutor;
     this.instances = instances;
@@ -81,7 +81,7 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
         return new FragmentInstanceInfo(FragmentInstanceState.NO_SUCH_INSTANCE);
       }
     } else {
-      try (SyncDataNodeInternalServiceClient client =
+      try (DataNodeInternalServiceClient client =
           internalServiceClientManager.borrowClient(endPoint)) {
         TFragmentInstanceInfoResp resp =
             client.fetchFragmentInstanceInfo(new TFetchFragmentInstanceInfoReq(getTId(instance)));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index fde66f115b..e24cbd2f11 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -69,7 +69,7 @@ public class ClusterScheduler implements IScheduler {
       ExecutorService executor,
       ExecutorService writeOperationExecutor,
       ScheduledExecutorService scheduledExecutor,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, DataNodeInternalServiceClient> internalServiceClientManager) {
     this.stateMachine = stateMachine;
     this.instances = instances;
     this.queryType = queryType;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 80b90b9a21..076394da8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
@@ -60,7 +60,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
       QueryStateMachine stateMachine,
       ScheduledExecutorService scheduledExecutor,
       List<FragmentInstance> instances,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, DataNodeInternalServiceClient> internalServiceClientManager) {
     super(stateMachine, scheduledExecutor, instances, internalServiceClientManager);
     this.aborted = false;
     this.instanceStateMap = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index db04e97f0b..0f69212567 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
@@ -64,7 +64,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   private final MPPQueryContext queryContext;
   private final String localhostIpAddr;
   private final int localhostInternalPort;
-  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+  private final IClientManager<TEndPoint, DataNodeInternalServiceClient>
       internalServiceClientManager;
 
   public FragmentInstanceDispatcherImpl(
@@ -72,7 +72,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
       MPPQueryContext queryContext,
       ExecutorService executor,
       ExecutorService writeOperationExecutor,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, DataNodeInternalServiceClient> internalServiceClientManager) {
     this.type = type;
     this.queryContext = queryContext;
     this.executor = executor;
@@ -146,7 +146,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
 
   private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
       throws FragmentInstanceDispatchException {
-    try (SyncDataNodeInternalServiceClient client =
+    try (DataNodeInternalServiceClient client =
         internalServiceClientManager.borrowClient(endPoint)) {
       switch (instance.getType()) {
         case READ:
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 14bace8dc2..1c7c9a2322 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
@@ -52,14 +52,14 @@ public class SimpleQueryTerminator implements IQueryTerminator {
   private List<TEndPoint> relatedHost;
   private Map<TEndPoint, List<TFragmentInstanceId>> ownedFragmentInstance;
 
-  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+  private final IClientManager<TEndPoint, DataNodeInternalServiceClient>
       internalServiceClientManager;
 
   public SimpleQueryTerminator(
       ScheduledExecutorService scheduledExecutor,
       MPPQueryContext queryContext,
       List<FragmentInstance> fragmentInstances,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
+      IClientManager<TEndPoint, DataNodeInternalServiceClient> internalServiceClientManager,
       IFragInstanceStateTracker stateTracker) {
     this.scheduledExecutor = scheduledExecutor;
     this.queryId = queryContext.getQueryId();
@@ -97,7 +97,7 @@ public class SimpleQueryTerminator implements IQueryTerminator {
       if (unfinishedFIs.isEmpty()) {
         continue;
       }
-      try (SyncDataNodeInternalServiceClient client =
+      try (DataNodeInternalServiceClient client =
           internalServiceClientManager.borrowClient(endPoint)) {
         client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs));
       } catch (ClientManagerException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 80d9ec3516..3dad056085 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -65,12 +65,12 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
   private String uuid;
   private final String localhostIpAddr;
   private final int localhostInternalPort;
-  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+  private final IClientManager<TEndPoint, DataNodeInternalServiceClient>
       internalServiceClientManager;
   private final ExecutorService executor;
 
   public LoadTsFileDispatcherImpl(
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, DataNodeInternalServiceClient> internalServiceClientManager) {
     this.internalServiceClientManager = internalServiceClientManager;
     this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
     this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
@@ -123,7 +123,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
 
   private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
       throws FragmentInstanceDispatchException {
-    try (SyncDataNodeInternalServiceClient client =
+    try (DataNodeInternalServiceClient client =
         internalServiceClientManager.borrowClient(endPoint)) {
       TTsFilePieceReq loadTsFileReq =
           new TTsFilePieceReq(
@@ -215,7 +215,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
 
   private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint endPoint)
       throws FragmentInstanceDispatchException {
-    try (SyncDataNodeInternalServiceClient client =
+    try (DataNodeInternalServiceClient client =
         internalServiceClientManager.borrowClient(endPoint)) {
       TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
       if (!loadResp.isAccepted()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
index ada2fab6ae..caee196f18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileScheduler.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -95,7 +95,7 @@ public class LoadTsFileScheduler implements IScheduler {
       DistributedQueryPlan distributedQueryPlan,
       MPPQueryContext queryContext,
       QueryStateMachine stateMachine,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, DataNodeInternalServiceClient> internalServiceClientManager) {
     this.queryContext = queryContext;
     this.stateMachine = stateMachine;
     this.tsFileNodeList = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index e596c89ede..10d4376d2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -30,8 +30,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -50,9 +50,7 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
   private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSyncInfoFetcher.class);
 
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
   // region Interfaces of PipeSink
 
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 8d5eb5cdcf..6028ebe48f 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -20,10 +20,9 @@
 package org.apache.iotdb.db.trigger.executor;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
@@ -31,9 +30,10 @@ import org.apache.iotdb.commons.trigger.TriggerTable;
 import org.apache.iotdb.commons.trigger.exception.TriggerExecutionException;
 import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
@@ -68,16 +68,8 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TriggerFireVisitor.class);
 
-  private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
-      INTERNAL_SERVICE_CLIENT_MANAGER =
-          new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
-              .createClientManager(
-                  new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
-
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
   /**
    * How many times should we retry when error occurred during firing a trigger on another datanode
@@ -327,8 +319,10 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
         TDataNodeLocation tDataNodeLocation =
             TriggerManagementService.getInstance()
                 .getDataNodeLocationOfStatefulTrigger(triggerName);
-        try (SyncDataNodeInternalServiceClient client =
-            INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(tDataNodeLocation.getInternalEndPoint())) {
+        try (DataNodeInternalServiceClient client =
+            Coordinator.getInstance()
+                .getInternalServiceClientManager()
+                .borrowClient(tDataNodeLocation.getInternalEndPoint())) {
           TFireTriggerReq req = new TFireTriggerReq(triggerName, tablet.serialize(), event.getId());
           TFireTriggerResp resp = client.fireTrigger(req);
           if (resp.foundExecutor) {
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
index 0aadf06daa..f4b015074d 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -45,9 +45,7 @@ public class TriggerInformationUpdater {
   private static final Logger LOGGER = LoggerFactory.getLogger(TriggerInformationUpdater.class);
 
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
   private final ScheduledExecutorService triggerInformationUpdateExecutor =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
index bc0c5d5207..60eb0ddc84 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.commons.client.sync.DataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -53,10 +53,9 @@ public class MPPDataExchangeManagerTest {
             mockLocalMemoryManager,
             new TsBlockSerdeFactory(),
             Executors.newSingleThreadExecutor(),
-            new IClientManager.Factory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>()
+            new IClientManager.Factory<TEndPoint, DataNodeMPPDataExchangeServiceClient>()
                 .createClientManager(
-                    new DataNodeClientPoolFactory
-                        .SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
+                    new ClientPoolFactory.SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
 
     ISinkHandle localSinkHandle =
         mppDataExchangeManager.createLocalSinkHandle(
@@ -96,10 +95,9 @@ public class MPPDataExchangeManagerTest {
             mockLocalMemoryManager,
             new TsBlockSerdeFactory(),
             Executors.newSingleThreadExecutor(),
-            new IClientManager.Factory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>()
+            new IClientManager.Factory<TEndPoint, DataNodeMPPDataExchangeServiceClient>()
                 .createClientManager(
-                    new DataNodeClientPoolFactory
-                        .SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
+                    new ClientPoolFactory.SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
 
     ISourceHandle localSourceHandle =
         mppDataExchangeManager.createLocalSourceHandle(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index 0c8ebf9f73..41d830263d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
@@ -62,11 +62,11 @@ public class SinkHandleTest {
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
-    IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
+    IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> mockClientManager =
         Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    SyncDataNodeMPPDataExchangeServiceClient mockClient =
-        Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
+    DataNodeMPPDataExchangeServiceClient mockClient =
+        Mockito.mock(DataNodeMPPDataExchangeServiceClient.class);
     try {
       Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doNothing()
@@ -211,11 +211,11 @@ public class SinkHandleTest {
     SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
     // Construct several mock TsBlock(s).
     List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
-    IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
+    IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> mockClientManager =
         Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    SyncDataNodeMPPDataExchangeServiceClient mockClient =
-        Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
+    DataNodeMPPDataExchangeServiceClient mockClient =
+        Mockito.mock(DataNodeMPPDataExchangeServiceClient.class);
     try {
       Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doNothing()
@@ -410,11 +410,11 @@ public class SinkHandleTest {
     SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
     // Construct several mock TsBlock(s).
     List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
-    IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
+    IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> mockClientManager =
         Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    SyncDataNodeMPPDataExchangeServiceClient mockClient =
-        Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
+    DataNodeMPPDataExchangeServiceClient mockClient =
+        Mockito.mock(DataNodeMPPDataExchangeServiceClient.class);
     TException mockException = new TException("Mock exception");
     try {
       Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
@@ -520,11 +520,11 @@ public class SinkHandleTest {
     SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
     // Construct several mock TsBlock(s).
     List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
-    IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
+    IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> mockClientManager =
         Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    SyncDataNodeMPPDataExchangeServiceClient mockClient =
-        Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
+    DataNodeMPPDataExchangeServiceClient mockClient =
+        Mockito.mock(DataNodeMPPDataExchangeServiceClient.class);
     try {
       Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doNothing()
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index aa9adbace6..62809e13aa 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
@@ -62,11 +62,11 @@ public class SourceHandleTest {
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
-    IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
+    IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> mockClientManager =
         Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    SyncDataNodeMPPDataExchangeServiceClient mockClient =
-        Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
+    DataNodeMPPDataExchangeServiceClient mockClient =
+        Mockito.mock(DataNodeMPPDataExchangeServiceClient.class);
     try {
       Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
@@ -178,11 +178,11 @@ public class SourceHandleTest {
     MemoryPool spyMemoryPool =
         Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
-    IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
+    IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> mockClientManager =
         Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    SyncDataNodeMPPDataExchangeServiceClient mockClient =
-        Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
+    DataNodeMPPDataExchangeServiceClient mockClient =
+        Mockito.mock(DataNodeMPPDataExchangeServiceClient.class);
     try {
       Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
@@ -332,11 +332,11 @@ public class SourceHandleTest {
     SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
     // Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
     TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
-    IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
+    IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> mockClientManager =
         Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    SyncDataNodeMPPDataExchangeServiceClient mockClient =
-        Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
+    DataNodeMPPDataExchangeServiceClient mockClient =
+        Mockito.mock(DataNodeMPPDataExchangeServiceClient.class);
     try {
       Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
@@ -512,11 +512,11 @@ public class SourceHandleTest {
     SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
     // Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
     TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
-    IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
+    IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> mockClientManager =
         Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    SyncDataNodeMPPDataExchangeServiceClient mockClient =
-        Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
+    DataNodeMPPDataExchangeServiceClient mockClient =
+        Mockito.mock(DataNodeMPPDataExchangeServiceClient.class);
     TException mockException = new TException("Mock exception");
     try {
       Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
@@ -587,11 +587,11 @@ public class SourceHandleTest {
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
-    IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mockClientManager =
+    IClientManager<TEndPoint, DataNodeMPPDataExchangeServiceClient> mockClientManager =
         Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    SyncDataNodeMPPDataExchangeServiceClient mockClient =
-        Mockito.mock(SyncDataNodeMPPDataExchangeServiceClient.class);
+    DataNodeMPPDataExchangeServiceClient mockClient =
+        Mockito.mock(DataNodeMPPDataExchangeServiceClient.class);
     try {
       Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
index de492a843b..a35b56be26 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
@@ -20,10 +20,10 @@
 package org.apache.iotdb.db.mpp.plan.plan;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.DataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -44,15 +44,15 @@ import java.time.ZoneId;
 
 public class QueryPlannerTest {
 
-  private static IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+  private static IClientManager<TEndPoint, DataNodeInternalServiceClient>
       internalServiceClientManager;
 
   @BeforeClass
   public static void setUp() {
     internalServiceClientManager =
-        new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+        new IClientManager.Factory<TEndPoint, DataNodeInternalServiceClient>()
             .createClientManager(
-                new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+                new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
   }
 
   @AfterClass