You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/06 05:56:00 UTC

[iotdb] 01/01: [IOTDB-5312] Consolidate ClientManagers in Datanodes for unified management (#8654)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5312
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7142f77f3e0596d86137b7ed84403d66987038b8
Author: Potato <ta...@apache.org>
AuthorDate: Fri Jan 6 13:31:05 2023 +0800

    [IOTDB-5312] Consolidate ClientManagers in Datanodes for unified management (#8654)
    
    * finish
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../client/sync/SyncConfigNodeClientPool.java      |   5 +-
 .../service/thrift/ConfigNodeRPCService.java       |   2 +-
 .../iot/client/AsyncIoTConsensusServiceClient.java |  73 +++++++------
 .../iot/client/IoTConsensusClientPool.java         |  10 +-
 .../iot/client/SyncIoTConsensusServiceClient.java  |  72 ++++--------
 .../apache/iotdb/consensus/ratis/RatisClient.java  |  15 +--
 .../iotdb/consensus/ratis/RatisConsensus.java      |  17 ++-
 .../java/org/apache/iotdb/it/env/AbstractEnv.java  |   5 +-
 .../org/apache/iotdb/it/env/RemoteServerEnv.java   |   5 +-
 .../apache/iotdb/commons/client/ClientManager.java |   4 +
 .../iotdb/commons/client/ClientPoolFactory.java    | 121 ++++++++++++++++++---
 .../apache/iotdb/commons/client/ThriftClient.java  |  89 +++++++++++++++
 .../AsyncConfigNodeHeartbeatServiceClient.java     |  73 +++++++------
 .../async/AsyncConfigNodeIServiceClient.java       |  75 +++++++------
 .../async/AsyncDataNodeHeartbeatServiceClient.java |  73 +++++++------
 .../async/AsyncDataNodeInternalServiceClient.java  |  73 +++++++------
 .../AsyncDataNodeMPPDataExchangeServiceClient.java |  77 ++++++-------
 .../BorrowNullClientManagerException.java}         |  16 ++-
 .../client/exception/ClientManagerException.java   |   5 +
 ...ava => CreateTAsyncClientManagerException.java} |   7 +-
 .../AsyncThriftClientFactory.java}                 |  42 +++----
 .../client/{ => factory}/BaseClientFactory.java    |   9 +-
 .../ThriftClientFactory.java}                      |  16 ++-
 .../client/{ => property}/ClientPoolProperty.java  |   6 +-
 .../ThriftClientProperty.java}                     |  12 +-
 .../client/sync/SyncConfigNodeIServiceClient.java  |  51 ++++-----
 .../sync/SyncDataNodeInternalServiceClient.java    |  67 ++++++------
 .../SyncDataNodeMPPDataExchangeServiceClient.java  |  57 +++++-----
 .../sync/SyncThriftClientWithErrorHandler.java     |  62 ++---------
 .../apache/iotdb/commons/conf/CommonConfig.java    |  43 +++++---
 .../iotdb/commons/conf/CommonDescriptor.java       |  51 ++++++++-
 .../iotdb/commons/client/ClientManagerTest.java    |  73 +++++++++----
 .../iotdb/db/auth/ClusterAuthorityFetcher.java     |   6 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  34 +++---
 .../iotdb/db/client/ConfigNodeClientManager.java   |  19 +++-
 .../iotdb/db/client/DataNodeClientPoolFactory.java | 117 +-------------------
 .../metadata/template/ClusterTemplateManager.java  |   6 +-
 .../execution/exchange/MPPDataExchangeService.java |   5 +-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  10 +-
 .../apache/iotdb/db/mpp/plan/TestRPCClient.java    |  14 +--
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  |   5 +-
 .../db/mpp/plan/analyze/cache/PartitionCache.java  |   5 +-
 .../config/executor/ClusterConfigTaskExecutor.java |   7 +-
 .../db/sync/common/ClusterSyncInfoFetcher.java     |   6 +-
 .../db/trigger/executor/TriggerFireVisitor.java    |  18 +--
 .../trigger/service/TriggerInformationUpdater.java |   6 +-
 .../exchange/MPPDataExchangeManagerTest.java       |   8 +-
 .../iotdb/db/mpp/plan/plan/QueryPlannerTest.java   |   4 +-
 48 files changed, 837 insertions(+), 739 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index 0d0f5336f3..cd19d0add8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.client.sync;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
