You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/19 09:43:31 UTC

[iotdb] 03/03: make interface more generic

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch client_manager
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0953f6a72ad54349fd3aced1b4e492c18d31743d
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue Apr 19 17:42:28 2022 +0800

    make interface more generic
---
 .../iotdb/confignode/client/ClientPoolFactory.java |  8 +--
 .../iotdb/consensus/ratis/RatisClientFactory.java  | 67 ++++++++++++++++++++++
 .../iotdb/consensus/ratis/RatisConsensus.java      | 56 +++++++++++++-----
 node-commons/pom.xml                               |  4 ++
 .../commons/client/AsyncBaseClientFactory.java     |  4 +-
 .../iotdb/commons/client/BaseClientFactory.java    | 12 ++--
 .../apache/iotdb/commons/client/ClientManager.java | 19 +++---
 .../commons/client/ClientManagerProperty.java      | 26 ++++-----
 .../iotdb/commons/client/IClientManager.java       | 11 ++--
 .../iotdb/commons/client/IClientPoolFactory.java   |  7 +--
 .../async/AsyncDataNodeInternalServiceClient.java  |  6 +-
 .../sync/SyncDataNodeInternalServiceClient.java    |  6 +-
 .../apache/iotdb/db/client/ClientPoolFactory.java  | 16 +++---
 .../async/AsyncConfigNodeIServiceClient.java       |  6 +-
 .../async/AsyncDataNodeDataBlockServiceClient.java |  6 +-
 .../client/sync/SyncConfigNodeIServiceClient.java  |  6 +-
 .../sync/SyncDataNodeDataBlockServiceClient.java   |  6 +-
 17 files changed, 177 insertions(+), 89 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
