You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/19 09:43:28 UTC

[iotdb] branch client_manager created (now 0953f6a72a)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch client_manager
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 0953f6a72a make interface more generic

This branch includes the following new commits:

     new 85f4bba214 init
     new 414d2b38e3 add clear & close interface
     new 0953f6a72a make interface more generic

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/03: add clear & close interface

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch client_manager
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 414d2b38e3bcc9d152b843bfe2334b91d8ddc8c2
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue Apr 19 15:02:02 2022 +0800

    add clear & close interface
---
 .../apache/iotdb/commons/client/ClientManager.java   | 20 ++++++++++++++++++++
 .../apache/iotdb/commons/client/IClientManager.java  |  4 ++++
 2 files changed, 24 insertions(+)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 10ad2090c0..0513af1a5d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -68,4 +68,24 @@ public class ClientManager<E> implements IClientManager<E> {
       }
     }
   }
+
+  @Override
+  public void clear(EndPoint node) { // delegates to pool.clear(node) for this node's key
+    if (node != null) { // a null node is silently ignored
+      try {
+        pool.clear(node);
+      } catch (Exception e) { // any failure is logged here and not rethrown
+        logger.error(String.format("clear all client in pool for node %s failed.", node), e);
+      }
+    }
+  }
+
+  @Override
+  public void close() { // closes the underlying keyed client pool
+    try {
+      pool.close();
+    } catch (Exception e) { // a failure is logged here and not rethrown
+      logger.error("close client pool failed", e);
+    }
+  }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
index 11883aefda..00ee3d45a5 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
@@ -29,4 +29,8 @@ public interface IClientManager<E> {
   Optional<E> borrowClient(EndPoint node) throws IOException;
 
   void returnClient(EndPoint node, E client);
+
+  void clear(EndPoint node);
+
+  void close();
 }


[iotdb] 03/03: make interface more generic

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch client_manager
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0953f6a72ad54349fd3aced1b4e492c18d31743d
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue Apr 19 17:42:28 2022 +0800

    make interface more generic
---
 .../iotdb/confignode/client/ClientPoolFactory.java |  8 +--
 .../iotdb/consensus/ratis/RatisClientFactory.java  | 67 ++++++++++++++++++++++
 .../iotdb/consensus/ratis/RatisConsensus.java      | 56 +++++++++++++-----
 node-commons/pom.xml                               |  4 ++
 .../commons/client/AsyncBaseClientFactory.java     |  4 +-
 .../iotdb/commons/client/BaseClientFactory.java    | 12 ++--
 .../apache/iotdb/commons/client/ClientManager.java | 19 +++---
 .../commons/client/ClientManagerProperty.java      | 26 ++++-----
 .../iotdb/commons/client/IClientManager.java       | 11 ++--
 .../iotdb/commons/client/IClientPoolFactory.java   |  7 +--
 .../async/AsyncDataNodeInternalServiceClient.java  |  6 +-
 .../sync/SyncDataNodeInternalServiceClient.java    |  6 +-
 .../apache/iotdb/db/client/ClientPoolFactory.java  | 16 +++---
 .../async/AsyncConfigNodeIServiceClient.java       |  6 +-
 .../async/AsyncDataNodeDataBlockServiceClient.java |  6 +-
 .../client/sync/SyncConfigNodeIServiceClient.java  |  6 +-
 .../sync/SyncDataNodeDataBlockServiceClient.java   |  6 +-
 17 files changed, 177 insertions(+), 89 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