@@ -28,7 +29,6 @@ import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -52,8 +52,7 @@ public class SyncConfigNodeClientPool {
   private SyncConfigNodeClientPool() {
     clientManager =
         new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
-            .createClientManager(
-                new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+            .createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
     configNodeLeader = new TEndPoint();
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
index e467d4b4bb..824b02b3e2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
@@ -74,7 +74,7 @@ public class ConfigNodeRPCService extends ThriftService implements ConfigNodeRPC
               configConf.getCnRpcMaxConcurrentClientNum(),
               configConf.getThriftServerAwaitTimeForStopService(),
               new ConfigNodeRPCServiceHandler(),
-              commonConfig.isCnRpcThriftCompressionEnabled());
+              commonConfig.isRpcThriftCompressionEnabled());
     } catch (RPCServiceException e) {
       throw new IllegalAccessException(e.getMessage());
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index 1f08dabbc6..f84b583924 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.consensus.iot.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -35,7 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncClient {
+public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncClient
+    implements ThriftClient {
 
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncIoTConsensusServiceClient.class);
@@ -58,41 +60,44 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
     this.clientManager = clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * return self, the method doesn't need to be called by the user and will be triggered after the
+   * RPC is finished.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
   public boolean isReady() {
     try {
       checkReady();
@@ -109,13 +114,13 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncIoTConsensusServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncIoTConsensusServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -127,21 +132,19 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
     @Override
     public PooledObject<AsyncIoTConsensusServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncIoTConsensusServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              tManagers[clientCnt.incrementAndGet() % tManagers.length],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncIoTConsensusServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index babc9fb4f5..eddb1067fd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -20,10 +20,10 @@
 package org.apache.iotdb.consensus.iot.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 
 import org.apache.commons.pool2.KeyedObjectPool;
@@ -48,11 +48,9 @@ public class IoTConsensusClientPool {
       return new GenericKeyedObjectPool<>(
           new SyncIoTConsensusServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
+              new ThriftClientProperty.Builder()
                   .setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
                   .setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
-                  .setSelectorNumOfAsyncClientManager(
-                      config.getRpc().getSelectorNumOfClientManager())
                   .build()),
           new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>().build().getConfig());
     }
@@ -74,7 +72,7 @@ public class IoTConsensusClientPool {
       return new GenericKeyedObjectPool<>(
           new AsyncIoTConsensusServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
+              new ThriftClientProperty.Builder()
                   .setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
                   .setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
                   .setSelectorNumOfAsyncClientManager(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
index 06fb777810..638893b116 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
@@ -20,16 +20,14 @@
 package org.apache.iotdb.consensus.iot.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.sync.SyncThriftClient;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
@@ -37,19 +35,16 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
-import java.lang.reflect.Constructor;
-import java.net.SocketException;
-
 public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
-    implements SyncThriftClient, AutoCloseable {
+    implements ThriftClient, AutoCloseable {
 
-  private final TEndPoint endPoint;
+  private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager;
 
   public SyncIoTConsensusServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
-      TEndPoint endPoint,
+      TEndPoint endpoint,
       ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager)
       throws TTransportException {
     super(
@@ -57,59 +52,41 @@ public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
             RpcTransportFactory.INSTANCE.getTransport(
                 new TSocket(
                     TConfigurationConst.defaultTConfiguration,
-                    endPoint.getIp(),
-                    endPoint.getPort(),
+                    endpoint.getIp(),
+                    endpoint.getPort(),
                     connectionTimeout))));
-    this.endPoint = endPoint;
+    this.endpoint = endpoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
-  @TestOnly
-  public TEndPoint getTEndpoint() {
-    return endPoint;
-  }
-
-  @TestOnly
-  public ClientManager<TEndPoint, SyncIoTConsensusServiceClient> getClientManager() {
-    return clientManager;
-  }
-
+  @Override
   public void close() {
-    if (clientManager != null) {
-      clientManager.returnClient(endPoint, this);
-    }
-  }
-
-  public void setTimeout(int timeout) {
-    // the same transport is used in both input and output
-    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+    clientManager.returnClient(endpoint, this);
   }
 
+  @Override
   public void invalidate() {
     getInputProtocol().getTransport().close();
   }
 
   @Override
   public void invalidateAll() {
-    clientManager.clear(endPoint);
-  }
-
-  public int getTimeout() throws SocketException {
-    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+    clientManager.clear(endpoint);
   }
 
   @Override
   public String toString() {
-    return String.format("SyncIoTConsensusServiceClient{%s}", endPoint);
+    return String.format("SyncIoTConsensusServiceClient{%s}", endpoint);
   }
 
-  public static class Factory extends BaseClientFactory<TEndPoint, SyncIoTConsensusServiceClient> {
+  public static class Factory
+      extends ThriftClientFactory<TEndPoint, SyncIoTConsensusServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
     }
 
     @Override
@@ -121,15 +98,13 @@ public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
     @Override
     public PooledObject<SyncIoTConsensusServiceClient> makeObject(TEndPoint endpoint)
         throws Exception {
-      Constructor<SyncIoTConsensusServiceClient> constructor =
-          SyncIoTConsensusServiceClient.class.getConstructor(
-              TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
       return new DefaultPooledObject<>(
           SyncThriftClientWithErrorHandler.newErrorHandler(
               SyncIoTConsensusServiceClient.class,
-              constructor,
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              SyncIoTConsensusServiceClient.class.getConstructor(
+                  TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endpoint,
               clientManager));
     }
@@ -137,8 +112,7 @@ public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
     @Override
     public boolean validateObject(
         TEndPoint endpoint, PooledObject<SyncIoTConsensusServiceClient> pooledObject) {
-      return pooledObject.getObject() != null
-          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+      return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
     }
   }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
index 7f0b11d117..80e80232da 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
@@ -18,9 +18,8 @@
  */
 package org.apache.iotdb.consensus.ratis;
 
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.factory.BaseClientFactory;
 import org.apache.iotdb.consensus.config.RatisConfig;
 
 import org.apache.commons.pool2.PooledObject;
@@ -41,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 public class RatisClient {
+
   private final Logger logger = LoggerFactory.getLogger(RatisClient.class);
   private final RaftGroup serveGroup;
   private final RaftClient raftClient;
@@ -59,7 +59,7 @@ public class RatisClient {
     return raftClient;
   }
 
-  public void close() {
+  private void close() {
     try {
       raftClient.close();
     } catch (IOException e) {
@@ -68,9 +68,7 @@ public class RatisClient {
   }
 
   public void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(serveGroup, this);
-    }
+    clientManager.returnClient(serveGroup, this);
   }
 
   public static class Factory extends BaseClientFactory<RaftGroup, RatisClient> {
@@ -81,11 +79,10 @@ public class RatisClient {
 
     public Factory(
         ClientManager<RaftGroup, RatisClient> clientManager,
-        ClientFactoryProperty clientPoolProperty,
         RaftProperties raftProperties,
         RaftClientRpc clientRpc,
         Supplier<RatisConfig.Impl> config) {
-      super(clientManager, clientPoolProperty);
+      super(clientManager);
       this.raftProperties = raftProperties;
       this.clientRpc = clientRpc;
       this.config = config;
@@ -97,7 +94,7 @@ public class RatisClient {
     }
 
     @Override
-    public PooledObject<RatisClient> makeObject(RaftGroup group) throws Exception {
+    public PooledObject<RatisClient> makeObject(RaftGroup group) {
       return new DefaultPooledObject<>(
           new RatisClient(
               group,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 4fd5c46e95..94be8ca634 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -21,12 +21,11 @@ package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -106,9 +105,7 @@ class RatisConsensus implements IConsensus {
   private final RaftProperties properties = new RaftProperties();
   private final RaftClientRpc clientRpc;
 
-  private final IClientManager<RaftGroup, RatisClient> clientManager =
-      new IClientManager.Factory<RaftGroup, RatisClient>()
-          .createClientManager(new RatisClientPoolFactory());
+  private final IClientManager<RaftGroup, RatisClient> clientManager;
 
   private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>();
 
@@ -145,6 +142,10 @@ class RatisConsensus implements IConsensus {
     Utils.initRatisConfig(properties, config.getRatisConfig());
     this.config = config.getRatisConfig();
 
+    clientManager =
+        new IClientManager.Factory<RaftGroup, RatisClient>()
+            .createClientManager(new RatisClientPoolFactory());
+
     clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
 
     server =
@@ -812,11 +813,7 @@ class RatisConsensus implements IConsensus {
         ClientManager<RaftGroup, RatisClient> manager) {
       return new GenericKeyedObjectPool<>(
           new RatisClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder().build(),
-              properties,
-              clientRpc,
-              MemoizedSupplier.valueOf(() -> config.getImpl())),
+              manager, properties, clientRpc, MemoizedSupplier.valueOf(config::getImpl)),
           new ClientPoolProperty.Builder<RatisClient>().build().getConfig());
     }
   }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index bd5bd2f247..af9d89866d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -19,13 +19,13 @@
 package org.apache.iotdb.it.env;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.it.framework.IoTDBTestLogger;
@@ -81,8 +81,7 @@ public abstract class AbstractEnv implements BaseEnv {
 
     clientManager =
         new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
-            .createClientManager(
-                new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+            .createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
 
     final String testClassName = getTestClassName();
     final String testMethodName = getTestMethodName();
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index 20040842e2..0099fb11c2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -19,11 +19,11 @@
 package org.apache.iotdb.it.env;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.jdbc.Config;
@@ -61,8 +61,7 @@ public class RemoteServerEnv implements BaseEnv {
     }
     clientManager =
         new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
-            .createClientManager(
-                new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+            .createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
   }
 
   @Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 66836967de..dc1f5fc83f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.client;
 
+import org.apache.iotdb.commons.client.exception.BorrowNullClientManagerException;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.utils.TestOnly;
 
@@ -45,6 +46,9 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
 
   @Override
   public V borrowClient(K node) throws ClientManagerException {
+    if (node == null) {
+      throw new BorrowNullClientManagerException();
+    }
     try {
       return pool.borrowObject(node);
     } catch (Exception e) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 094470deb6..a795b20199 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -21,9 +21,15 @@ package org.apache.iotdb.commons.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.async.AsyncConfigNodeHeartbeatServiceClient;
+import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeHeartbeatServiceClient;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -37,18 +43,58 @@ public class ClientPoolFactory {
 
   private ClientPoolFactory() {}
 
+  public static class SyncConfigNodeIServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, SyncConfigNodeIServiceClient> {
+    /** Creates the keyed client pool; connect timeout and compression come from {@code conf}. */
+    @Override
+    public KeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> createClientPool(
+        ClientManager<TEndPoint, SyncConfigNodeIServiceClient> manager) {
+      return new GenericKeyedObjectPool<>(
+          new SyncConfigNodeIServiceClient.Factory(
+              manager,
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .build()),
+          new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>().build().getConfig());
+    }
+  }
+
+  public static class AsyncConfigNodeIServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
+    /** Creates the keyed client pool; Thrift and pool-size settings come from {@code conf}. */
+    @Override
+    public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
+        ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
+      return new GenericKeyedObjectPool<>(
+          new AsyncConfigNodeIServiceClient.Factory(
+              manager,
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+                  .build(),
+              ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
+          new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
+              .setMaxIdleClientForEachNode(conf.getMaxIdleClientForEachNode())
+              .setMaxTotalClientForEachNode(conf.getMaxTotalClientForEachNode())
+              .build()
+              .getConfig());
+    }
+  }
+
   public static class SyncDataNodeInternalServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
+
     @Override
     public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
         ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new SyncDataNodeInternalServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
-                  .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
                   .build()),
           new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig());
     }
@@ -56,16 +102,17 @@ public class ClientPoolFactory {
 
   public static class AsyncDataNodeInternalServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
+
     @Override
     public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
         ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new AsyncDataNodeInternalServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
-                  .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
                   .build(),
               ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
           new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
@@ -74,16 +121,17 @@ public class ClientPoolFactory {
 
   public static class AsyncConfigNodeHeartbeatServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> {
+
     @Override
     public KeyedObjectPool<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> createClientPool(
         ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new AsyncConfigNodeHeartbeatServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
-                  .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
                   .build(),
               ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
           new ClientPoolProperty.Builder<AsyncConfigNodeHeartbeatServiceClient>()
@@ -94,16 +142,17 @@ public class ClientPoolFactory {
 
   public static class AsyncDataNodeHeartbeatServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncDataNodeHeartbeatServiceClient> {
+
     @Override
     public KeyedObjectPool<TEndPoint, AsyncDataNodeHeartbeatServiceClient> createClientPool(
         ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new AsyncDataNodeHeartbeatServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
-                  .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
                   .build(),
               ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
           new ClientPoolProperty.Builder<AsyncDataNodeHeartbeatServiceClient>()
@@ -111,4 +160,44 @@ public class ClientPoolFactory {
               .getConfig());
     }
   }
+
+  public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
+    /** Creates the keyed client pool; connect timeout and compression come from {@code conf}. */
+    @Override
+    public KeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> createClientPool(
+        ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> manager) {
+      return new GenericKeyedObjectPool<>(
+          new SyncDataNodeMPPDataExchangeServiceClient.Factory(
+              manager,
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .build()),
+          new ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
+              .build()
+              .getConfig());
+    }
+  }
+
+  public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
+    /** Creates the keyed client pool; all Thrift client settings come from {@code conf}. */
+    @Override
+    public KeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> createClientPool(
+        ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> manager) {
+      return new GenericKeyedObjectPool<>(
+          new AsyncDataNodeMPPDataExchangeServiceClient.Factory(
+              manager,
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+                  .build(),
+              ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
+          new ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
+              .build()
+              .getConfig());
+    }
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
new file mode 100644
index 0000000000..b8124f939f
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.net.SocketException;
+
+/**
+ * Defines the failure-handling hooks a Thrift client must support so that it can be evicted
+ * from its clientManager, alone or with its whole endpoint, when an RPC raises an exception.
+ */
+public interface ThriftClient {
+
+  Logger logger = LoggerFactory.getLogger(ThriftClient.class);
+
+  /** Close this connection. */
+  void invalidate();
+
+  /** Removes all pooled instances corresponding to the current instance's endpoint. */
+  void invalidateAll();
+
+  /** Invalidates {@code o} for a TException and clears its endpoint when the link is broken. */
+  static void resolveException(Throwable t, ThriftClient o) {
+    Throwable origin = t;
+    if (t instanceof InvocationTargetException) {
+      origin = ((InvocationTargetException) t).getTargetException();
+    }
+    if (origin instanceof TException) {
+      int level = 0;
+      for (Throwable cur = origin; cur != null; cur = cur.getCause()) {
+        logger.debug(
+            "level-{} Exception class {}, message {}",
+            level++,
+            cur.getClass().getName(),
+            cur.getMessage());
+      }
+      o.invalidate();
+    }
+
+    Throwable rootCause = ExceptionUtils.getRootCause(origin);
+    if (rootCause != null) {
+      // if the exception is SocketException and its error message is Broken pipe, it means that
+      // the remote node may restart and all the connection we cached before should be cleared.
+      logger.debug(
+          "root cause message {}, LocalizedMessage {}, ",
+          rootCause.getMessage(),
+          rootCause.getLocalizedMessage(),
+          rootCause);
+      if (isConnectionBroken(rootCause)) {
+        logger.debug(
+            "Broken pipe error happened in sending RPC,"
+                + " we need to clear all previous cached connection",
+            t);
+        o.invalidateAll();
+      }
+    }
+  }
+
+  /** Null-safe check for a "Broken pipe" SocketException or a peer-closed TTransportException. */
+  static boolean isConnectionBroken(Throwable cause) {
+    String msg = cause.getMessage(); // null for exceptions created without a detail message
+    return msg != null
+        && ((cause instanceof SocketException && msg.contains("Broken pipe"))
+            || (cause instanceof TTransportException && msg.contains("Socket is closed by peer")));
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
index 38b12d2370..86de8b6f02 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -33,7 +34,8 @@ import org.apache.thrift.protocol.TProtocolFactory;
 
 import java.io.IOException;
 
-public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService.AsyncClient {
+public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService.AsyncClient
+    implements ThriftClient {
 
   private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> clientManager;
@@ -53,41 +55,44 @@ public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService
     this.clientManager = clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * return self, the method doesn't need to be called by the user and will be triggered after the
+   * RPC is finished.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
   public boolean isReady() {
     try {
       checkReady();
@@ -103,13 +108,13 @@ public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -121,21 +126,19 @@ public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService
     @Override
     public PooledObject<AsyncConfigNodeHeartbeatServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncConfigNodeHeartbeatServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncConfigNodeHeartbeatServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
index 32219dcbfb..91d64ab257 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -35,7 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncClient {
+public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncClient
+    implements ThriftClient {
 
   private static final Logger logger = LoggerFactory.getLogger(AsyncConfigNodeIServiceClient.class);
 
@@ -57,47 +59,50 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
     this.clientManager = clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * return self, the method doesn't need to be called by the user and will be triggered after the
+   * RPC is finished.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
   public boolean isReady() {
     try {
       checkReady();
       return true;
     } catch (Exception e) {
-      logger.info("Unexpected exception occurs in {} :", this, e);
+      logger.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
       return false;
     }
   }
@@ -108,13 +113,13 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -126,21 +131,19 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
     @Override
     public PooledObject<AsyncConfigNodeIServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncConfigNodeIServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncConfigNodeIServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
index 2812886336..50c0540fd0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -33,7 +34,8 @@ import org.apache.thrift.protocol.TProtocolFactory;
 
 import java.io.IOException;
 
-public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.AsyncClient {
+public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.AsyncClient
+    implements ThriftClient {
 
   private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> clientManager;
@@ -53,41 +55,44 @@ public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.Asy
     this.clientManager = clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * return self, the method doesn't need to be called by the user and will be triggered after the
+   * RPC is finished.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
   public boolean isReady() {
     try {
       checkReady();
@@ -103,13 +108,13 @@ public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.Asy
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncDataNodeHeartbeatServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncDataNodeHeartbeatServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -121,21 +126,19 @@ public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.Asy
     @Override
     public PooledObject<AsyncDataNodeHeartbeatServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncDataNodeHeartbeatServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncDataNodeHeartbeatServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 5329a4d9f2..46c6eec275 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
@@ -36,7 +37,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.AsyncClient {
+public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.AsyncClient
+    implements ThriftClient {
 
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncDataNodeInternalServiceClient.class);
@@ -69,41 +71,44 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
     return clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * return self, the method doesn't need to be called by the user and will be triggered after the
+   * RPC is finished.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
   public boolean isReady() {
     try {
       checkReady();
@@ -120,13 +125,13 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -138,21 +143,21 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
     @Override
     public PooledObject<AsyncDataNodeInternalServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncDataNodeInternalServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              // floorMod keeps the array index non-negative once clientCnt wraps past
+              // Integer.MAX_VALUE; a plain % would then index tManagers with a negative value.
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncDataNodeInternalServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 667f3189b1..7b056fc350 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.async;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
 import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
 
@@ -35,7 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeService.AsyncClient {
+public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeService.AsyncClient
+    implements ThriftClient {
 
   private static final Logger logger =
       LoggerFactory.getLogger(AsyncDataNodeMPPDataExchangeServiceClient.class);
@@ -58,47 +60,50 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
     this.clientManager = clientManager;
   }
 
-  public void close() {
-    ___transport.close();
-    ___currentMethod = null;
-  }
-
-  /**
-   * return self if clientManager is not null, the method doesn't need to call by user, it will
-   * trigger once client transport complete.
-   */
-  private void returnSelf() {
-    if (clientManager != null) {
-      clientManager.returnClient(endpoint, this);
-    }
-  }
-
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onComplete() {
     super.onComplete();
     returnSelf();
   }
 
-  /**
-   * This method will be automatically called by the thrift selector thread, and we'll just simulate
-   * the behavior in our test
-   */
   @Override
   public void onError(Exception e) {
     super.onError(e);
+    ThriftClient.resolveException(e, this);
     returnSelf();
   }
 
-  public boolean isReady() {
+  @Override
+  public void invalidate() {
+    if (!hasError()) {
+      super.onError(new Exception("This client has been invalidated"));
+    }
+  }
+
+  @Override
+  public void invalidateAll() {
+    clientManager.clear(endpoint);
+  }
+
+  /**
+   * Hands this client back to {@code clientManager} under its endpoint. Callers never need to
+   * invoke it themselves: {@code onComplete} and {@code onError} call it once the RPC finishes.
+   */
+  private void returnSelf() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  private void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
+  private boolean isReady() {
     try {
       checkReady();
       return true;
     } catch (Exception e) {
-      logger.info("Unexpected exception occurs in {} :", this, e);
+      logger.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
       return false;
     }
   }
@@ -109,13 +114,13 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
   }
 
   public static class Factory
-      extends AsyncBaseClientFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
+      extends AsyncThriftClientFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty,
+        ThriftClientProperty thriftClientProperty,
         String threadName) {
-      super(clientManager, clientFactoryProperty, threadName);
+      super(clientManager, thriftClientProperty, threadName);
     }
 
     @Override
@@ -127,21 +132,21 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
     @Override
     public PooledObject<AsyncDataNodeMPPDataExchangeServiceClient> makeObject(TEndPoint endPoint)
         throws Exception {
-      TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
-      tManager = tManager == null ? new TAsyncClientManager() : tManager;
       return new DefaultPooledObject<>(
           new AsyncDataNodeMPPDataExchangeServiceClient(
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endPoint,
-              tManager,
+              // floorMod keeps the array index non-negative once clientCnt wraps past
+              // Integer.MAX_VALUE; a plain % would then index tManagers with a negative value.
+              tManagers[Math.floorMod(clientCnt.incrementAndGet(), tManagers.length)],
               clientManager));
     }
 
     @Override
     public boolean validateObject(
         TEndPoint endPoint, PooledObject<AsyncDataNodeMPPDataExchangeServiceClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+      return pooledObject.getObject().isReady();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/BorrowNullClientManagerException.java
similarity index 74%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/exception/BorrowNullClientManagerException.java
index ff3862f879..4cdf92fa4a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/BorrowNullClientManagerException.java
@@ -16,16 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.commons.client.sync;
 
-public interface SyncThriftClient {
+package org.apache.iotdb.commons.client.exception;
 
-  /** close the connection */
-  void invalidate();
+public class BorrowNullClientManagerException extends ClientManagerException {
 
-  /**
-   * Clears the specified pool, removing all pooled instances corresponding to current instance's
-   * endPoint.
-   */
-  void invalidateAll();
+  private static final String MESSAGE = "Can not borrow client for node null";
+
+  public BorrowNullClientManagerException() {
+    super(MESSAGE);
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
index 439f25b655..7da6297305 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
@@ -20,7 +20,12 @@
 package org.apache.iotdb.commons.client.exception;
 
 public class ClientManagerException extends Exception {
+
   public ClientManagerException(Exception exception) {
     super(exception);
   }
+
+  public ClientManagerException(String message) {
+    super(message);
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/CreateTAsyncClientManagerException.java
similarity index 82%
copy from node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/client/exception/CreateTAsyncClientManagerException.java
index 439f25b655..075e0cfb11 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/CreateTAsyncClientManagerException.java
@@ -19,8 +19,9 @@
 
 package org.apache.iotdb.commons.client.exception;
 
-public class ClientManagerException extends Exception {
-  public ClientManagerException(Exception exception) {
-    super(exception);
+public class CreateTAsyncClientManagerException extends RuntimeException {
+
+  public CreateTAsyncClientManagerException(String message, Throwable cause) {
+    super(message, cause);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
similarity index 50%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
index 73fd0dca22..afde802f4c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
@@ -17,41 +17,41 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.factory;
+
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.exception.CreateTAsyncClientManagerException;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 
 import org.apache.thrift.async.TAsyncClientManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-public abstract class AsyncBaseClientFactory<K, V> extends BaseClientFactory<K, V> {
+public abstract class AsyncThriftClientFactory<K, V> extends ThriftClientFactory<K, V> {
 
-  private static final Logger logger = LoggerFactory.getLogger(AsyncBaseClientFactory.class);
-  protected TAsyncClientManager[] tManagers;
-  protected AtomicInteger clientCnt = new AtomicInteger();
+  protected final TAsyncClientManager[] tManagers;
+  protected final AtomicInteger clientCnt = new AtomicInteger();
   private static final String THRIFT_THREAD_NAME = "TAsyncClientManager#SelectorThread";
 
-  protected AsyncBaseClientFactory(
+  protected AsyncThriftClientFactory(
       ClientManager<K, V> clientManager,
-      ClientFactoryProperty clientFactoryProperty,
+      ThriftClientProperty thriftClientProperty,
       String threadName) {
-    super(clientManager, clientFactoryProperty);
-    synchronized (this) {
-      tManagers = new TAsyncClientManager[clientFactoryProperty.getSelectorNumOfAsyncClientPool()];
+    super(clientManager, thriftClientProperty);
+    try {
+      tManagers = new TAsyncClientManager[thriftClientProperty.getSelectorNumOfAsyncClientPool()];
       for (int i = 0; i < tManagers.length; i++) {
-        try {
-          tManagers[i] = new TAsyncClientManager();
-        } catch (IOException e) {
-          logger.error("Cannot create Async client factory", e);
-        }
+        tManagers[i] = new TAsyncClientManager();
       }
-      Thread.getAllStackTraces().keySet().stream()
-          .filter(thread -> thread.getName().contains(THRIFT_THREAD_NAME))
-          .collect(Collectors.toList())
-          .forEach(thread -> thread.setName(threadName + "-selector" + "-" + thread.getId()));
+    } catch (IOException e) {
+      throw new CreateTAsyncClientManagerException(
+          String.format("Cannot create Async thrift client factory %s", threadName), e);
     }
+    Thread.getAllStackTraces().keySet().stream()
+        .filter(thread -> thread.getName().contains(THRIFT_THREAD_NAME))
+        .collect(Collectors.toList())
+        .forEach(thread -> thread.setName(threadName + "-selector-" + thread.getId()));
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/BaseClientFactory.java
similarity index 82%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/factory/BaseClientFactory.java
index 510d495cae..680aa2efcd 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/BaseClientFactory.java
@@ -17,7 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.factory;
+
+import org.apache.iotdb.commons.client.ClientManager;
 
 import org.apache.commons.pool2.KeyedPooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
@@ -25,12 +27,9 @@ import org.apache.commons.pool2.PooledObject;
 public abstract class BaseClientFactory<K, V> implements KeyedPooledObjectFactory<K, V> {
 
   protected ClientManager<K, V> clientManager;
-  protected ClientFactoryProperty clientFactoryProperty;
 
-  protected BaseClientFactory(
-      ClientManager<K, V> clientManager, ClientFactoryProperty clientFactoryProperty) {
+  protected BaseClientFactory(ClientManager<K, V> clientManager) {
     this.clientManager = clientManager;
-    this.clientFactoryProperty = clientFactoryProperty;
   }
 
   @Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/ThriftClientFactory.java
similarity index 61%
copy from node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/client/factory/ThriftClientFactory.java
index 439f25b655..8c6c178279 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/ThriftClientFactory.java
@@ -17,10 +17,18 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.client.exception;
+package org.apache.iotdb.commons.client.factory;
 
-public class ClientManagerException extends Exception {
-  public ClientManagerException(Exception exception) {
-    super(exception);
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+
+public abstract class ThriftClientFactory<K, V> extends BaseClientFactory<K, V> {
+
+  protected ThriftClientProperty thriftClientProperty;
+
+  protected ThriftClientFactory(
+      ClientManager<K, V> clientManager, ThriftClientProperty thriftClientProperty) {
+    super(clientManager);
+    this.thriftClientProperty = thriftClientProperty;
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
similarity index 94%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
index 45870e9ba6..fc9ae843f9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.property;
 
 import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
 
@@ -84,7 +84,7 @@ public class ClientPoolProperty<V> {
     private DefaultProperty() {}
 
     public static final long WAIT_CLIENT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
-    public static final int MAX_TOTAL_CLIENT_FOR_EACH_NODE = 100;
-    public static final int MAX_IDLE_CLIENT_FOR_EACH_NODE = 100;
+    public static final int MAX_TOTAL_CLIENT_FOR_EACH_NODE = 300;
+    public static final int MAX_IDLE_CLIENT_FOR_EACH_NODE = 200;
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
similarity index 93%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
index 363decbf2d..c54c23fbd7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.property;
 
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -25,13 +25,13 @@ import org.apache.thrift.protocol.TProtocolFactory;
 
 import java.util.concurrent.TimeUnit;
 
-public class ClientFactoryProperty {
+public class ThriftClientProperty {
 
   private final TProtocolFactory protocolFactory;
   private final int connectionTimeoutMs;
   private final int selectorNumOfAsyncClientPool;
 
-  public ClientFactoryProperty(
+  public ThriftClientProperty(
       TProtocolFactory protocolFactory, int connectionTimeoutMs, int selectorNumOfAsyncClientPool) {
     this.protocolFactory = protocolFactory;
     this.connectionTimeoutMs = connectionTimeoutMs;
@@ -75,8 +75,8 @@ public class ClientFactoryProperty {
       return this;
     }
 
-    public ClientFactoryProperty build() {
-      return new ClientFactoryProperty(
+    public ThriftClientProperty build() {
+      return new ThriftClientProperty(
           rpcThriftCompressionEnabled
               ? new TCompactProtocol.Factory()
               : new TBinaryProtocol.Factory(),
@@ -90,7 +90,7 @@ public class ClientFactoryProperty {
     private DefaultProperty() {}
 
     public static final boolean RPC_THRIFT_COMPRESSED_ENABLED = false;
-    public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(30);
+    public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(20);
     public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = 1;
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
index 522470d8be..6f6024ccd2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
@@ -34,13 +35,12 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
-import java.lang.reflect.Constructor;
 import java.net.SocketException;
 
 public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
-    implements SyncThriftClient, AutoCloseable {
+    implements ThriftClient, AutoCloseable {
 
-  private final TEndPoint endPoint;
+  private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
 
   public SyncConfigNodeIServiceClient(
@@ -57,15 +57,13 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
                     endPoint.getIp(),
                     endPoint.getPort(),
                     connectionTimeout))));
-    this.endPoint = endPoint;
+    this.endpoint = endPoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
-  public void close() {
-    if (clientManager != null) {
-      clientManager.returnClient(endPoint, this);
-    }
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   public void setTimeout(int timeout) {
@@ -73,30 +71,32 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
     ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
   }
 
+  @Override
+  public void close() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  @Override
   public void invalidate() {
     getInputProtocol().getTransport().close();
   }
 
   @Override
   public void invalidateAll() {
-    clientManager.clear(endPoint);
-  }
-
-  public int getTimeout() throws SocketException {
-    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+    clientManager.clear(endpoint);
   }
 
   @Override
   public String toString() {
-    return String.format("SyncConfigNodeIServiceClient{%s}", endPoint);
+    return String.format("SyncConfigNodeIServiceClient{%s}", endpoint);
   }
 
-  public static class Factory extends BaseClientFactory<TEndPoint, SyncConfigNodeIServiceClient> {
+  public static class Factory extends ThriftClientFactory<TEndPoint, SyncConfigNodeIServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
     }
 
     @Override
@@ -108,15 +108,13 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
     @Override
     public PooledObject<SyncConfigNodeIServiceClient> makeObject(TEndPoint endpoint)
         throws Exception {
-      Constructor<SyncConfigNodeIServiceClient> constructor =
-          SyncConfigNodeIServiceClient.class.getConstructor(
-              TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
       return new DefaultPooledObject<>(
           SyncThriftClientWithErrorHandler.newErrorHandler(
               SyncConfigNodeIServiceClient.class,
-              constructor,
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              SyncConfigNodeIServiceClient.class.getConstructor(
+                  TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endpoint,
               clientManager));
     }
@@ -124,8 +122,7 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
     @Override
     public boolean validateObject(
         TEndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient> pooledObject) {
-      return pooledObject.getObject() != null
-          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+      return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 3d992c5b48..cab879e08a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
@@ -35,19 +36,18 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
-import java.lang.reflect.Constructor;
 import java.net.SocketException;
 
 public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Client
-    implements SyncThriftClient, AutoCloseable {
+    implements ThriftClient, AutoCloseable {
 
-  private final TEndPoint endPoint;
+  private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
 
   public SyncDataNodeInternalServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
-      TEndPoint endPoint,
+      TEndPoint endpoint,
       ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager)
       throws TTransportException {
     super(
@@ -55,17 +55,26 @@ public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Clien
             RpcTransportFactory.INSTANCE.getTransport(
                 new TSocket(
                     TConfigurationConst.defaultTConfiguration,
-                    endPoint.getIp(),
-                    endPoint.getPort(),
+                    endpoint.getIp(),
+                    endpoint.getPort(),
                     connectionTimeout))));
-    this.endPoint = endPoint;
+    this.endpoint = endpoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+  }
+
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
   @TestOnly
   public TEndPoint getTEndpoint() {
-    return endPoint;
+    return endpoint;
   }
 
   @TestOnly
@@ -73,42 +82,33 @@ public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Clien
     return clientManager;
   }
 
+  @Override
   public void close() {
-    if (clientManager != null) {
-      clientManager.returnClient(endPoint, this);
-    }
-  }
-
-  public void setTimeout(int timeout) {
-    // the same transport is used in both input and output
-    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+    clientManager.returnClient(endpoint, this);
   }
 
+  @Override
   public void invalidate() {
     getInputProtocol().getTransport().close();
   }
 
   @Override
   public void invalidateAll() {
-    clientManager.clear(endPoint);
-  }
-
-  public int getTimeout() throws SocketException {
-    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+    clientManager.clear(endpoint);
   }
 
   @Override
   public String toString() {
-    return String.format("SyncDataNodeInternalServiceClient{%s}", endPoint);
+    return String.format("SyncDataNodeInternalServiceClient{%s}", endpoint);
   }
 
   public static class Factory
-      extends BaseClientFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
+      extends ThriftClientFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
     }
 
     @Override
@@ -120,15 +120,13 @@ public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Clien
     @Override
     public PooledObject<SyncDataNodeInternalServiceClient> makeObject(TEndPoint endpoint)
         throws Exception {
-      Constructor<SyncDataNodeInternalServiceClient> constructor =
-          SyncDataNodeInternalServiceClient.class.getConstructor(
-              TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
       return new DefaultPooledObject<>(
           SyncThriftClientWithErrorHandler.newErrorHandler(
               SyncDataNodeInternalServiceClient.class,
-              constructor,
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              SyncDataNodeInternalServiceClient.class.getConstructor(
+                  TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endpoint,
               clientManager));
     }
@@ -136,8 +134,7 @@ public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Clien
     @Override
     public boolean validateObject(
         TEndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient> pooledObject) {
-      return pooledObject.getObject() != null
-          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+      return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
index 7bcbd05071..dd3474e865 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.commons.client.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.TConfigurationConst;
@@ -34,19 +35,18 @@ import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
-import java.lang.reflect.Constructor;
 import java.net.SocketException;
 
 public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeService.Client
-    implements SyncThriftClient, AutoCloseable {
+    implements ThriftClient, AutoCloseable {
 
-  private final TEndPoint endPoint;
+  private final TEndPoint endpoint;
   private final ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientManager;
 
   public SyncDataNodeMPPDataExchangeServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
-      TEndPoint endPoint,
+      TEndPoint endpoint,
       ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientManager)
       throws TTransportException {
     super(
@@ -54,18 +54,16 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSer
             RpcTransportFactory.INSTANCE.getTransport(
                 new TSocket(
                     TConfigurationConst.defaultTConfiguration,
-                    endPoint.getIp(),
-                    endPoint.getPort(),
+                    endpoint.getIp(),
+                    endpoint.getPort(),
                     connectionTimeout))));
-    this.endPoint = endPoint;
+    this.endpoint = endpoint;
     this.clientManager = clientManager;
     getInputProtocol().getTransport().open();
   }
 
-  public void close() {
-    if (clientManager != null) {
-      clientManager.returnClient(endPoint, this);
-    }
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   public void setTimeout(int timeout) {
@@ -73,31 +71,33 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSer
     ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
   }
 
+  @Override
+  public void close() {
+    clientManager.returnClient(endpoint, this);
+  }
+
+  @Override
   public void invalidate() {
     getInputProtocol().getTransport().close();
   }
 
   @Override
   public void invalidateAll() {
-    clientManager.clear(endPoint);
-  }
-
-  public int getTimeout() throws SocketException {
-    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+    clientManager.clear(endpoint);
   }
 
   @Override
   public String toString() {
-    return String.format("SyncDataNodeMPPDataExchangeServiceClient{%s}", endPoint);
+    return String.format("SyncDataNodeMPPDataExchangeServiceClient{%s}", endpoint);
   }
 
   public static class Factory
-      extends BaseClientFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
+      extends ThriftClientFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
 
     public Factory(
         ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
     }
 
     @Override
@@ -109,15 +109,13 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSer
     @Override
     public PooledObject<SyncDataNodeMPPDataExchangeServiceClient> makeObject(TEndPoint endpoint)
         throws Exception {
-      Constructor<SyncDataNodeMPPDataExchangeServiceClient> constructor =
-          SyncDataNodeMPPDataExchangeServiceClient.class.getConstructor(
-              TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
       return new DefaultPooledObject<>(
           SyncThriftClientWithErrorHandler.newErrorHandler(
               SyncDataNodeMPPDataExchangeServiceClient.class,
-              constructor,
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              SyncDataNodeMPPDataExchangeServiceClient.class.getConstructor(
+                  TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               endpoint,
               clientManager));
     }
@@ -125,8 +123,7 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSer
     @Override
     public boolean validateObject(
         TEndPoint endpoint, PooledObject<SyncDataNodeMPPDataExchangeServiceClient> pooledObject) {
-      return pooledObject.getObject() != null
-          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+      return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
     }
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
index d864984bc9..32c75da38f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
@@ -18,26 +18,24 @@
  */
 package org.apache.iotdb.commons.client.sync;
 
+import org.apache.iotdb.commons.client.ThriftClient;
+
 import net.sf.cglib.proxy.Enhancer;
 import net.sf.cglib.proxy.MethodInterceptor;
 import net.sf.cglib.proxy.MethodProxy;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.net.SocketException;
 
 public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(SyncThriftClientWithErrorHandler.class);
-
-  public static <V extends SyncThriftClient> V newErrorHandler(
+  /**
+   * Note: The caller needs to ensure that the constructor corresponds to the class, or the cast
+   * might fail.
+   */
+  @SuppressWarnings("unchecked")
+  public static <V extends ThriftClient> V newErrorHandler(
       Class<V> targetClass, Constructor<V> constructor, Object... args) {
     Enhancer enhancer = new Enhancer();
     enhancer.setSuperclass(targetClass);
@@ -54,51 +52,9 @@ public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
     try {
       return methodProxy.invokeSuper(o, objects);
     } catch (Throwable t) {
-      Throwable origin = t;
-      if (t instanceof InvocationTargetException) {
-        origin = ((InvocationTargetException) t).getTargetException();
-      }
-      Throwable cur = origin;
-      if (cur instanceof TException) {
-        int level = 0;
-        while (cur != null) {
-          LOGGER.debug(
-              "level-{} Exception class {}, message {}",
-              level,
-              cur.getClass().getName(),
-              cur.getMessage());
-          cur = cur.getCause();
-          level++;
-        }
-        ((SyncThriftClient) o).invalidate();
-      }
-
-      Throwable rootCause = ExceptionUtils.getRootCause(origin);
-      if (rootCause != null) {
-        // if the exception is SocketException and its error message is Broken pipe, it means that
-        // the remote node may restart and all the connection we cached before should be cleared.
-        LOGGER.debug(
-            "root cause message {}, LocalizedMessage {}, ",
-            rootCause.getMessage(),
-            rootCause.getLocalizedMessage(),
-            rootCause);
-        if (isConnectionBroken(rootCause)) {
-          LOGGER.debug(
-              "Broken pipe error happened in calling method {}, we need to clear all previous cached connection, err: {}",
-              method.getName(),
-              t);
-          ((SyncThriftClient) o).invalidate();
-          ((SyncThriftClient) o).invalidateAll();
-        }
-      }
+      ThriftClient.resolveException(t, (ThriftClient) o);
       throw new TException(
           "Error in calling method " + method.getName() + ", because: " + t.getMessage(), t);
     }
   }
-
-  private boolean isConnectionBroken(Throwable cause) {
-    return (cause instanceof SocketException && cause.getMessage().contains("Broken pipe"))
-        || (cause instanceof TTransportException
-            && cause.getMessage().contains("Socket is closed by peer"));
-  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3b3b3359f1..b930ea22e3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -99,13 +99,14 @@ public class CommonConfig {
    * ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
    * clients.
    */
-  private int selectorNumOfClientManager =
-      Runtime.getRuntime().availableProcessors() / 4 > 0
-          ? Runtime.getRuntime().availableProcessors() / 4
-          : 1;
+  private int selectorNumOfClientManager = 1;
 
   /** whether to use thrift compression. */
-  private boolean isCnRpcThriftCompressionEnabled = false;
+  private boolean isRpcThriftCompressionEnabled = false;
+
+  private int maxTotalClientForEachNode = 300;
+
+  private int maxIdleClientForEachNode = 200;
 
   /** What will the system do when unrecoverable error occurs. */
   private HandleSystemErrorStrategy handleSystemErrorStrategy =
@@ -246,28 +247,44 @@ public class CommonConfig {
     this.defaultTTLInMs = defaultTTLInMs;
   }
 
-  public int getCnConnectionTimeoutInMS() {
+  public int getConnectionTimeoutInMS() {
     return connectionTimeoutInMS;
   }
 
-  public void setCnConnectionTimeoutInMS(int connectionTimeoutInMS) {
+  public void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
     this.connectionTimeoutInMS = connectionTimeoutInMS;
   }
 
-  public int getCnSelectorNumOfClientManager() {
+  public int getSelectorNumOfClientManager() {
     return selectorNumOfClientManager;
   }
 
-  public void setCnSelectorNumOfClientManager(int selectorNumOfClientManager) {
+  public void setSelectorNumOfClientManager(int selectorNumOfClientManager) {
     this.selectorNumOfClientManager = selectorNumOfClientManager;
   }
 
-  public boolean isCnRpcThriftCompressionEnabled() {
-    return isCnRpcThriftCompressionEnabled;
+  public boolean isRpcThriftCompressionEnabled() {
+    return isRpcThriftCompressionEnabled;
+  }
+
+  public void setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
+    isRpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
+  }
+
+  public int getMaxTotalClientForEachNode() {
+    return maxTotalClientForEachNode;
+  }
+
+  public void setMaxTotalClientForEachNode(int maxTotalClientForEachNode) {
+    this.maxTotalClientForEachNode = maxTotalClientForEachNode;
+  }
+
+  public int getMaxIdleClientForEachNode() {
+    return maxIdleClientForEachNode;
   }
 
-  public void setCnRpcThriftCompressionEnabled(boolean cnRpcThriftCompressionEnabled) {
-    isCnRpcThriftCompressionEnabled = cnRpcThriftCompressionEnabled;
+  public void setMaxIdleClientForEachNode(int maxIdleClientForEachNode) {
+    this.maxIdleClientForEachNode = maxIdleClientForEachNode;
   }
 
   HandleSystemErrorStrategy getHandleSystemErrorStrategy() {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 6ef3e0cc01..f53521f77b 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -93,27 +93,66 @@ public class CommonDescriptor {
             .trim()
             .split(","));
 
-    config.setCnRpcThriftCompressionEnabled(
+    config.setRpcThriftCompressionEnabled(
         Boolean.parseBoolean(
             properties
                 .getProperty(
                     "cn_rpc_thrift_compression_enable",
-                    String.valueOf(config.isCnRpcThriftCompressionEnabled()))
+                    String.valueOf(config.isRpcThriftCompressionEnabled()))
                 .trim()));
 
-    config.setCnConnectionTimeoutInMS(
+    config.setConnectionTimeoutInMS(
         Integer.parseInt(
             properties
                 .getProperty(
-                    "cn_connection_timeout_ms", String.valueOf(config.getCnConnectionTimeoutInMS()))
+                    "cn_connection_timeout_ms", String.valueOf(config.getConnectionTimeoutInMS()))
                 .trim()));
 
-    config.setCnSelectorNumOfClientManager(
+    config.setSelectorNumOfClientManager(
         Integer.parseInt(
             properties
                 .getProperty(
                     "cn_selector_thread_nums_of_client_manager",
-                    String.valueOf(config.getCnSelectorNumOfClientManager()))
+                    String.valueOf(config.getSelectorNumOfClientManager()))
+                .trim()));
+
+    config.setConnectionTimeoutInMS(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "dn_connection_timeout_ms", String.valueOf(config.getConnectionTimeoutInMS()))
+                .trim()));
+
+    config.setRpcThriftCompressionEnabled(
+        Boolean.parseBoolean(
+            properties
+                .getProperty(
+                    "dn_rpc_thrift_compression_enable",
+                    String.valueOf(config.isRpcThriftCompressionEnabled()))
+                .trim()));
+
+    config.setSelectorNumOfClientManager(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "dn_selector_thread_nums_of_client_manager",
+                    String.valueOf(config.getSelectorNumOfClientManager()))
+                .trim()));
+
+    config.setMaxTotalClientForEachNode(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "dn_max_connection_for_internal_service",
+                    String.valueOf(config.getMaxTotalClientForEachNode()))
+                .trim()));
+
+    config.setMaxIdleClientForEachNode(
+        Integer.parseInt(
+            properties
+                .getProperty(
+                    "dn_core_connection_for_internal_service",
+                    String.valueOf(config.getMaxIdleClientForEachNode()))
                 .trim()));
 
     config.setHandleSystemErrorStrategy(
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 1375423486..061fd0f775 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.commons.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.exception.BorrowNullClientManagerException;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.mock.MockInternalRPCService;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
 
@@ -61,20 +65,22 @@ public class ClientManagerTest {
   /**
    * We put all tests together to avoid frequent restarts of thrift Servers, which can cause "bind
    * address already used" problems in macOS CI environments. The reason for this may be about this
-   * [blog](https://stackoverflow.com/questions/51998042/macos-so-reuseaddr-so-reuseport-not-consistent-with-linux)
+   * <a
+   * href="https://stackoverflow.com/questions/51998042/macos-so-reuseaddr-so-reuseport-not-consistent-with-linux">blog</a>
    */
   @Test
   public void allTest() throws Exception {
-    normalSyncClientManagersTest();
-    normalAsyncClientManagersTest();
-    MaxIdleClientManagersTest();
-    MaxTotalClientManagersTest();
-    MaxWaitClientTimeoutClientManagersTest();
-    InvalidSyncClientReturnClientManagersTest();
-    InvalidAsyncClientReturnClientManagersTest();
+    normalSyncTest();
+    normalAsyncTest();
+    maxIdleTest();
+    maxTotalTest();
+    maxWaitClientTimeoutTest();
+    invalidSyncClientReturnTest();
+    invalidAsyncClientReturnTest();
+    borrowNullTest();
   }
 
-  public void normalSyncClientManagersTest() throws Exception {
+  public void normalSyncTest() throws Exception {
     // init syncClientManager
     ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncClusterManager =
         (ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
@@ -117,7 +123,7 @@ public class ClientManagerTest {
     Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
   }
 
-  public void normalAsyncClientManagersTest() throws Exception {
+  public void normalAsyncTest() throws Exception {
     // init asyncClientManager
     ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncClusterManager =
         (ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
@@ -158,7 +164,7 @@ public class ClientManagerTest {
     Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
   }
 
-  public void MaxIdleClientManagersTest() throws Exception {
+  public void maxIdleTest() throws Exception {
     int maxIdleClientForEachNode = 1;
 
     // init syncClientManager and set maxIdleClientForEachNode to 1
@@ -173,7 +179,7 @@ public class ClientManagerTest {
                               ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
                         return new GenericKeyedObjectPool<>(
                             new SyncDataNodeInternalServiceClient.Factory(
-                                manager, new ClientFactoryProperty.Builder().build()),
+                                manager, new ThriftClientProperty.Builder().build()),
                             new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
                                 .setMaxIdleClientForEachNode(maxIdleClientForEachNode)
                                 .build()
@@ -217,7 +223,7 @@ public class ClientManagerTest {
     Assert.assertFalse(syncClient1.getInputProtocol().getTransport().isOpen());
   }
 
-  public void MaxTotalClientManagersTest() throws Exception {
+  public void maxTotalTest() throws Exception {
     int maxTotalClientForEachNode = 1;
     long waitClientTimeoutMs = TimeUnit.SECONDS.toMillis(1);
 
@@ -233,7 +239,7 @@ public class ClientManagerTest {
                               ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
                         return new GenericKeyedObjectPool<>(
                             new SyncDataNodeInternalServiceClient.Factory(
-                                manager, new ClientFactoryProperty.Builder().build()),
+                                manager, new ThriftClientProperty.Builder().build()),
                             new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
                                 .setMaxTotalClientForEachNode(maxTotalClientForEachNode)
                                 .setWaitClientTimeoutMS(waitClientTimeoutMs)
@@ -257,6 +263,7 @@ public class ClientManagerTest {
     try {
       start = System.nanoTime();
       syncClient2 = syncClusterManager.borrowClient(endPoint);
+      Assert.fail();
     } catch (ClientManagerException e) {
       long end = System.nanoTime();
       Assert.assertTrue(end - start >= waitClientTimeoutMs * 1_000_000);
@@ -289,7 +296,7 @@ public class ClientManagerTest {
     Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
   }
 
-  public void MaxWaitClientTimeoutClientManagersTest() throws Exception {
+  public void maxWaitClientTimeoutTest() throws Exception {
     long waitClientTimeoutMS = TimeUnit.SECONDS.toMillis(2);
     int maxTotalClientForEachNode = 1;
 
@@ -306,7 +313,7 @@ public class ClientManagerTest {
                               ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
                         return new GenericKeyedObjectPool<>(
                             new SyncDataNodeInternalServiceClient.Factory(
-                                manager, new ClientFactoryProperty.Builder().build()),
+                                manager, new ThriftClientProperty.Builder().build()),
                             new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
                                 .setWaitClientTimeoutMS(waitClientTimeoutMS)
                                 .setMaxTotalClientForEachNode(maxTotalClientForEachNode)
@@ -329,6 +336,7 @@ public class ClientManagerTest {
     try {
       start = System.nanoTime();
       syncClient1 = syncClusterManager.borrowClient(endPoint);
+      Assert.fail();
     } catch (ClientManagerException e) {
       long end = System.nanoTime();
       Assert.assertTrue(end - start >= waitClientTimeoutMS * 1_000_000);
@@ -348,7 +356,7 @@ public class ClientManagerTest {
     Assert.assertFalse(syncClient1.getInputProtocol().getTransport().isOpen());
   }
 
-  public void InvalidSyncClientReturnClientManagersTest() throws Exception {
+  public void invalidSyncClientReturnTest() throws Exception {
     // init syncClientManager
     ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncClusterManager =
         (ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
@@ -391,7 +399,7 @@ public class ClientManagerTest {
     Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
   }
 
-  public void InvalidAsyncClientReturnClientManagersTest() throws Exception {
+  public void invalidAsyncClientReturnTest() throws Exception {
     // init asyncClientManager
     ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncClusterManager =
         (ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
@@ -432,28 +440,51 @@ public class ClientManagerTest {
     Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
   }
 
+  public void borrowNullTest() {
+    // init asyncClientManager
+    ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncClusterManager =
+        (ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
+            new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
+                .createClientManager(new TestAsyncDataNodeInternalServiceClientPoolFactory());
+
+    try {
+      asyncClusterManager.borrowClient(null);
+      Assert.fail();
+    } catch (ClientManagerException e) {
+      Assert.assertTrue(e instanceof BorrowNullClientManagerException);
+      Assert.assertTrue(e.getMessage().contains("Can not borrow client for node null"));
+    }
+
+    // close asyncClientManager, asyncClientManager should destroy all clients
+    asyncClusterManager.close();
+    Assert.assertEquals(0, asyncClusterManager.getPool().getNumActive(endPoint));
+    Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
+  }
+
   public static class TestSyncDataNodeInternalServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
+
     @Override
     public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
         ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new SyncDataNodeInternalServiceClient.Factory(
-              manager, new ClientFactoryProperty.Builder().build()),
+              manager, new ThriftClientProperty.Builder().build()),
           new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig());
     }
   }
 
   public static class TestAsyncDataNodeInternalServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
+
     @Override
     public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
         ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
       return new GenericKeyedObjectPool<>(
           new AsyncDataNodeInternalServiceClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder().build(),
-              "AsyncDataNodeInternalServiceClientPool"),
+              new ThriftClientProperty.Builder().build(),
+              ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
           new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
index a41ef93b2b..1091bcea22 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
@@ -38,8 +38,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
 import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
@@ -66,9 +66,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher {
   private IAuthorizer authorizer;
 
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
   public ClusterAuthorityFetcher(IAuthorCache iAuthorCache) {
     this.iAuthorCache = iAuthorCache;
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index ac6f4d0a49..393300c5b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -24,11 +24,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
-import org.apache.iotdb.commons.client.sync.SyncThriftClient;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
@@ -126,12 +126,12 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
+
+public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClient, AutoCloseable {
 
-public class ConfigNodeClient
-    implements IConfigNodeRPCService.Iface, SyncThriftClient, AutoCloseable {
   private static final Logger logger = LoggerFactory.getLogger(ConfigNodeClient.class);
 
   private static final int RETRY_NUM = 5;
@@ -167,7 +167,7 @@ public class ConfigNodeClient
     // Read config nodes from configuration
     configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
     protocolFactory =
-        CommonDescriptor.getInstance().getConfig().isCnRpcThriftCompressionEnabled()
+        CommonDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
             ? new TCompactProtocol.Factory()
             : new TBinaryProtocol.Factory();
 
@@ -283,7 +283,7 @@ public class ConfigNodeClient
 
   @Override
   public void invalidate() {
-    transport.close();
+    Optional.ofNullable(transport).ifPresent(TTransport::close);
   }
 
   @Override
@@ -1838,12 +1838,12 @@ public class ConfigNodeClient
     throw new TException(MSG_RECONNECTION_FAIL);
   }
 
-  public static class Factory extends BaseClientFactory<ConfigNodeRegionId, ConfigNodeClient> {
+  public static class Factory extends ThriftClientFactory<ConfigNodeRegionId, ConfigNodeClient> {
 
     public Factory(
         ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager,
-        ClientFactoryProperty clientFactoryProperty) {
-      super(clientManager, clientFactoryProperty);
+        ThriftClientProperty thriftClientProperty) {
+      super(clientManager, thriftClientProperty);
     }
 
     @Override
@@ -1855,15 +1855,13 @@ public class ConfigNodeClient
     @Override
     public PooledObject<ConfigNodeClient> makeObject(ConfigNodeRegionId configNodeRegionId)
         throws Exception {
-      Constructor<ConfigNodeClient> constructor =
-          ConfigNodeClient.class.getConstructor(
-              TProtocolFactory.class, long.class, clientManager.getClass());
       return new DefaultPooledObject<>(
           SyncThriftClientWithErrorHandler.newErrorHandler(
               ConfigNodeClient.class,
-              constructor,
-              clientFactoryProperty.getProtocolFactory(),
-              clientFactoryProperty.getConnectionTimeoutMs(),
+              ConfigNodeClient.class.getConstructor(
+                  TProtocolFactory.class, long.class, clientManager.getClass()),
+              thriftClientProperty.getProtocolFactory(),
+              thriftClientProperty.getConnectionTimeoutMs(),
               clientManager));
     }
 
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClientManager.java
similarity index 52%
copy from node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
copy to server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClientManager.java
index 439f25b655..12df1907bb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClientManager.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -17,10 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.client.exception;
+package org.apache.iotdb.db.client;
 
-public class ClientManagerException extends Exception {
-  public ClientManagerException(Exception exception) {
-    super(exception);
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
+
+public class ConfigNodeClientManager {
+  private static final class ConfigNodeClientManagerHolder {
+    private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient> INSTANCE =
+        new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
+            .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+  }
+
+  public static IClientManager<ConfigNodeRegionId, ConfigNodeClient> getInstance() {
+    return ConfigNodeClientManagerHolder.INSTANCE;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
index b5fb12c209..b1451c9337 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
@@ -19,17 +19,10 @@
 
 package org.apache.iotdb.db.client;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
 import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
-import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
-import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -43,117 +36,18 @@ public class DataNodeClientPoolFactory {
 
   private DataNodeClientPoolFactory() {}
 
-  public static class SyncConfigNodeIServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, SyncConfigNodeIServiceClient> {
-    @Override
-    public KeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> createClientPool(
-        ClientManager<TEndPoint, SyncConfigNodeIServiceClient> manager) {
-      return new GenericKeyedObjectPool<>(
-          new SyncConfigNodeIServiceClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
-          new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>().build().getConfig());
-    }
-  }
-
-  public static class AsyncConfigNodeIServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
-    @Override
-    public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
-        ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
-      return new GenericKeyedObjectPool<>(
-          new AsyncConfigNodeIServiceClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build(),
-              ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
-          new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
-              .setMaxIdleClientForEachNode(conf.getCoreConnectionForInternalService())
-              .setMaxTotalClientForEachNode(conf.getMaxConnectionForInternalService())
-              .build()
-              .getConfig());
-    }
-  }
-
-  public static class SyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
-    @Override
-    public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
-      return new GenericKeyedObjectPool<>(
-          new SyncDataNodeInternalServiceClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
-          new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
-              .setMaxIdleClientForEachNode(conf.getCoreConnectionForInternalService())
-              .setMaxTotalClientForEachNode(conf.getMaxConnectionForInternalService())
-              .build()
-              .getConfig());
-    }
-  }
-
-  public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
-    @Override
-    public KeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> createClientPool(
-        ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> manager) {
-      return new GenericKeyedObjectPool<>(
-          new SyncDataNodeMPPDataExchangeServiceClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build()),
-          new ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
-              .build()
-              .getConfig());
-    }
-  }
-
-  public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory
-      implements IClientPoolFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
-    @Override
-    public KeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> createClientPool(
-        ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> manager) {
-      return new GenericKeyedObjectPool<>(
-          new AsyncDataNodeMPPDataExchangeServiceClient.Factory(
-              manager,
-              new ClientFactoryProperty.Builder()
-                  .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
-                  .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
-                  .build(),
-              ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
-          new ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
-              .build()
-              .getConfig());
-    }
-  }
-
   public static class ConfigNodeClientPoolFactory
       implements IClientPoolFactory<ConfigNodeRegionId, ConfigNodeClient> {
+
     @Override
     public KeyedObjectPool<ConfigNodeRegionId, ConfigNodeClient> createClientPool(
         ClientManager<ConfigNodeRegionId, ConfigNodeClient> manager) {
       return new GenericKeyedObjectPool<>(
           new ConfigNodeClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
+              new ThriftClientProperty.Builder()
                   .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
                   .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
-                  .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
                   .build()),
           new ClientPoolProperty.Builder<ConfigNodeClient>()
               .setMaxIdleClientForEachNode(conf.getCoreConnectionForInternalService())
@@ -165,13 +59,14 @@ public class DataNodeClientPoolFactory {
 
   public static class ClusterDeletionConfigNodeClientPoolFactory
       implements IClientPoolFactory<ConfigNodeRegionId, ConfigNodeClient> {
+
     @Override
     public KeyedObjectPool<ConfigNodeRegionId, ConfigNodeClient> createClientPool(
         ClientManager<ConfigNodeRegionId, ConfigNodeClient> manager) {
       return new GenericKeyedObjectPool<>(
           new ConfigNodeClient.Factory(
               manager,
-              new ClientFactoryProperty.Builder()
+              new ThriftClientProperty.Builder()
                   .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS() * 10)
                   .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
                   .setSelectorNumOfAsyncClientManager(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
index 19564f2528..88f87b6639 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
@@ -32,8 +32,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -79,9 +79,7 @@ public class ClusterTemplateManager implements ITemplateManager {
   }
 
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
   @Override
   public TSStatus createSchemaTemplate(CreateSchemaTemplateStatement statement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
index f1ee4b6438..fd935ff3c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -30,7 +31,6 @@ import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.ThriftService;
 import org.apache.iotdb.commons.service.ThriftServiceThread;
 import org.apache.iotdb.commons.service.metric.MetricService;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
@@ -69,8 +69,7 @@ public class MPPDataExchangeService extends ThriftService implements MPPDataExch
             executorService,
             new IClientManager.Factory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>()
                 .createClientManager(
-                    new DataNodeClientPoolFactory
-                        .SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
+                    new ClientPoolFactory.SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
     LOGGER.info("MPPDataExchangeManager init successfully");
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 3a3002850e..8d18d611c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -19,11 +19,11 @@
 package org.apache.iotdb.db.mpp.plan;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -54,6 +54,7 @@ import java.util.concurrent.ScheduledExecutorService;
  * QueryExecution.
  */
 public class Coordinator {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class);
 
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
@@ -69,7 +70,7 @@ public class Coordinator {
       INTERNAL_SERVICE_CLIENT_MANAGER =
           new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
               .createClientManager(
-                  new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+                  new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
 
   private final ExecutorService executor;
   private final ExecutorService writeOperationExecutor;
@@ -213,6 +214,11 @@ public class Coordinator {
     }
   }
 
+  public IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      getInternalServiceClientManager() {
+    return INTERNAL_SERVICE_CLIENT_MANAGER;
+  }
+
   public static Coordinator getInstance() {
     return INSTANCE;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
index 872e1eb66d..a0569e390a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -33,7 +34,6 @@ import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerReq;
 import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerRes;
 import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
 import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
 
@@ -45,7 +45,7 @@ public class TestRPCClient {
       INTERNAL_SERVICE_CLIENT_MANAGER =
           new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
               .createClientManager(
-                  new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+                  new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
 
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
 
@@ -66,7 +66,7 @@ public class TestRPCClient {
 
   private void loadSnapshot() {
     try (SyncIoTConsensusServiceClient client =
-        syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 10761))) {
+        syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40011))) {
       TTriggerSnapshotLoadRes res =
           client.triggerSnapshotLoad(
               new TTriggerSnapshotLoadReq(
@@ -79,7 +79,7 @@ public class TestRPCClient {
 
   private void testAddPeer() {
     try (SyncIoTConsensusServiceClient client =
-        syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 10762))) {
+        syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40012))) {
       TInactivatePeerRes res =
           client.inactivatePeer(
               new TInactivatePeerReq(new DataRegionId(1).convertToTConsensusGroupId()));
@@ -91,7 +91,7 @@ public class TestRPCClient {
 
   private void removeRegionPeer() {
     try (SyncDataNodeInternalServiceClient client =
-        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 10730))) {
+        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
       client.removeRegionPeer(
           new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
     } catch (Exception e) {
@@ -101,7 +101,7 @@ public class TestRPCClient {
 
   private void addPeer() {
     try (SyncDataNodeInternalServiceClient client =
-        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 10730))) {
+        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
       client.addRegionPeer(
           new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
     } catch (Exception e) {
@@ -131,7 +131,7 @@ public class TestRPCClient {
 
   private void createDataRegion() {
     try (SyncDataNodeInternalServiceClient client =
-        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 10732))) {
+        INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9005))) {
       TCreateDataRegionReq req = new TCreateDataRegionReq();
       req.setStorageGroup("root.test.g_0");
       TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 2c70aaf3d0..e3e937d04d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -40,8 +40,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -72,8 +72,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
   private final PartitionCache partitionCache;
 
   private final IClientManager<ConfigNodeRegionId, ConfigNodeClient> configNodeClientManager =
-      new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-          .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      ConfigNodeClientManager.getInstance();
 
   private static final class ClusterPartitionFetcherHolder {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
index 040c568b40..249be0db20 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
@@ -42,8 +42,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -106,8 +106,7 @@ public class PartitionCache {
   private final ReentrantReadWriteLock regionReplicaSetLock = new ReentrantReadWriteLock();
 
   private final IClientManager<ConfigNodeRegionId, ConfigNodeClient> configNodeClientManager =
-      new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-          .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      ConfigNodeClientManager.getInstance();
 
   public PartitionCache() {
     this.schemaPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 11f6ad0494..a3753360c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -74,6 +74,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.localconfignode.LocalConfigNode;
@@ -176,10 +177,9 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigTaskExecutor.class);
 
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
+  /** FIXME Consolidate this clientManager with the upper one. */
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
       CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER =
           new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
@@ -187,6 +187,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
                   new DataNodeClientPoolFactory.ClusterDeletionConfigNodeClientPoolFactory());
 
   private static final class ClusterConfigTaskExecutorHolder {
+
     private static final ClusterConfigTaskExecutor INSTANCE = new ClusterConfigTaskExecutor();
 
     private ClusterConfigTaskExecutorHolder() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index e596c89ede..10d4376d2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -30,8 +30,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
 import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -50,9 +50,7 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
   private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSyncInfoFetcher.class);
 
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
   // region Interfaces of PipeSink
 
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 8d5eb5cdcf..af2f1e5179 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.trigger.executor;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
@@ -31,9 +30,10 @@ import org.apache.iotdb.commons.trigger.TriggerTable;
 import org.apache.iotdb.commons.trigger.exception.TriggerExecutionException;
 import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
@@ -68,16 +68,8 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TriggerFireVisitor.class);
 
-  private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
-      INTERNAL_SERVICE_CLIENT_MANAGER =
-          new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
-              .createClientManager(
-                  new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
-
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
   /**
    * How many times should we retry when error occurred during firing a trigger on another datanode
@@ -328,7 +320,9 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
             TriggerManagementService.getInstance()
                 .getDataNodeLocationOfStatefulTrigger(triggerName);
         try (SyncDataNodeInternalServiceClient client =
-            INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(tDataNodeLocation.getInternalEndPoint())) {
+            Coordinator.getInstance()
+                .getInternalServiceClientManager()
+                .borrowClient(tDataNodeLocation.getInternalEndPoint())) {
           TFireTriggerReq req = new TFireTriggerReq(triggerName, tablet.serialize(), event.getId());
           TFireTriggerResp resp = client.fireTrigger(req);
           if (resp.foundExecutor) {
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
index 0aadf06daa..f4b015074d 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -45,9 +45,7 @@ public class TriggerInformationUpdater {
   private static final Logger LOGGER = LoggerFactory.getLogger(TriggerInformationUpdater.class);
 
   private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
-      CONFIG_NODE_CLIENT_MANAGER =
-          new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
-              .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+      CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
 
   private final ScheduledExecutorService triggerInformationUpdateExecutor =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
index bc0c5d5207..add9d54009 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
@@ -20,9 +20,9 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -55,8 +55,7 @@ public class MPPDataExchangeManagerTest {
             Executors.newSingleThreadExecutor(),
             new IClientManager.Factory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>()
                 .createClientManager(
-                    new DataNodeClientPoolFactory
-                        .SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
+                    new ClientPoolFactory.SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
 
     ISinkHandle localSinkHandle =
         mppDataExchangeManager.createLocalSinkHandle(
@@ -98,8 +97,7 @@ public class MPPDataExchangeManagerTest {
             Executors.newSingleThreadExecutor(),
             new IClientManager.Factory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>()
                 .createClientManager(
-                    new DataNodeClientPoolFactory
-                        .SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
+                    new ClientPoolFactory.SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
 
     ISourceHandle localSourceHandle =
         mppDataExchangeManager.createLocalSourceHandle(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
index de492a843b..5d0c47d011 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
@@ -20,10 +20,10 @@
 package org.apache.iotdb.db.mpp.plan.plan;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -52,7 +52,7 @@ public class QueryPlannerTest {
     internalServiceClientManager =
         new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
             .createClientManager(
-                new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+                new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
   }
 
   @AfterClass