index 73bb856065..d8731cb952 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
@@ -36,10 +36,10 @@ public class ClientPoolFactory {
   private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
 
   public static class SyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<SyncDataNodeInternalServiceClient> {
+      implements IClientPoolFactory<EndPoint, SyncDataNodeInternalServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<SyncDataNodeInternalServiceClient> manager) {
+        ClientManager<EndPoint, SyncDataNodeInternalServiceClient> manager) {
       ClientManagerProperty<SyncDataNodeInternalServiceClient> property =
           new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -51,10 +51,10 @@ public class ClientPoolFactory {
   }
 
   public static class AsyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> {
+      implements IClientPoolFactory<EndPoint, AsyncDataNodeInternalServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<AsyncDataNodeInternalServiceClient> manager) {
+        ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> manager) {
       ClientManagerProperty<AsyncDataNodeInternalServiceClient> property =
           new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java
new file mode 100644
index 0000000000..27d5f7a640
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.commons.client.BaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroup;
+
+public class RatisClientFactory extends BaseClientFactory<RaftGroup, RaftClient> {
+
+  private final RaftProperties raftProperties;
+  private final RaftClientRpc clientRpc;
+
+  public RatisClientFactory(
+      ClientManager<RaftGroup, RaftClient> clientManager,
+      ClientManagerProperty<RaftClient> clientManagerProperty,
+      RaftProperties raftProperties,
+      RaftClientRpc clientRpc) {
+    super(clientManager, clientManagerProperty);
+    this.raftProperties = raftProperties;
+    this.clientRpc = clientRpc;
+  }
+
+  @Override
+  public void destroyObject(RaftGroup key, PooledObject<RaftClient> pooledObject) throws Exception {
+    pooledObject.getObject().close();
+  }
+
+  @Override
+  public PooledObject<RaftClient> makeObject(RaftGroup group) throws Exception {
+    return new DefaultPooledObject<>(
+        RaftClient.newBuilder()
+            .setProperties(raftProperties)
+            .setRaftGroup(group)
+            .setClientRpc(clientRpc)
+            .build());
+  }
+
+  @Override
+  public boolean validateObject(RaftGroup key, PooledObject<RaftClient> pooledObject) {
+    return true;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index a87cafd2fe..cbf3f713f6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.EndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.commons.client.IClientPoolFactory;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.IConsensus;
@@ -37,7 +40,10 @@ import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
 import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
@@ -74,41 +80,40 @@ import java.util.stream.Collectors;
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 class RatisConsensus implements IConsensus {
+
+  private final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
+
   // the unique net communication endpoint
   private final RaftPeer myself;
-
   private final RaftServer server;
 
-  private final Map<RaftGroupId, RaftClient> clientMap;
-  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+  private final RaftProperties properties = new RaftProperties();;
+  private final RaftClientRpc clientRpc;
+
+  private final ClientManager<RaftGroup, RaftClient> clientManager =
+      new ClientManager<>(new RatisClientPoolFactory());
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap = new ConcurrentHashMap<>();
+  private final Map<RaftGroupId, RaftClient> clientMap = new ConcurrentHashMap<>();
 
-  private ClientId localFakeId;
-  private AtomicLong localFakeCallId;
+  private final ClientId localFakeId = ClientId.randomId();
+  private final AtomicLong localFakeCallId = new AtomicLong(0);
 
   private static final int DEFAULT_PRIORITY = 0;
   private static final int LEADER_PRIORITY = 1;
 
-  private Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
-
   public RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
       throws IOException {
 
-    this.clientMap = new ConcurrentHashMap<>();
-    this.raftGroupMap = new ConcurrentHashMap<>();
-    this.localFakeId = ClientId.randomId();
-    this.localFakeCallId = new AtomicLong(0);
-
     // create a RaftPeer as endpoint of comm
     String address = Utils.IPAddress(endpoint);
     myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
 
-    RaftProperties properties = new RaftProperties();
-
     RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
 
     // set the port which server listen to in RaftProperty object
     final int port = NetUtils.createSocketAddr(address).getPort();
     GrpcConfigKeys.Server.setPort(properties, port);
+    clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
 
     server =
         RaftServer.newBuilder()
@@ -128,6 +133,7 @@ class RatisConsensus implements IConsensus {
 
   @Override
   public void stop() throws IOException {
+    clientManager.close();
     server.close();
   }
 
@@ -443,7 +449,7 @@ class RatisConsensus implements IConsensus {
       isLeader = server.getDivision(raftGroupId).getInfo().isLeader();
     } catch (IOException exception) {
       // if the query fails, simply return not leader
-      logger.warn("isLeader request failed with exception: ", exception);
+      logger.info("isLeader request failed with exception: ", exception);
       isLeader = false;
     }
     return isLeader;
@@ -516,6 +522,15 @@ class RatisConsensus implements IConsensus {
     return client;
   }
 
+  private RaftClient getRaftClient(RaftGroup group) {
+    try {
+      return clientManager.borrowClient(group);
+    } catch (IOException e) {
+      logger.error(String.format("Borrow client from pool for group %s failed.", group), e);
+      return null;
+    }
+  }
+
   private void closeRaftClient(RaftGroupId groupId) {
     RaftClient client = clientMap.get(groupId);
     if (client != null) {
@@ -563,4 +578,15 @@ class RatisConsensus implements IConsensus {
     }
     return reply;
   }
+
+  private class RatisClientPoolFactory implements IClientPoolFactory<RaftGroup, RaftClient> {
+    @Override
+    public KeyedObjectPool<RaftGroup, RaftClient> createClientPool(
+        ClientManager<RaftGroup, RaftClient> manager) {
+      ClientManagerProperty<RaftClient> property =
+          new ClientManagerProperty.Builder<RaftClient>().build();
+      return new GenericKeyedObjectPool<>(
+          new RatisClientFactory(manager, property, properties, clientRpc), property.getConfig());
+    }
+  }
 }
diff --git a/node-commons/pom.xml b/node-commons/pom.xml
index f4e3587e38..a88c22bbf3 100644
--- a/node-commons/pom.xml
+++ b/node-commons/pom.xml
@@ -53,6 +53,10 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-pool2</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
index b710f5482c..b822b0043b 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
@@ -26,14 +26,14 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public abstract class AsyncBaseClientFactory<K, T> extends BaseClientFactory<K, T> {
+public abstract class AsyncBaseClientFactory<K, V> extends BaseClientFactory<K, V> {
 
   private static final Logger logger = LoggerFactory.getLogger(AsyncBaseClientFactory.class);
   protected TAsyncClientManager[] tManagers;
   protected AtomicInteger clientCnt = new AtomicInteger();
 
   protected AsyncBaseClientFactory(
-      ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) {
+      ClientManager<K, V> clientManager, ClientManagerProperty<V> clientManagerProperty) {
     super(clientManager, clientManagerProperty);
     tManagers = new TAsyncClientManager[clientManagerProperty.getSelectorNumOfAsyncClientPool()];
     for (int i = 0; i < tManagers.length; i++) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
index f628692152..1f85805691 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
@@ -22,20 +22,20 @@ package org.apache.iotdb.commons.client;
 import org.apache.commons.pool2.KeyedPooledObjectFactory;
 import org.apache.commons.pool2.PooledObject;
 
-public abstract class BaseClientFactory<K, T> implements KeyedPooledObjectFactory<K, T> {
+public abstract class BaseClientFactory<K, V> implements KeyedPooledObjectFactory<K, V> {
 
-  protected ClientManager<T> clientManager;
-  protected ClientManagerProperty<T> clientManagerProperty;
+  protected ClientManager<K, V> clientManager;
+  protected ClientManagerProperty<V> clientManagerProperty;
 
   public BaseClientFactory(
-      ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) {
+      ClientManager<K, V> clientManager, ClientManagerProperty<V> clientManagerProperty) {
     this.clientManager = clientManager;
     this.clientManagerProperty = clientManagerProperty;
   }
 
   @Override
-  public void activateObject(K node, PooledObject<T> pooledObject) {}
+  public void activateObject(K node, PooledObject<V> pooledObject) {}
 
   @Override
-  public void passivateObject(K node, PooledObject<T> pooledObject) {}
+  public void passivateObject(K node, PooledObject<V> pooledObject) {}
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 0513af1a5d..de3b8947cb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -19,31 +19,28 @@
 
 package org.apache.iotdb.commons.client;
 
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
-
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Optional;
 
-public class ClientManager<E> implements IClientManager<E> {
+public class ClientManager<K, V> implements IClientManager<K, V> {
 
   private static final Logger logger = LoggerFactory.getLogger(ClientManager.class);
 
-  private final KeyedObjectPool<EndPoint, E> pool;
+  private final KeyedObjectPool<K, V> pool;
 
-  public ClientManager(IClientPoolFactory<E> factory) {
+  public ClientManager(IClientPoolFactory<K, V> factory) {
     pool = factory.createClientPool(this);
   }
 
   @Override
-  public Optional<E> borrowClient(EndPoint node) throws IOException {
-    Optional<E> client = Optional.empty();
+  public V borrowClient(K node) throws IOException {
+    V client = null;
     try {
-      client = Optional.of(pool.borrowObject(node));
+      client = pool.borrowObject(node);
     } catch (TTransportException e) {
       // external needs to check transport related exception
       throw new IOException(e);
@@ -58,7 +55,7 @@ public class ClientManager<E> implements IClientManager<E> {
   }
 
   @Override
-  public void returnClient(EndPoint node, E client) {
+  public void returnClient(K node, V client) {
     if (client != null && node != null) {
       try {
         pool.returnObject(node, client);
@@ -70,7 +67,7 @@ public class ClientManager<E> implements IClientManager<E> {
   }
 
   @Override
-  public void clear(EndPoint node) {
+  public void clear(K node) {
     if (node != null) {
       try {
         pool.clear(node);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
index b35f163257..5fc95174db 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
@@ -26,9 +26,9 @@ import org.apache.thrift.protocol.TProtocolFactory;
 
 import java.time.Duration;
 
-public class ClientManagerProperty<T> {
+public class ClientManagerProperty<V> {
 
-  private final GenericKeyedObjectPoolConfig<T> config;
+  private final GenericKeyedObjectPoolConfig<V> config;
 
   // thrift client config
   private final TProtocolFactory protocolFactory;
@@ -36,7 +36,7 @@ public class ClientManagerProperty<T> {
   private int selectorNumOfAsyncClientPool = 1;
 
   public ClientManagerProperty(
-      GenericKeyedObjectPoolConfig<T> config,
+      GenericKeyedObjectPoolConfig<V> config,
       TProtocolFactory protocolFactory,
       int connectionTimeoutMs,
       int selectorNumOfAsyncClientPool) {
@@ -46,7 +46,7 @@ public class ClientManagerProperty<T> {
     this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool;
   }
 
-  public GenericKeyedObjectPoolConfig<T> getConfig() {
+  public GenericKeyedObjectPoolConfig<V> getConfig() {
     return config;
   }
 
@@ -62,7 +62,7 @@ public class ClientManagerProperty<T> {
     return selectorNumOfAsyncClientPool;
   }
 
-  public static class Builder<T> {
+  public static class Builder<V> {
 
     // pool config
     private long waitClientTimeoutMS = 20_000;
@@ -74,38 +74,38 @@ public class ClientManagerProperty<T> {
     private int connectionTimeoutMs = 20_000;
     private int selectorNumOfAsyncClientPool = 1;
 
-    public Builder<T> setWaitClientTimeoutMS(long waitClientTimeoutMS) {
+    public Builder<V> setWaitClientTimeoutMS(long waitClientTimeoutMS) {
       this.waitClientTimeoutMS = waitClientTimeoutMS;
       return this;
     }
 
-    public Builder<T> setMaxConnectionForEachNode(int maxConnectionForEachNode) {
+    public Builder<V> setMaxConnectionForEachNode(int maxConnectionForEachNode) {
       this.maxConnectionForEachNode = maxConnectionForEachNode;
       return this;
     }
 
-    public Builder<T> setMaxIdleConnectionForEachNode(int maxIdleConnectionForEachNode) {
+    public Builder<V> setMaxIdleConnectionForEachNode(int maxIdleConnectionForEachNode) {
       this.maxIdleConnectionForEachNode = maxIdleConnectionForEachNode;
       return this;
     }
 
-    public Builder<T> setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
+    public Builder<V> setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
       this.rpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
       return this;
     }
 
-    public Builder<T> setConnectionTimeoutMs(int connectionTimeoutMs) {
+    public Builder<V> setConnectionTimeoutMs(int connectionTimeoutMs) {
       this.connectionTimeoutMs = connectionTimeoutMs;
       return this;
     }
 
-    public Builder<T> setSelectorNumOfAsyncClientPool(int selectorNumOfAsyncClientPool) {
+    public Builder<V> setSelectorNumOfAsyncClientPool(int selectorNumOfAsyncClientPool) {
       this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool;
       return this;
     }
 
-    public ClientManagerProperty<T> build() {
-      GenericKeyedObjectPoolConfig<T> poolConfig = new GenericKeyedObjectPoolConfig<>();
+    public ClientManagerProperty<V> build() {
+      GenericKeyedObjectPoolConfig<V> poolConfig = new GenericKeyedObjectPoolConfig<>();
       poolConfig.setMaxTotalPerKey(maxConnectionForEachNode);
       poolConfig.setMaxIdlePerKey(maxIdleConnectionForEachNode);
       poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMS));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
index 00ee3d45a5..529c78e8b1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
@@ -19,18 +19,15 @@
 
 package org.apache.iotdb.commons.client;
 
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
-
 import java.io.IOException;
-import java.util.Optional;
 
-public interface IClientManager<E> {
+public interface IClientManager<K, V> {
 
-  Optional<E> borrowClient(EndPoint node) throws IOException;
+  V borrowClient(K node) throws IOException;
 
-  void returnClient(EndPoint node, E client);
+  void returnClient(K node, V client);
 
-  void clear(EndPoint node);
+  void clear(K node);
 
   void close();
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
index 80341bf5f5..aa7980decb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
@@ -19,12 +19,9 @@
 
 package org.apache.iotdb.commons.client;
 
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
-
 import org.apache.commons.pool2.KeyedObjectPool;
 
-public interface IClientPoolFactory<E> {
+public interface IClientPoolFactory<K, V> {
 
-  KeyedObjectPool<EndPoint, E> createClientPool(
-      ClientManager<E> manager);
+  KeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager);
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 6119b799a8..8547d13890 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -37,14 +37,14 @@ import java.io.IOException;
 public class AsyncDataNodeInternalServiceClient extends InternalService.AsyncClient {
 
   private final EndPoint endpoint;
-  private final ClientManager<AsyncDataNodeInternalServiceClient> clientManager;
+  private final ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager;
 
   public AsyncDataNodeInternalServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
       TAsyncClientManager tClientManager,
-      ClientManager<AsyncDataNodeInternalServiceClient> clientManager)
+      ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager)
       throws IOException {
     super(
         protocolFactory,
@@ -92,7 +92,7 @@ public class AsyncDataNodeInternalServiceClient extends InternalService.AsyncCli
       extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeInternalServiceClient> {
 
     public Factory(
-        ClientManager<AsyncDataNodeInternalServiceClient> clientManager,
+        ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager,
         ClientManagerProperty<AsyncDataNodeInternalServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index a40104bbbc..c273809554 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -40,13 +40,13 @@ import java.net.SocketException;
 public class SyncDataNodeInternalServiceClient extends InternalService.Client {
 
   private final EndPoint endpoint;
-  private final ClientManager<SyncDataNodeInternalServiceClient> clientManager;
+  private final ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager;
 
   public SyncDataNodeInternalServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
-      ClientManager<SyncDataNodeInternalServiceClient> clientManager)
+      ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager)
       throws TTransportException {
     super(
         protocolFactory.getProtocol(
@@ -84,7 +84,7 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client {
       extends BaseClientFactory<EndPoint, SyncDataNodeInternalServiceClient> {
 
     public Factory(
-        ClientManager<SyncDataNodeInternalServiceClient> clientManager,
+        ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager,
         ClientManagerProperty<SyncDataNodeInternalServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
index 63f6c7a45c..e24e5d4e94 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
@@ -38,10 +38,10 @@ public class ClientPoolFactory {
   private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
 
   public static class SyncConfigNodeIServiceClientPoolFactory
-      implements IClientPoolFactory<SyncConfigNodeIServiceClient> {
+      implements IClientPoolFactory<EndPoint, SyncConfigNodeIServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, SyncConfigNodeIServiceClient> createClientPool(
-        ClientManager<SyncConfigNodeIServiceClient> manager) {
+        ClientManager<EndPoint, SyncConfigNodeIServiceClient> manager) {
       ClientManagerProperty<SyncConfigNodeIServiceClient> property =
           new ClientManagerProperty.Builder<SyncConfigNodeIServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -53,10 +53,10 @@ public class ClientPoolFactory {
   }
 
   public static class AsyncConfigNodeIServiceClientPoolFactory
-      implements IClientPoolFactory<AsyncConfigNodeIServiceClient> {
+      implements IClientPoolFactory<EndPoint, AsyncConfigNodeIServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, AsyncConfigNodeIServiceClient> createClientPool(
-        ClientManager<AsyncConfigNodeIServiceClient> manager) {
+        ClientManager<EndPoint, AsyncConfigNodeIServiceClient> manager) {
       ClientManagerProperty<AsyncConfigNodeIServiceClient> property =
           new ClientManagerProperty.Builder<AsyncConfigNodeIServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -69,10 +69,10 @@ public class ClientPoolFactory {
   }
 
   public static class SyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<SyncDataNodeInternalServiceClient> {
+      implements IClientPoolFactory<EndPoint, SyncDataNodeInternalServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<SyncDataNodeInternalServiceClient> manager) {
+        ClientManager<EndPoint, SyncDataNodeInternalServiceClient> manager) {
       ClientManagerProperty<SyncDataNodeInternalServiceClient> property =
           new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -84,10 +84,10 @@ public class ClientPoolFactory {
   }
 
   public static class AsyncDataNodeInternalServiceClientPoolFactory
-      implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> {
+      implements IClientPoolFactory<EndPoint, AsyncDataNodeInternalServiceClient> {
     @Override
     public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
-        ClientManager<AsyncDataNodeInternalServiceClient> manager) {
+        ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> manager) {
       ClientManagerProperty<AsyncDataNodeInternalServiceClient> property =
           new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>()
               .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
index 00ac180429..48bb173b83 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
@@ -36,14 +36,14 @@ import java.io.IOException;
 public class AsyncConfigNodeIServiceClient extends ConfigIService.AsyncClient {
 
   private final EndPoint endpoint;
-  private final ClientManager<AsyncConfigNodeIServiceClient> clientManager;
+  private final ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager;
 
   public AsyncConfigNodeIServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
       TAsyncClientManager tClientManager,
-      ClientManager<AsyncConfigNodeIServiceClient> clientManager)
+      ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager)
       throws IOException {
     super(
         protocolFactory,
@@ -91,7 +91,7 @@ public class AsyncConfigNodeIServiceClient extends ConfigIService.AsyncClient {
       extends AsyncBaseClientFactory<EndPoint, AsyncConfigNodeIServiceClient> {
 
     public Factory(
-        ClientManager<AsyncConfigNodeIServiceClient> clientManager,
+        ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager,
         ClientManagerProperty<AsyncConfigNodeIServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
index 1701083464..5b4e81e775 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
@@ -36,14 +36,14 @@ import java.io.IOException;
 public class AsyncDataNodeDataBlockServiceClient extends DataBlockService.AsyncClient {
 
   private final EndPoint endpoint;
-  private final ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager;
+  private final ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager;
 
   public AsyncDataNodeDataBlockServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
       TAsyncClientManager tClientManager,
-      ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager)
+      ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager)
       throws IOException {
     super(
         protocolFactory,
@@ -91,7 +91,7 @@ public class AsyncDataNodeDataBlockServiceClient extends DataBlockService.AsyncC
       extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeDataBlockServiceClient> {
 
     public Factory(
-        ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager,
+        ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager,
         ClientManagerProperty<AsyncDataNodeDataBlockServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
index 60e01bfc0c..ab9121f8de 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
@@ -39,13 +39,13 @@ import java.net.SocketException;
 public class SyncConfigNodeIServiceClient extends ConfigIService.Client {
 
   private final EndPoint endpoint;
-  private final ClientManager<SyncConfigNodeIServiceClient> clientManager;
+  private final ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager;
 
   public SyncConfigNodeIServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
-      ClientManager<SyncConfigNodeIServiceClient> clientManager)
+      ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager)
       throws TTransportException {
     super(
         protocolFactory.getProtocol(
@@ -82,7 +82,7 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client {
   public static class Factory extends BaseClientFactory<EndPoint, SyncConfigNodeIServiceClient> {
 
     public Factory(
-        ClientManager<SyncConfigNodeIServiceClient> clientManager,
+        ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager,
         ClientManagerProperty<SyncConfigNodeIServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
index f6188709a9..7f598e6159 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
@@ -39,13 +39,13 @@ import java.net.SocketException;
 public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client {
 
   private final EndPoint endpoint;
-  private final ClientManager<SyncDataNodeDataBlockServiceClient> clientManager;
+  private final ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager;
 
   public SyncDataNodeDataBlockServiceClient(
       TProtocolFactory protocolFactory,
       int connectionTimeout,
       EndPoint endpoint,
-      ClientManager<SyncDataNodeDataBlockServiceClient> clientManager)
+      ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager)
       throws TTransportException {
     super(
         protocolFactory.getProtocol(
@@ -83,7 +83,7 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
       extends BaseClientFactory<EndPoint, SyncDataNodeDataBlockServiceClient> {
 
     public Factory(
-        ClientManager<SyncDataNodeDataBlockServiceClient> clientManager,
+        ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager,
         ClientManagerProperty<SyncDataNodeDataBlockServiceClient> clientManagerProperty) {
       super(clientManager, clientManagerProperty);
     }