index 73bb856065..d8731cb952 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
@@ -36,10 +36,10 @@ public class ClientPoolFactory {
   private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
 
   public static class SyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<SyncDataNodeInternalServiceClient> {
+      implements IClientPoolFactory<EndPoint, SyncDataNodeInternalServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<SyncDataNodeInternalServiceClient> manager) {
+        ClientManager<EndPoint, SyncDataNodeInternalServiceClient> manager) {
       ClientManagerProperty<SyncDataNodeInternalServiceClient> property =
           new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -51,10 +51,10 @@ public class ClientPoolFactory {
   }
 
   public static class AsyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> {
+      implements IClientPoolFactory<EndPoint, AsyncDataNodeInternalServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<AsyncDataNodeInternalServiceClient> manager) {
+        ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> manager) {
       ClientManagerProperty<AsyncDataNodeInternalServiceClient> property =
           new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java
new file mode 100644
index 0000000000..27d5f7a640
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.commons.client.BaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroup;
+
+public class RatisClientFactory extends BaseClientFactory<RaftGroup, RaftClient> {
+
+  private final RaftProperties raftProperties; // passed to every RaftClient built in makeObject
+  private final RaftClientRpc clientRpc; // the same RPC instance is handed to every built client
+
+  public RatisClientFactory(
+      ClientManager<RaftGroup, RaftClient> clientManager,
+      ClientManagerProperty<RaftClient> clientManagerProperty,
+      RaftProperties raftProperties,
+      RaftClientRpc clientRpc) {
+    super(clientManager, clientManagerProperty); // manager and property are kept by the base class
+    this.raftProperties = raftProperties;
+    this.clientRpc = clientRpc;
+  }
+
+  @Override
+  public void destroyObject(RaftGroup key, PooledObject<RaftClient> pooledObject) throws Exception {
+    pooledObject.getObject().close(); // close the wrapped RaftClient; the key is not used
+  }
+
+  @Override
+  public PooledObject<RaftClient> makeObject(RaftGroup group) throws Exception {
+    return new DefaultPooledObject<>( // wrap a newly built client for the pool
+        RaftClient.newBuilder()
+            .setProperties(raftProperties)
+            .setRaftGroup(group) // the pool key is the Raft group the client targets
+            .setClientRpc(clientRpc)
+            .build());
+  }
+
+  @Override
+  public boolean validateObject(RaftGroup key, PooledObject<RaftClient> pooledObject) {
+    return true; // no health check: every pooled client is reported as valid
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index a87cafd2fe..cbf3f713f6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.EndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.commons.client.IClientPoolFactory;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.IConsensus;
@@ -37,7 +40,10 @@ import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
 import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
@@ -74,41 +80,40 @@ import java.util.stream.Collectors;
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 class RatisConsensus implements IConsensus {
+
+  private final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
+
   // the unique net communication endpoint
   private final RaftPeer myself;
-
   private final RaftServer server;
 
-  private final Map<RaftGroupId, RaftClient> clientMap;
-  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+  private final RaftProperties properties = new RaftProperties();;
+  private final RaftClientRpc clientRpc;
+
+  private final ClientManager<RaftGroup, RaftClient> clientManager =
+      new ClientManager<>(new RatisClientPoolFactory());
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap = new ConcurrentHashMap<>();
+  private final Map<RaftGroupId, RaftClient> clientMap = new ConcurrentHashMap<>();
 
-  private ClientId localFakeId;
-  private AtomicLong localFakeCallId;
+  private final ClientId localFakeId = ClientId.randomId();
+  private final AtomicLong localFakeCallId = new AtomicLong(0);
 
   private static final int DEFAULT_PRIORITY = 0;
   private static final int LEADER_PRIORITY = 1;
 
-  private Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
-
   public RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
       throws IOException {
 
-    this.clientMap = new ConcurrentHashMap<>();
-    this.raftGroupMap = new ConcurrentHashMap<>();
-    this.localFakeId = ClientId.randomId();
-    this.localFakeCallId = new AtomicLong(0);
-
     // create a RaftPeer as endpoint of comm
     String address = Utils.IPAddress(endpoint);
     myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
 
-    RaftProperties properties = new RaftProperties();
-
     RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
 
     // set the port which server listen to in RaftProperty object
     final int port = NetUtils.createSocketAddr(address).getPort();
     GrpcConfigKeys.Server.setPort(properties, port);
+    clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
 
     server =
         RaftServer.newBuilder()
@@ -128,6 +133,7 @@ class RatisConsensus implements IConsensus {
 
   @Override
   public void stop() throws IOException {
+    clientManager.close();
     server.close();
   }
 
@@ -443,7 +449,7 @@ class RatisConsensus implements IConsensus {
       isLeader = server.getDivision(raftGroupId).getInfo().isLeader();
     } catch (IOException exception) {
       // if the query fails, simply return not leader
-      logger.warn("isLeader request failed with exception: ", exception);
+      logger.info("isLeader request failed with exception: ", exception);
       isLeader = false;
     }
     return isLeader;
@@ -516,6 +522,15 @@ class RatisConsensus implements IConsensus {
     return client;
   }
 
+  private RaftClient getRaftClient(RaftGroup group) {
+    try {
+      return clientManager.borrowClient(group); // take a pooled client keyed by the Raft group
+    } catch (IOException e) {
+      logger.error(String.format("Borrow client from pool for group %s failed.", group), e);
+      return null; // the failure is logged above and reported to the caller as null
+    }
+  }
+
   private void closeRaftClient(RaftGroupId groupId) {
     RaftClient client = clientMap.get(groupId);
     if (client != null) {
@@ -563,4 +578,15 @@ class RatisConsensus implements IConsensus {
     }
     return reply;
   }
+
+  private class RatisClientPoolFactory implements IClientPoolFactory<RaftGroup, RaftClient> {
+    @Override
+    public KeyedObjectPool<RaftGroup, RaftClient> createClientPool( // NOTE(review): called while clientManager's field initializer runs,
+        ClientManager<RaftGroup, RaftClient> manager) { // i.e. before the constructor assigns clientRpc - confirm it is non-null here
+      ClientManagerProperty<RaftClient> property = // builder defaults only; no setter is called
+          new ClientManagerProperty.Builder<RaftClient>().build();
+      return new GenericKeyedObjectPool<>( // properties and clientRpc come from the enclosing instance
+          new RatisClientFactory(manager, property, properties, clientRpc), property.getConfig());
+    }
+  }
 }
diff --git a/node-commons/pom.xml b/node-commons/pom.xml
index f4e3587e38..a88c22bbf3 100644
--- a/node-commons/pom.xml
+++ b/node-commons/pom.xml
@@ -53,6 +53,10 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-pool2</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
index b710f5482c..b822b0043b 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
@@ -26,14 +26,14 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public abstract class AsyncBaseClientFactory<K, T> extends BaseClientFactory<K, T> {
+public abstract class AsyncBaseClientFactory<K, V> extends BaseClientFactory<K, V> {
 
   private static final Logger logger = LoggerFactory.getLogger(AsyncBaseClientFactory.class);
   protected TAsyncClientManager[] tManagers;
   protected AtomicInteger clientCnt = new AtomicInteger();
 
   protected AsyncBaseClientFactory(
-      ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) {
+      ClientManager<K, V> clientManager, ClientManagerProperty<V> clientManagerProperty) {
     super(clientManager, clientManagerProperty);
     tManagers = new TAsyncClientManager[clientManagerProperty.getSelectorNumOfAsyncClientPool()];
     for (int i = 0; i < tManagers.length; i++) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
index f628692152..1f85805691 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
@@ -22,20 +22,20 @@ package org.apache.iotdb.commons.client;
 import org.apache.commons.pool2.KeyedPooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
 
-public abstract class BaseClientFactory<K, T> implements KeyedPooledObjectFactory<K, T> {
+public abstract class BaseClientFactory<K, V> implements KeyedPooledObjectFactory<K, V> {
 
-  protected ClientManager<T> clientManager;
-  protected ClientManagerProperty<T> clientManagerProperty;
+  protected ClientManager<K, V> clientManager;
+  protected ClientManagerProperty<V> clientManagerProperty;
 
   public BaseClientFactory(
-      ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) {
+      ClientManager<K, V> clientManager, ClientManagerProperty<V> clientManagerProperty) {
     this.clientManager = clientManager;
     this.clientManagerProperty = clientManagerProperty;
   }
 
   @Override
-  public void activateObject(K node, PooledObject<T> pooledObject) {}
+  public void activateObject(K node, PooledObject<V> pooledObject) {}
 
   @Override
-  public void passivateObject(K node, PooledObject<T> pooledObject) {}
+  public void passivateObject(K node, PooledObject<V> pooledObject) {}
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 0513af1a5d..de3b8947cb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -19,31 +19,28 @@
 
 package org.apache.iotdb.commons.client;
 
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
-
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Optional;
 
-public class ClientManager<E> implements IClientManager<E> {
+public class ClientManager<K, V> implements IClientManager<K, V> {
 
   private static final Logger logger = LoggerFactory.getLogger(ClientManager.class);
 
-  private final KeyedObjectPool<EndPoint, E> pool;
+  private final KeyedObjectPool<K, V> pool;
 
-  public ClientManager(IClientPoolFactory<E> factory) {
+  public ClientManager(IClientPoolFactory<K, V> factory) {
     pool = factory.createClientPool(this);
   }
 
   @Override
-  public Optional<E> borrowClient(EndPoint node) throws IOException {
-    Optional<E> client = Optional.empty();
+  public V borrowClient(K node) throws IOException {
+    V client = null;
     try {
-      client = Optional.of(pool.borrowObject(node));
+      client = pool.borrowObject(node);
     } catch (TTransportException e) {
       // external needs to check transport related exception
       throw new IOException(e);
@@ -58,7 +55,7 @@ public class ClientManager<E> implements IClientManager<E> {
   }
 
   @Override
-  public void returnClient(EndPoint node, E client) {
+  public void returnClient(K node, V client) {
     if (client != null && node != null) {
       try {
         pool.returnObject(node, client);
@@ -70,7 +67,7 @@ public class ClientManager<E> implements IClientManager<E> {
   }
 
   @Override
-  public void clear(EndPoint node) {
+  public void clear(K node) {
     if (node != null) {
       try {
         pool.clear(node);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
index b35f163257..5fc95174db 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
@@ -26,9 +26,9 @@ import org.apache.thrift.protocol.TProtocolFactory;
 
 import java.time.Duration;
 
-public class ClientManagerProperty<T> {
+public class ClientManagerProperty<V> {
 
-  private final GenericKeyedObjectPoolConfig<T> config;
+  private final GenericKeyedObjectPoolConfig<V> config;
 
   // thrift client config
   private final TProtocolFactory protocolFactory;
@@ -36,7 +36,7 @@ public class ClientManagerProperty<T> {
   private int selectorNumOfAsyncClientPool = 1;
 
   public ClientManagerProperty(
-      GenericKeyedObjectPoolConfig<T> config,
+      GenericKeyedObjectPoolConfig<V> config,
       TProtocolFactory protocolFactory,
       int connectionTimeoutMs,
       int selectorNumOfAsyncClientPool) {
@@ -46,7 +46,7 @@ public class ClientManagerProperty<T> {
     this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool;
   }
 
-  public GenericKeyedObjectPoolConfig<T> getConfig() {
+  public GenericKeyedObjectPoolConfig<V> getConfig() {
     return config;
   }
 
@@ -62,7 +62,7 @@ public class ClientManagerProperty<T> {
     return selectorNumOfAsyncClientPool;
   }
 
-  public static class Builder<T> {
+  public static class Builder<V> {
 
     // pool config
     private long waitClientTimeoutMS = 20_000;
@@ -74,38 +74,38 @@ public class ClientManagerProperty<T> {
     private int connectionTimeoutMs = 20_000;
     private int selectorNumOfAsyncClientPool = 1;
 
-    public Builder<T> setWaitClientTimeoutMS(long waitClientTimeoutMS) {
+    public Builder<V> setWaitClientTimeoutMS(long waitClientTimeoutMS) {
       this.waitClientTimeoutMS = waitClientTimeoutMS;
       return this;
     }
 
-    public Builder<T> setMaxConnectionForEachNode(int maxConnectionForEachNode) {
+    public Builder<V> setMaxConnectionForEachNode(int maxConnectionForEachNode) {
       this.maxConnectionForEachNode = maxConnectionForEachNode;
       return this;
     }
 
-    public Builder<T> setMaxIdleConnectionForEachNode(int maxIdleConnectionForEachNode) {
+    public Builder<V> setMaxIdleConnectionForEachNode(int maxIdleConnectionForEachNode) {
       this.maxIdleConnectionForEachNode = maxIdleConnectionForEachNode;
       return this;
     }
 
-    public Builder<T> setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
+    public Builder<V> setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
       this.rpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
       return this;
     }
 
-    public Builder<T> setConnectionTimeoutMs(int connectionTimeoutMs) {
+    public Builder<V> setConnectionTimeoutMs(int connectionTimeoutMs) {
       this.connectionTimeoutMs = connectionTimeoutMs;
       return this;
     }
 
-    public Builder<T> setSelectorNumOfAsyncClientPool(int selectorNumOfAsyncClientPool) {
+    public Builder<V> setSelectorNumOfAsyncClientPool(int selectorNumOfAsyncClientPool) {
       this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool;
       return this;
     }
 
-    public ClientManagerProperty<T> build() {
-      GenericKeyedObjectPoolConfig<T> poolConfig = new GenericKeyedObjectPoolConfig<>();
+    public ClientManagerProperty<V> build() {
+      GenericKeyedObjectPoolConfig<V> poolConfig = new GenericKeyedObjectPoolConfig<>();
       poolConfig.setMaxTotalPerKey(maxConnectionForEachNode);
       poolConfig.setMaxIdlePerKey(maxIdleConnectionForEachNode);
       poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMS));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
index 00ee3d45a5..529c78e8b1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
@@ -19,18 +19,15 @@
 
 package org.apache.iotdb.commons.client;
 
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
-
 import java.io.IOException;
-import java.util.Optional;
 
-public interface IClientManager<E> {
+public interface IClientManager<K, V> {
 
-  Optional<E> borrowClient(EndPoint node) throws IOException;
+  V borrowClient(K node) throws IOException;
 
-  void returnClient(EndPoint node, E client);
+  void returnClient(K node, V client);
 
-  void clear(EndPoint node);
+  void clear(K node);
 
   void close();
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
index 80341bf5f5..aa7980decb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
@@ -19,12 +19,9 @@
 
 package org.apache.iotdb.commons.client;
 
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
-
 import org.apache.commons.pool2.KeyedObjectPool;
 
-public interface IClientPoolFactory<E> {
+public interface IClientPoolFactory<K, V> {
 
-  KeyedObjectPool<EndPoint, E> createClientPool(
-      ClientManager<E> manager);
+  KeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager);
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 6119b799a8..8547d13890 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -37,14 +37,14 @@ import java.io.IOException;
 public class AsyncDataNodeInternalServiceClient extends InternalService.AsyncClient {
 
   private final EndPoint endpoint;
-  private final ClientManager<AsyncDataNodeInternalServiceClient> clientManager;
+  private final ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager;
 
   public AsyncDataNodeInternalServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
       TAsyncClientManager tClientManager,
-      ClientManager<AsyncDataNodeInternalServiceClient> clientManager)
+      ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager)
       throws IOException {
     super(
         protocolFactory,
@@ -92,7 +92,7 @@ public class AsyncDataNodeInternalServiceClient extends InternalService.AsyncCli
       extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeInternalServiceClient> {
 
     public Factory(
-        ClientManager<AsyncDataNodeInternalServiceClient> clientManager,
+        ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager,
         ClientManagerProperty<AsyncDataNodeInternalServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index a40104bbbc..c273809554 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -40,13 +40,13 @@ import java.net.SocketException;
 public class SyncDataNodeInternalServiceClient extends InternalService.Client {
 
   private final EndPoint endpoint;
-  private final ClientManager<SyncDataNodeInternalServiceClient> clientManager;
+  private final ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager;
 
   public SyncDataNodeInternalServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
-      ClientManager<SyncDataNodeInternalServiceClient> clientManager)
+      ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager)
       throws TTransportException {
     super(
         protocolFactory.getProtocol(
@@ -84,7 +84,7 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client {
       extends BaseClientFactory<EndPoint, SyncDataNodeInternalServiceClient> {
 
     public Factory(
-        ClientManager<SyncDataNodeInternalServiceClient> clientManager,
+        ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager,
         ClientManagerProperty<SyncDataNodeInternalServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
index 63f6c7a45c..e24e5d4e94 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
@@ -38,10 +38,10 @@ public class ClientPoolFactory {
   private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
 
   public static class SyncConfigNodeIServiceClientPoolFactory
-      implements IClientPoolFactory<SyncConfigNodeIServiceClient> {
+      implements IClientPoolFactory<EndPoint, SyncConfigNodeIServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, SyncConfigNodeIServiceClient> createClientPool(
-        ClientManager<SyncConfigNodeIServiceClient> manager) {
+        ClientManager<EndPoint, SyncConfigNodeIServiceClient> manager) {
       ClientManagerProperty<SyncConfigNodeIServiceClient> property =
           new ClientManagerProperty.Builder<SyncConfigNodeIServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -53,10 +53,10 @@ public class ClientPoolFactory {
   }
 
   public static class AsyncConfigNodeIServiceClientPoolFactory
-      implements IClientPoolFactory<AsyncConfigNodeIServiceClient> {
+      implements IClientPoolFactory<EndPoint, AsyncConfigNodeIServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, AsyncConfigNodeIServiceClient> createClientPool(
-        ClientManager<AsyncConfigNodeIServiceClient> manager) {
+        ClientManager<EndPoint, AsyncConfigNodeIServiceClient> manager) {
       ClientManagerProperty<AsyncConfigNodeIServiceClient> property =
           new ClientManagerProperty.Builder<AsyncConfigNodeIServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -69,10 +69,10 @@ public class ClientPoolFactory {
   }
 
   public static class SyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<SyncDataNodeInternalServiceClient> {
+      implements IClientPoolFactory<EndPoint, SyncDataNodeInternalServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<SyncDataNodeInternalServiceClient> manager) {
+        ClientManager<EndPoint, SyncDataNodeInternalServiceClient> manager) {
       ClientManagerProperty<SyncDataNodeInternalServiceClient> property =
           new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -84,10 +84,10 @@ public class ClientPoolFactory {
   }
 
   public static class AsyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> {
+      implements IClientPoolFactory<EndPoint, AsyncDataNodeInternalServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<AsyncDataNodeInternalServiceClient> manager) {
+        ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> manager) {
       ClientManagerProperty<AsyncDataNodeInternalServiceClient> property =
           new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
index 00ac180429..48bb173b83 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
@@ -36,14 +36,14 @@ import java.io.IOException;
 public class AsyncConfigNodeIServiceClient extends ConfigIService.AsyncClient {
 
   private final EndPoint endpoint;
-  private final ClientManager<AsyncConfigNodeIServiceClient> clientManager;
+  private final ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager;
 
   public AsyncConfigNodeIServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
       TAsyncClientManager tClientManager,
-      ClientManager<AsyncConfigNodeIServiceClient> clientManager)
+      ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager)
       throws IOException {
     super(
         protocolFactory,
@@ -91,7 +91,7 @@ public class AsyncConfigNodeIServiceClient extends ConfigIService.AsyncClient {
       extends AsyncBaseClientFactory<EndPoint, AsyncConfigNodeIServiceClient> {
 
     public Factory(
-        ClientManager<AsyncConfigNodeIServiceClient> clientManager,
+        ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager,
         ClientManagerProperty<AsyncConfigNodeIServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
index 1701083464..5b4e81e775 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
@@ -36,14 +36,14 @@ import java.io.IOException;
 public class AsyncDataNodeDataBlockServiceClient extends DataBlockService.AsyncClient {
 
   private final EndPoint endpoint;
-  private final ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager;
+  private final ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager;
 
   public AsyncDataNodeDataBlockServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
       TAsyncClientManager tClientManager,
-      ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager)
+      ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager)
       throws IOException {
     super(
         protocolFactory,
@@ -91,7 +91,7 @@ public class AsyncDataNodeDataBlockServiceClient extends DataBlockService.AsyncC
       extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeDataBlockServiceClient> {
 
     public Factory(
-        ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager,
+        ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager,
         ClientManagerProperty<AsyncDataNodeDataBlockServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
index 60e01bfc0c..ab9121f8de 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
@@ -39,13 +39,13 @@ import java.net.SocketException;
 public class SyncConfigNodeIServiceClient extends ConfigIService.Client {
 
   private final EndPoint endpoint;
-  private final ClientManager<SyncConfigNodeIServiceClient> clientManager;
+  private final ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager;
 
   public SyncConfigNodeIServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
-      ClientManager<SyncConfigNodeIServiceClient> clientManager)
+      ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager)
       throws TTransportException {
     super(
         protocolFactory.getProtocol(
@@ -82,7 +82,7 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client {
   public static class Factory extends BaseClientFactory<EndPoint, SyncConfigNodeIServiceClient> {
 
     public Factory(
-        ClientManager<SyncConfigNodeIServiceClient> clientManager,
+        ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager,
         ClientManagerProperty<SyncConfigNodeIServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
index f6188709a9..7f598e6159 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
@@ -39,13 +39,13 @@ import java.net.SocketException;
 public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client {
 
   private final EndPoint endpoint;
-  private final ClientManager<SyncDataNodeDataBlockServiceClient> clientManager;
+  private final ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager;
 
   public SyncDataNodeDataBlockServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
-      ClientManager<SyncDataNodeDataBlockServiceClient> clientManager)
+      ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager)
       throws TTransportException {
     super(
         protocolFactory.getProtocol(
@@ -83,7 +83,7 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
       extends BaseClientFactory<EndPoint, SyncDataNodeDataBlockServiceClient> {
 
     public Factory(
-        ClientManager<SyncDataNodeDataBlockServiceClient> clientManager,
+        ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager,
         ClientManagerProperty<SyncDataNodeDataBlockServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }


[iotdb] 01/03: init

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch client_manager
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 85f4bba214677dfa61bdec81a1e666a0531d7c1e
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue Apr 19 14:49:45 2022 +0800

    init
---
 .../iotdb/confignode/client/ClientPoolFactory.java |  68 +++++++++++
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  27 ++++-
 node-commons/pom.xml                               |  10 +-
 .../commons/client/AsyncBaseClientFactory.java     |  47 ++++++++
 .../iotdb/commons/client/BaseClientFactory.java    |  41 +++++++
 .../apache/iotdb/commons/client/ClientManager.java |  71 ++++++++++++
 .../commons/client/ClientManagerProperty.java      | 123 ++++++++++++++++++++
 .../iotdb/commons/client/IClientManager.java       |  32 ++++++
 .../iotdb/commons/client/IClientPoolFactory.java   |  30 +++++
 .../async/AsyncDataNodeInternalServiceClient.java  | 126 +++++++++++++++++++++
 .../sync/SyncDataNodeInternalServiceClient.java    | 116 +++++++++++++++++++
 .../apache/iotdb/db/client/ClientPoolFactory.java  | 101 +++++++++++++++++
 .../async/AsyncConfigNodeIServiceClient.java       | 125 ++++++++++++++++++++
 .../async/AsyncDataNodeDataBlockServiceClient.java | 125 ++++++++++++++++++++
 .../client/sync/SyncConfigNodeIServiceClient.java  | 114 +++++++++++++++++++
 .../sync/SyncDataNodeDataBlockServiceClient.java   | 115 +++++++++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  24 ++++
 17 files changed, 1290 insertions(+), 5 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
new file mode 100644
index 0000000000..73bb856065
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.commons.client.IClientPoolFactory;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.confignode.conf.ConfigNodeConf;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+
+public class ClientPoolFactory {
+
+  private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
+
+  public static class SyncDataNodeInternalServiceClientPoolFactory
+      implements IClientPoolFactory<SyncDataNodeInternalServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool(
+        ClientManager<SyncDataNodeInternalServiceClient> manager) {
+      ClientManagerProperty<SyncDataNodeInternalServiceClient> property =
+          new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>()
+              .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+              .build();
+      return new GenericKeyedObjectPool<>(
+          new SyncDataNodeInternalServiceClient.Factory(manager, property), property.getConfig());
+    }
+  }
+
+  public static class AsyncDataNodeInternalServiceClientPoolFactory
+      implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
+        ClientManager<AsyncDataNodeInternalServiceClient> manager) {
+      ClientManagerProperty<AsyncDataNodeInternalServiceClient> property =
+          new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>()
+              .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+              .setSelectorNumOfAsyncClientPool(conf.getSelectorNumOfClientPool())
+              .build();
+      return new GenericKeyedObjectPool<>(
+          new AsyncDataNodeInternalServiceClient.Factory(manager, property), property.getConfig());
+    }
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 27a60ea35b..9a9b6aa336 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 
 import java.io.File;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 public class ConfigNodeConf {
 
@@ -32,10 +33,12 @@ public class ConfigNodeConf {
 
   /** used for communication between data node and config node */
   private int rpcPort = 22277;
-
   /** used for communication between config node and config node */
   private int internalPort = 22278;
 
+  /** Thrift socket and connection timeout between data node and config node */
+  private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
+
   /** ConfigNode consensus protocol */
   private String configNodeConsensusProtocolClass =
       "org.apache.iotdb.consensus.ratis.RatisConsensus";
@@ -46,6 +49,15 @@ public class ConfigNodeConf {
   private Endpoint[] configNodeGroupAddressList =
       Collections.singletonList(new Endpoint("0.0.0.0", 22278)).toArray(new Endpoint[0]);
 
+  /**
+   * Number of selector threads (TAsyncClientManager instances) that the ClientPool distributes
+   * among its clients.
+   */
+  private final int selectorNumOfClientPool =
+      Runtime.getRuntime().availableProcessors() / 4 > 0
+          ? Runtime.getRuntime().availableProcessors() / 4
+          : 1;
+
   /** Number of SeriesPartitionSlots per StorageGroup */
   private int seriesPartitionSlotNum = 10000;
 
@@ -141,6 +153,10 @@ public class ConfigNodeConf {
     this.seriesPartitionExecutorClass = seriesPartitionExecutorClass;
   }
 
+  public int getSelectorNumOfClientPool() {
+    return selectorNumOfClientPool;
+  }
+
   public long getTimePartitionInterval() {
     return timePartitionInterval;
   }
@@ -213,6 +229,15 @@ public class ConfigNodeConf {
     this.internalPort = internalPort;
   }
 
+  public int getConnectionTimeoutInMS() {
+    return connectionTimeoutInMS;
+  }
+
+  public ConfigNodeConf setConnectionTimeoutInMS(int connectionTimeoutInMS) {
+    this.connectionTimeoutInMS = connectionTimeoutInMS;
+    return this;
+  }
+
   public String getConsensusDir() {
     return consensusDir;
   }
diff --git a/node-commons/pom.xml b/node-commons/pom.xml
index 3997cc6c00..f4e3587e38 100644
--- a/node-commons/pom.xml
+++ b/node-commons/pom.xml
@@ -39,7 +39,6 @@
         <dependency>
             <groupId>org.apache.thrift</groupId>
             <artifactId>libthrift</artifactId>
-            <version>${thrift.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
@@ -47,9 +46,12 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>[${guava.version},)</version>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
         </dependency>
     </dependencies>
     <build>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
new file mode 100644
index 0000000000..b710f5482c
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.thrift.async.TAsyncClientManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class AsyncBaseClientFactory<K, T> extends BaseClientFactory<K, T> {
+
+  private static final Logger logger = LoggerFactory.getLogger(AsyncBaseClientFactory.class);
+  protected TAsyncClientManager[] tManagers;
+  protected AtomicInteger clientCnt = new AtomicInteger();
+
+  protected AsyncBaseClientFactory(
+      ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) {
+    super(clientManager, clientManagerProperty);
+    tManagers = new TAsyncClientManager[clientManagerProperty.getSelectorNumOfAsyncClientPool()];
+    for (int i = 0; i < tManagers.length; i++) {
+      try {
+        tManagers[i] = new TAsyncClientManager();
+      } catch (IOException e) {
+        logger.error("Cannot create Async client factory", e);
+      }
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
new file mode 100644
index 0000000000..f628692152
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+
+public abstract class BaseClientFactory<K, T> implements KeyedPooledObjectFactory<K, T> {
+
+  protected ClientManager<T> clientManager;
+  protected ClientManagerProperty<T> clientManagerProperty;
+
+  public BaseClientFactory(
+      ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) {
+    this.clientManager = clientManager;
+    this.clientManagerProperty = clientManagerProperty;
+  }
+
+  @Override
+  public void activateObject(K node, PooledObject<T> pooledObject) {}
+
+  @Override
+  public void passivateObject(K node, PooledObject<T> pooledObject) {}
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
new file mode 100644
index 0000000000..10ad2090c0
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public class ClientManager<E> implements IClientManager<E> {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClientManager.class);
+
+  private final KeyedObjectPool<EndPoint, E> pool;
+
+  public ClientManager(IClientPoolFactory<E> factory) {
+    pool = factory.createClientPool(this);
+  }
+
+  @Override
+  public Optional<E> borrowClient(EndPoint node) throws IOException {
+    Optional<E> client = Optional.empty();
+    try {
+      client = Optional.of(pool.borrowObject(node));
+    } catch (TTransportException e) {
+      // callers need to see transport-related exceptions, so rethrow as IOException
+      throw new IOException(e);
+    } catch (IOException e) {
+      // callers need the IOException to check the connection
+      throw e;
+    } catch (Exception e) {
+      // callers do not care about other exceptions; log and return an empty Optional
+      logger.error(String.format("Borrow client from pool for node %s failed.", node), e);
+    }
+    return client;
+  }
+
+  @Override
+  public void returnClient(EndPoint node, E client) {
+    if (client != null && node != null) {
+      try {
+        pool.returnObject(node, client);
+      } catch (Exception e) {
+        logger.error(
+            String.format("Return client %s for node %s to pool failed.", client, node), e);
+      }
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
new file mode 100644
index 0000000000..b35f163257
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.time.Duration;
+
+public class ClientManagerProperty<T> {
+
+  private final GenericKeyedObjectPoolConfig<T> config;
+
+  // thrift client config
+  private final TProtocolFactory protocolFactory;
+  private int connectionTimeoutMs = 20_000;
+  private int selectorNumOfAsyncClientPool = 1;
+
+  public ClientManagerProperty(
+      GenericKeyedObjectPoolConfig<T> config,
+      TProtocolFactory protocolFactory,
+      int connectionTimeoutMs,
+      int selectorNumOfAsyncClientPool) {
+    this.config = config;
+    this.protocolFactory = protocolFactory;
+    this.connectionTimeoutMs = connectionTimeoutMs;
+    this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool;
+  }
+
+  public GenericKeyedObjectPoolConfig<T> getConfig() {
+    return config;
+  }
+
+  public TProtocolFactory getProtocolFactory() {
+    return protocolFactory;
+  }
+
+  public int getConnectionTimeoutMs() {
+    return connectionTimeoutMs;
+  }
+
+  public int getSelectorNumOfAsyncClientPool() {
+    return selectorNumOfAsyncClientPool;
+  }
+
+  public static class Builder<T> {
+
+    // pool config
+    private long waitClientTimeoutMS = 20_000;
+    private int maxConnectionForEachNode = 1_000;
+    private int maxIdleConnectionForEachNode = 1_000;
+
+    // thrift client config
+    private boolean rpcThriftCompressionEnabled = false;
+    private int connectionTimeoutMs = 20_000;
+    private int selectorNumOfAsyncClientPool = 1;
+
+    public Builder<T> setWaitClientTimeoutMS(long waitClientTimeoutMS) {
+      this.waitClientTimeoutMS = waitClientTimeoutMS;
+      return this;
+    }
+
+    public Builder<T> setMaxConnectionForEachNode(int maxConnectionForEachNode) {
+      this.maxConnectionForEachNode = maxConnectionForEachNode;
+      return this;
+    }
+
+    public Builder<T> setMaxIdleConnectionForEachNode(int maxIdleConnectionForEachNode) {
+      this.maxIdleConnectionForEachNode = maxIdleConnectionForEachNode;
+      return this;
+    }
+
+    public Builder<T> setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
+      this.rpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
+      return this;
+    }
+
+    public Builder<T> setConnectionTimeoutMs(int connectionTimeoutMs) {
+      this.connectionTimeoutMs = connectionTimeoutMs;
+      return this;
+    }
+
+    public Builder<T> setSelectorNumOfAsyncClientPool(int selectorNumOfAsyncClientPool) {
+      this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool;
+      return this;
+    }
+
+    public ClientManagerProperty<T> build() {
+      GenericKeyedObjectPoolConfig<T> poolConfig = new GenericKeyedObjectPoolConfig<>();
+      poolConfig.setMaxTotalPerKey(maxConnectionForEachNode);
+      poolConfig.setMaxIdlePerKey(maxIdleConnectionForEachNode);
+      poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMS));
+      poolConfig.setTestOnReturn(true);
+      poolConfig.setTestOnBorrow(true);
+      return new ClientManagerProperty<>(
+          poolConfig,
+          rpcThriftCompressionEnabled
+              ? new TCompactProtocol.Factory()
+              : new TBinaryProtocol.Factory(),
+          connectionTimeoutMs,
+          selectorNumOfAsyncClientPool);
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
new file mode 100644
index 0000000000..11883aefda
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+
+import java.io.IOException;
+import java.util.Optional;
+
+public interface IClientManager<E> {
+
+  Optional<E> borrowClient(EndPoint node) throws IOException;
+
+  void returnClient(EndPoint node, E client);
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
new file mode 100644
index 0000000000..80341bf5f5
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+
+import org.apache.commons.pool2.KeyedObjectPool;
+
+public interface IClientPoolFactory<E> {
+
+  KeyedObjectPool<EndPoint, E> createClientPool(
+      ClientManager<E> manager);
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
new file mode 100644
index 0000000000..6119b799a8
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.async;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.io.IOException;
+
+/** We put the client in this module because it is used by both ConfigNode and DataNode. */
+public class AsyncDataNodeInternalServiceClient extends InternalService.AsyncClient {
+
+  private final EndPoint endpoint;
+  private final ClientManager<AsyncDataNodeInternalServiceClient> clientManager;
+  /** Wraps a non-blocking socket to {@code endpoint}; {@code clientManager} may be null. */
+  public AsyncDataNodeInternalServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      TAsyncClientManager tClientManager,
+      ClientManager<AsyncDataNodeInternalServiceClient> clientManager)
+      throws IOException {
+    super(
+        protocolFactory,
+        tClientManager,
+        TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+  }
+  /** Closes the transport and clears the current-method marker. */
+  public void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+  /** Only checks that a transport object exists; it does not probe the connection state. */
+  public boolean isValid() {
+    return ___transport != null;
+  }
+
+  /** Returns this client to its manager, if any; {@link #onComplete()} calls it automatically. */
+  private void returnSelf() {
+    if (clientManager != null) {
+      clientManager.returnClient(endpoint, this);
+    }
+  }
+  /** Lets the superclass finish the call, then hands this client back to its manager. */
+  @Override
+  public void onComplete() {
+    super.onComplete();
+    returnSelf();
+  }
+  /** Returns true unless {@code checkReady()} throws. */
+  public boolean isReady() {
+    try {
+      checkReady();
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+  /** Pool factory that creates, validates and destroys clients for an endpoint. */
+  public static class Factory
+      extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeInternalServiceClient> {
+
+    public Factory(
+        ClientManager<AsyncDataNodeInternalServiceClient> clientManager,
+        ClientManagerProperty<AsyncDataNodeInternalServiceClient> clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    @Override
+    public void destroyObject(
+        EndPoint endPoint, PooledObject<AsyncDataNodeInternalServiceClient> pooledObject) {
+      pooledObject.getObject().close();
+    }
+
+    @Override
+    public PooledObject<AsyncDataNodeInternalServiceClient> makeObject(EndPoint endPoint)
+        throws Exception {
+      // floorMod (unlike %) never yields a negative slot, so the array index stays valid even
+      // after the counter overflows and incrementAndGet() starts returning negative values.
+      int slot = Math.floorMod(clientCnt.incrementAndGet(), tManagers.length);
+      TAsyncClientManager tManager = tManagers[slot];
+      tManager = tManager == null ? new TAsyncClientManager() : tManager;
+      return new DefaultPooledObject<>(
+          new AsyncDataNodeInternalServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endPoint,
+              tManager,
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(
+        EndPoint endPoint, PooledObject<AsyncDataNodeInternalServiceClient> pooledObject) {
+      return pooledObject.getObject() != null && pooledObject.getObject().isValid();
+    }
+  }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
new file mode 100644
index 0000000000..a40104bbbc
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.sync;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.BaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.mpp.rpc.thrift.InternalService;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.net.SocketException;
+
+/** We put the client in this module because it is used by both ConfigNode and DataNode. */
+public class SyncDataNodeInternalServiceClient extends InternalService.Client {
+
+  private final EndPoint endpoint;
+  private final ClientManager<SyncDataNodeInternalServiceClient> clientManager;
+  /** Builds a socket transport to {@code endpoint} and opens it before returning. */
+  public SyncDataNodeInternalServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      ClientManager<SyncDataNodeInternalServiceClient> clientManager)
+      throws TTransportException {
+    super(
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    endpoint.getIp(),
+                    endpoint.getPort(),
+                    connectionTimeout))));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+    getInputProtocol().getTransport().open();
+  }
+  /** Returns this client to its manager; does nothing when no manager was supplied. */
+  public void returnSelf() {
+    if (clientManager != null) {
+      clientManager.returnClient(endpoint, this);
+    }
+  }
+  /** Sets the timeout on the underlying transport. */
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+  /** Closes the underlying transport. */
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+  /** Reads the timeout currently configured on the underlying transport. */
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+  }
+  /** Pool factory: makes opened clients, validates them via isOpen(), closes them on destroy. */
+  public static class Factory
+      extends BaseClientFactory<EndPoint, SyncDataNodeInternalServiceClient> {
+
+    public Factory(
+        ClientManager<SyncDataNodeInternalServiceClient> clientManager,
+        ClientManagerProperty<SyncDataNodeInternalServiceClient> clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    @Override
+    public void destroyObject(
+        EndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient> pooledObject) {
+      pooledObject.getObject().close();
+    }
+
+    @Override
+    public PooledObject<SyncDataNodeInternalServiceClient> makeObject(EndPoint endpoint)
+        throws Exception {
+      return new DefaultPooledObject<>(
+          new SyncDataNodeInternalServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endpoint,
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(
+        EndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient> pooledObject) {
+      return pooledObject.getObject() != null
+          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
new file mode 100644
index 0000000000..63f6c7a45c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.commons.client.IClientPoolFactory;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.client.async.AsyncConfigNodeIServiceClient;
+import org.apache.iotdb.db.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+
+public class ClientPoolFactory {
+
+  private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+  /** Builds GenericKeyedObjectPools of sync ConfigNode clients from the IoTDBConfig settings. */
+  public static class SyncConfigNodeIServiceClientPoolFactory
+      implements IClientPoolFactory<SyncConfigNodeIServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, SyncConfigNodeIServiceClient> createClientPool(
+        ClientManager<SyncConfigNodeIServiceClient> manager) {
+      ClientManagerProperty<SyncConfigNodeIServiceClient> property =
+          new ClientManagerProperty.Builder<SyncConfigNodeIServiceClient>()
+              .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
+              .build();
+      return new GenericKeyedObjectPool<>(
+          new SyncConfigNodeIServiceClient.Factory(manager, property), property.getConfig());
+    }
+  }
+  /** Builds pools of async ConfigNode clients; also passes the selector count from conf. */
+  public static class AsyncConfigNodeIServiceClientPoolFactory
+      implements IClientPoolFactory<AsyncConfigNodeIServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, AsyncConfigNodeIServiceClient> createClientPool(
+        ClientManager<AsyncConfigNodeIServiceClient> manager) {
+      ClientManagerProperty<AsyncConfigNodeIServiceClient> property =
+          new ClientManagerProperty.Builder<AsyncConfigNodeIServiceClient>()
+              .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
+              .setSelectorNumOfAsyncClientPool(conf.getSelectorNumOfClientPool())
+              .build();
+      return new GenericKeyedObjectPool<>(
+          new AsyncConfigNodeIServiceClient.Factory(manager, property), property.getConfig());
+    }
+  }
+  /** Builds pools of sync DataNode internal-service clients from the IoTDBConfig settings. */
+  public static class SyncDataNodeInternalServiceClientPoolFactory
+      implements IClientPoolFactory<SyncDataNodeInternalServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool(
+        ClientManager<SyncDataNodeInternalServiceClient> manager) {
+      ClientManagerProperty<SyncDataNodeInternalServiceClient> property =
+          new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>()
+              .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
+              .build();
+      return new GenericKeyedObjectPool<>(
+          new SyncDataNodeInternalServiceClient.Factory(manager, property), property.getConfig());
+    }
+  }
+  /** Builds pools of async DataNode internal-service clients; also passes the selector count. */
+  public static class AsyncDataNodeInternalServiceClientPoolFactory
+      implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> {
+    @Override
+    public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
+        ClientManager<AsyncDataNodeInternalServiceClient> manager) {
+      ClientManagerProperty<AsyncDataNodeInternalServiceClient> property =
+          new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>()
+              .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+              .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
+              .setSelectorNumOfAsyncClientPool(conf.getSelectorNumOfClientPool())
+              .build();
+      return new GenericKeyedObjectPool<>(
+          new AsyncDataNodeInternalServiceClient.Factory(manager, property), property.getConfig());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
new file mode 100644
index 0000000000..00ac180429
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client.async;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.io.IOException;
+
+public class AsyncConfigNodeIServiceClient extends ConfigIService.AsyncClient {
+
+  private final EndPoint endpoint;
+  private final ClientManager<AsyncConfigNodeIServiceClient> clientManager;
+  /** Wraps a non-blocking socket to {@code endpoint}; {@code clientManager} may be null. */
+  public AsyncConfigNodeIServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      TAsyncClientManager tClientManager,
+      ClientManager<AsyncConfigNodeIServiceClient> clientManager)
+      throws IOException {
+    super(
+        protocolFactory,
+        tClientManager,
+        TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+  }
+  /** Closes the transport and clears the current-method marker. */
+  public void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+  /** Only checks that a transport object exists; it does not probe the connection state. */
+  public boolean isValid() {
+    return ___transport != null;
+  }
+
+  /** Returns this client to its manager, if any; {@link #onComplete()} calls it automatically. */
+  private void returnSelf() {
+    if (clientManager != null) {
+      clientManager.returnClient(endpoint, this);
+    }
+  }
+  /** Lets the superclass finish the call, then hands this client back to its manager. */
+  @Override
+  public void onComplete() {
+    super.onComplete();
+    returnSelf();
+  }
+  /** Returns true unless {@code checkReady()} throws. */
+  public boolean isReady() {
+    try {
+      checkReady();
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+  /** Pool factory that creates, validates and destroys clients for an endpoint. */
+  public static class Factory
+      extends AsyncBaseClientFactory<EndPoint, AsyncConfigNodeIServiceClient> {
+
+    public Factory(
+        ClientManager<AsyncConfigNodeIServiceClient> clientManager,
+        ClientManagerProperty<AsyncConfigNodeIServiceClient> clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    @Override
+    public void destroyObject(
+        EndPoint endPoint, PooledObject<AsyncConfigNodeIServiceClient> pooledObject) {
+      pooledObject.getObject().close();
+    }
+
+    @Override
+    public PooledObject<AsyncConfigNodeIServiceClient> makeObject(EndPoint endPoint)
+        throws Exception {
+      // floorMod (unlike %) never yields a negative slot, so the array index stays valid even
+      // after the counter overflows and incrementAndGet() starts returning negative values.
+      int slot = Math.floorMod(clientCnt.incrementAndGet(), tManagers.length);
+      TAsyncClientManager tManager = tManagers[slot];
+      tManager = tManager == null ? new TAsyncClientManager() : tManager;
+      return new DefaultPooledObject<>(
+          new AsyncConfigNodeIServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endPoint,
+              tManager,
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(
+        EndPoint endPoint, PooledObject<AsyncConfigNodeIServiceClient> pooledObject) {
+      return pooledObject.getObject() != null && pooledObject.getObject().isValid();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
new file mode 100644
index 0000000000..1701083464
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client.async;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.io.IOException;
+
+public class AsyncDataNodeDataBlockServiceClient extends DataBlockService.AsyncClient {
+
+  private final EndPoint endpoint;
+  private final ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager;
+  /** Wraps a non-blocking socket to {@code endpoint}; {@code clientManager} may be null. */
+  public AsyncDataNodeDataBlockServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      TAsyncClientManager tClientManager,
+      ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager)
+      throws IOException {
+    super(
+        protocolFactory,
+        tClientManager,
+        TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+  }
+  /** Closes the transport and clears the current-method marker. */
+  public void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+  /** Only checks that a transport object exists; it does not probe the connection state. */
+  public boolean isValid() {
+    return ___transport != null;
+  }
+
+  /** Returns this client to its manager, if any; {@link #onComplete()} calls it automatically. */
+  private void returnSelf() {
+    if (clientManager != null) {
+      clientManager.returnClient(endpoint, this);
+    }
+  }
+  /** Lets the superclass finish the call, then hands this client back to its manager. */
+  @Override
+  public void onComplete() {
+    super.onComplete();
+    returnSelf();
+  }
+  /** Returns true unless {@code checkReady()} throws. */
+  public boolean isReady() {
+    try {
+      checkReady();
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+  /** Pool factory that creates, validates and destroys clients for an endpoint. */
+  public static class Factory
+      extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeDataBlockServiceClient> {
+
+    public Factory(
+        ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager,
+        ClientManagerProperty<AsyncDataNodeDataBlockServiceClient> clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    @Override
+    public void destroyObject(
+        EndPoint endPoint, PooledObject<AsyncDataNodeDataBlockServiceClient> pooledObject) {
+      pooledObject.getObject().close();
+    }
+
+    @Override
+    public PooledObject<AsyncDataNodeDataBlockServiceClient> makeObject(EndPoint endPoint)
+        throws Exception {
+      // floorMod (unlike %) never yields a negative slot, so the array index stays valid even
+      // after the counter overflows and incrementAndGet() starts returning negative values.
+      int slot = Math.floorMod(clientCnt.incrementAndGet(), tManagers.length);
+      TAsyncClientManager tManager = tManagers[slot];
+      tManager = tManager == null ? new TAsyncClientManager() : tManager;
+      return new DefaultPooledObject<>(
+          new AsyncDataNodeDataBlockServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endPoint,
+              tManager,
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(
+        EndPoint endPoint, PooledObject<AsyncDataNodeDataBlockServiceClient> pooledObject) {
+      return pooledObject.getObject() != null && pooledObject.getObject().isValid();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
new file mode 100644
index 0000000000..60e01bfc0c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client.sync;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.BaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.net.SocketException;
+
+public class SyncConfigNodeIServiceClient extends ConfigIService.Client {
+
+  private final EndPoint endpoint;
+  private final ClientManager<SyncConfigNodeIServiceClient> clientManager;
+  /** Builds a socket transport to {@code endpoint} and opens it before returning. */
+  public SyncConfigNodeIServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      ClientManager<SyncConfigNodeIServiceClient> clientManager)
+      throws TTransportException {
+    super(
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    endpoint.getIp(),
+                    endpoint.getPort(),
+                    connectionTimeout))));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+    getInputProtocol().getTransport().open();
+  }
+  /** Returns this client to its manager; does nothing when no manager was supplied. */
+  public void returnSelf() {
+    if (clientManager != null) {
+      clientManager.returnClient(endpoint, this);
+    }
+  }
+  /** Sets the timeout on the underlying transport. */
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+  /** Closes the underlying transport. */
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+  /** Reads the timeout currently configured on the underlying transport. */
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+  }
+  /** Pool factory: makes opened clients, validates them via isOpen(), closes them on destroy. */
+  public static class Factory extends BaseClientFactory<EndPoint, SyncConfigNodeIServiceClient> {
+
+    public Factory(
+        ClientManager<SyncConfigNodeIServiceClient> clientManager,
+        ClientManagerProperty<SyncConfigNodeIServiceClient> clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    @Override
+    public void destroyObject(
+        EndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient> pooledObject) {
+      pooledObject.getObject().close();
+    }
+
+    @Override
+    public PooledObject<SyncConfigNodeIServiceClient> makeObject(EndPoint endpoint)
+        throws Exception {
+      return new DefaultPooledObject<>(
+          new SyncConfigNodeIServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endpoint,
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(
+        EndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient> pooledObject) {
+      return pooledObject.getObject() != null
+          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
new file mode 100644
index 0000000000..f6188709a9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client.sync;
+
+import org.apache.iotdb.common.rpc.thrift.EndPoint;
+import org.apache.iotdb.commons.client.BaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+import java.net.SocketException;
+
+public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client {
+
+  private final EndPoint endpoint;
+  private final ClientManager<SyncDataNodeDataBlockServiceClient> clientManager;
+  /** Builds a socket transport to {@code endpoint} and opens it before returning. */
+  public SyncDataNodeDataBlockServiceClient(
+      TProtocolFactory protocolFactory,
+      int connectionTimeout,
+      EndPoint endpoint,
+      ClientManager<SyncDataNodeDataBlockServiceClient> clientManager)
+      throws TTransportException {
+    super(
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    endpoint.getIp(),
+                    endpoint.getPort(),
+                    connectionTimeout))));
+    this.endpoint = endpoint;
+    this.clientManager = clientManager;
+    getInputProtocol().getTransport().open();
+  }
+  /** Returns this client to its manager; does nothing when no manager was supplied. */
+  public void returnSelf() {
+    if (clientManager != null) {
+      clientManager.returnClient(endpoint, this);
+    }
+  }
+  /** Sets the timeout on the underlying transport. */
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+  /** Closes the underlying transport. */
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+  /** Reads the timeout currently configured on the underlying transport. */
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+  }
+  /** Pool factory: makes opened clients, validates them via isOpen(), closes them on destroy. */
+  public static class Factory
+      extends BaseClientFactory<EndPoint, SyncDataNodeDataBlockServiceClient> {
+
+    public Factory(
+        ClientManager<SyncDataNodeDataBlockServiceClient> clientManager,
+        ClientManagerProperty<SyncDataNodeDataBlockServiceClient> clientManagerProperty) {
+      super(clientManager, clientManagerProperty);
+    }
+
+    @Override
+    public void destroyObject(
+        EndPoint endpoint, PooledObject<SyncDataNodeDataBlockServiceClient> pooledObject) {
+      pooledObject.getObject().close();
+    }
+
+    @Override
+    public PooledObject<SyncDataNodeDataBlockServiceClient> makeObject(EndPoint endpoint)
+        throws Exception {
+      return new DefaultPooledObject<>(
+          new SyncDataNodeDataBlockServiceClient(
+              clientManagerProperty.getProtocolFactory(),
+              clientManagerProperty.getConnectionTimeoutMs(),
+              endpoint,
+              clientManager));
+    }
+
+    @Override
+    public boolean validateObject(
+        EndPoint endpoint, PooledObject<SyncDataNodeDataBlockServiceClient> pooledObject) {
+      return pooledObject.getObject() != null
+          && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c702a9dba6..e9ee063e4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -875,6 +875,18 @@ public class IoTDBConfig {
   /** Thread keep alive time in ms of data block manager. */
   private int dataBlockManagerKeepAliveTimeInMs = 1000;
 
+  /** Thrift socket and connection timeout (ms) for the ConfigNode and DataNode client pools. */
+  private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
+
+  /**
+   * Number of selector threads (TAsyncClientManager) that a client pool distributes among its
+   * async clients: a quarter of the available processors, but never fewer than one. The
+   * processor count is read exactly once, because availableProcessors() may return different
+   * values on successive calls.
+   */
+  private final int selectorNumOfClientPool =
+      Math.max(1, Runtime.getRuntime().availableProcessors() / 4);
+
   public float getUdfMemoryBudgetInMB() {
     return udfMemoryBudgetInMB;
   }
@@ -2769,6 +2781,18 @@ public class IoTDBConfig {
     this.dataBlockManagerKeepAliveTimeInMs = dataBlockManagerKeepAliveTimeInMs;
   }
 
+  public int getConnectionTimeoutInMS() {
+    return connectionTimeoutInMS;
+  }
+  /** Replaces the Thrift connection timeout; the value is expected in milliseconds. */
+  public void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
+    this.connectionTimeoutInMS = connectionTimeoutInMS;
+  }
+  /** Returns the selector count; the backing field is final, so it cannot be reconfigured. */
+  public int getSelectorNumOfClientPool() {
+    return selectorNumOfClientPool;
+  }
+
   public boolean isMppMode() {
     return mppMode;
   }