You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/09/14 08:34:45 UTC

[GitHub] [iotdb] LebronAl commented on a change in pull request #3886: [cluster-refactor] refactor client pool

LebronAl commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r707980449



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,201 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(
+            ClientCategory.SINGLE_MASTER,
+            ClientPoolFactory.getInstance().createSingleManagerAsyncDataPool());
+        break;
+      default:
+        break;

Review comment:
       maybe a warn log?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -119,15 +123,13 @@
 
   private boolean allowReport = true;
 
-  /**
-   * hardLinkCleaner will periodically clean expired hardlinks created during snapshots
-   */
+  /** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
   private ScheduledExecutorService hardLinkCleanerThread;
 
   // currently, dataClientProvider is only used for those instances who do not belong to any

Review comment:
       update comments

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
##########
@@ -19,64 +19,142 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends TSDataServiceClient {
+// TODO: Refine the interfaces of TSDataService. TSDataService interfaces doesn't need extends
+// TODO: RaftService interfaces
+public class SyncDataClient extends Client {
+
+  private Node node;
+  private ClientCategory category;
+  private IClientPool clientPool;
 
-  /** @param prot this constructor just create a new instance, but do not open the connection */
   @TestOnly
   public SyncDataClient(TProtocol prot) {
     super(prot);
   }
 
-  SyncDataClient(TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
+  public SyncDataClient(TProtocolFactory protocolFactory, Node node, ClientCategory category)
       throws TTransportException {
+
+    // the difference of the two clients lies in the port
     super(
-        protocolFactory,
-        target.getInternalIp(),
-        target.getDataPort(),
-        ClusterConstant.getConnectionTimeoutInMS(),
-        target,
-        pool);
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    node.getInternalIp(),
+                    ClientUtils.getPort(node, category),
+                    ClusterConstant.getConnectionTimeoutInMS()))));
+    this.node = node;
+    this.category = category;
+    getInputProtocol().getTransport().open();
+  }
+
+  public void setClientPool(IClientPool clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  public void returnSelf() {
+    if (clientPool != null) clientPool.returnSyncClient(this, node, category);
+  }
+
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+
+  @TestOnly
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format(
-        "SyncDataClient (ip = %s, port = %d, id = %d)",
-        target.getInternalIp(), target.getDataPort(), target.getNodeIdentifier());
+    return "Sync"
+        + category.getName()
+        + "{"
+        + "node="
+        + node
+        + ","
+        + "port="
+        + ClientUtils.getPort(node, category)
+        + '}';
+  }
+
+  public Node getNode() {
+    return node;
   }
 
-  public static class Factory implements SyncClientFactory {
+  public static class SyncDataClientFactory
+      implements KeyedPooledObjectFactory<Node, SyncDataClient> {
 
     private TProtocolFactory protocolFactory;
+    private ClientCategory category;
 
-    public Factory(TProtocolFactory protocolFactory) {
+    public SyncDataClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
       this.protocolFactory = protocolFactory;
+      this.category = category;
+    }
+
+    //  public String nodeInfo(Node node) {
+    //    return String.format(
+    //        "MetaNode (listenIp = %s, HB port = %d, id = %d)",
+    //        node.getInternalIp(),
+    //        node.getMetaPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
+    //        node.getNodeIdentifier());
+    //  }
+
+    @Override
+    public void activateObject(Node node, PooledObject<SyncDataClient> pooledObject)

Review comment:
       Do we have to override these functions with empty implementation?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java
##########
@@ -0,0 +1,89 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.time.Duration;
+
+public class ClientPoolFactory {
+
+  protected long waitClientTimeoutMS;
+  protected int maxConnectionForEachNode;
+  private TProtocolFactory protocolFactory;
+
+  ClientPoolFactory() {
+    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+    this.waitClientTimeoutMS = config.getWaitClientTimeoutMS();
+    this.maxConnectionForEachNode = config.getMaxClientPerNodePerMember();
+    protocolFactory =
+        config.isRpcThriftCompressionEnabled()
+            ? new TCompactProtocol.Factory()
+            : new TBinaryProtocol.Factory();
+  }
+
+  public static ClientPoolFactory getInstance() {
+    return ClientPoolProviderHolder.INSTANCE;
+  }
+
+  public GenericKeyedObjectPool<Node, RaftService.Client> createSyncDataPool(
+      ClientCategory category) {
+    GenericKeyedObjectPoolConfig config = new GenericKeyedObjectPoolConfig();

Review comment:
       make this config a `field` of  `ClientPoolFactory.java`?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java
##########
@@ -182,100 +174,103 @@ public void testDefaultBatchStrategySelect() {
   }
 
   private void setAsyncDataClient() {
-    ClusterIoTDB.getInstance()
-        .setClientProvider(
-            new DataClientProvider(new Factory()) {
-              @Override
-              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
-                return new AsyncDataClient(null, null, node, null) {
-                  @Override
-                  public void fetchMultSeries(
-                      RaftNode header,
-                      long readerId,
-                      List<String> paths,
-                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(
-                            () -> {
-                              Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
-                              if (batchUsed) {
-                                paths.forEach(
-                                    path -> {
-                                      stringByteBufferMap.put(path, ByteBuffer.allocate(0));
-                                    });
-                              } else {
-                                batchUsed = true;
-
-                                for (int i = 0; i < batchData.size(); i++) {
-                                  stringByteBufferMap.put(
-                                      paths.get(i), generateByteBuffer(batchData.get(i)));
-                                }
-
-                                resultHandler.onComplete(stringByteBufferMap);
-                              }
-                            })
-                        .start();
-                  }
-
-                  @Override
-                  public void queryMultSeries(
-                      MultSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(() -> resultHandler.onComplete(1L)).start();
-                  }
-                };
-              }
-            });
+    //    ClusterIoTDB.getInstance()
+    //        .setClientProvider(
+    //            new DataClientProvider(new Factory()) {
+    //              @Override
+    //              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws
+    // IOException {
+    //                return new AsyncDataClient(null, null, node, null) {
+    //                  @Override
+    //                  public void fetchMultSeries(
+    //                      RaftNode header,
+    //                      long readerId,
+    //                      List<String> paths,
+    //                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
+    //                      throws TException {
+    //                    if (failedNodes.contains(node)) {
+    //                      throw new TException("Node down.");
+    //                    }
+    //
+    //                    new Thread(
+    //                            () -> {
+    //                              Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
+    //                              if (batchUsed) {
+    //                                paths.forEach(
+    //                                    path -> {
+    //                                      stringByteBufferMap.put(path, ByteBuffer.allocate(0));
+    //                                    });
+    //                              } else {
+    //                                batchUsed = true;
+    //
+    //                                for (int i = 0; i < batchData.size(); i++) {
+    //                                  stringByteBufferMap.put(
+    //                                      paths.get(i), generateByteBuffer(batchData.get(i)));
+    //                                }
+    //
+    //                                resultHandler.onComplete(stringByteBufferMap);
+    //                              }
+    //                            })
+    //                        .start();
+    //                  }
+    //
+    //                  @Override
+    //                  public void queryMultSeries(
+    //                      MultSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
+    //                      throws TException {
+    //                    if (failedNodes.contains(node)) {
+    //                      throw new TException("Node down.");
+    //                    }
+    //
+    //                    new Thread(() -> resultHandler.onComplete(1L)).start();
+    //                  }
+    //                };
+    //              }
+    //            });
   }
 
   private void setSyncDataClient() {
-    ClusterIoTDB.getInstance()
-        .setClientProvider(
-            new DataClientProvider(new Factory()) {
-              @Override
-              public SyncDataClient getSyncDataClient(Node node, int timeout) {
-                return new SyncDataClient(null) {
-                  @Override
-                  public Map<String, ByteBuffer> fetchMultSeries(
-                      RaftNode header, long readerId, List<String> paths) throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
-                    if (batchUsed) {
-                      paths.forEach(
-                          path -> {
-                            stringByteBufferMap.put(path, ByteBuffer.allocate(0));
-                          });
-                    } else {
-                      batchUsed = true;
-                      for (int i = 0; i < batchData.size(); i++) {
-                        stringByteBufferMap.put(paths.get(i), generateByteBuffer(batchData.get(i)));
-                      }
-                    }
-                    return stringByteBufferMap;
-                  }
-
-                  @Override
-                  public long queryMultSeries(MultSeriesQueryRequest request) throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    return 1L;
-                  }
-                };
-              }
-            });
+    //    ClusterIoTDB.getInstance()
+    //        .setClientProvider(
+    //            new DataClientProvider(new Factory()) {
+    //              @Override
+    //              public SyncDataClient getSyncDataClient(Node node, int timeout) {
+    //                return new SyncDataClient(null) {
+    //                  @Override
+    //                  public Map<String, ByteBuffer> fetchMultSeries(
+    //                      RaftNode header, long readerId, List<String> paths) throws TException {
+    //                    if (failedNodes.contains(node)) {
+    //                      throw new TException("Node down.");
+    //                    }
+    //
+    //                    Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
+    //                    if (batchUsed) {
+    //                      paths.forEach(
+    //                          path -> {
+    //                            stringByteBufferMap.put(path, ByteBuffer.allocate(0));
+    //                          });
+    //                    } else {
+    //                      batchUsed = true;
+    //                      for (int i = 0; i < batchData.size(); i++) {
+    //                        stringByteBufferMap.put(paths.get(i),
+    // generateByteBuffer(batchData.get(i)));
+    //                      }
+    //                    }

Review comment:
       same

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java.orig
##########
@@ -0,0 +1,143 @@
+/*
+ *   * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing,  * software distributed under the License is distributed on an  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY  * KIND, either express or implied.  See the License for the  * specific language governing permissions and limitations  * under the License.
+ */
+
+package org.apache.iotdb.cluster.client.async;
+
+<<<<<<< HEAD

Review comment:
       what's this?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
##########
@@ -19,52 +19,42 @@
 
 package org.apache.iotdb.cluster.utils;
 
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaHeartbeatClient;
+import org.apache.iotdb.cluster.client.ClientCategory;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 
 public class ClientUtils {
 
   private ClientUtils() {
     // util class
   }
 
-  public static boolean isHeartbeatClientReady(AsyncClient client) {
-    if (client instanceof AsyncDataHeartbeatClient) {
-      return ((AsyncDataHeartbeatClient) client).isReady();
-    } else {
-      return ((AsyncMetaHeartbeatClient) client).isReady();
-    }
-  }
-
-  public static void putBackSyncHeartbeatClient(Client client) {
-    if (client instanceof SyncMetaHeartbeatClient) {
-      ((SyncMetaHeartbeatClient) client).putBack();
-    } else {
-      ((SyncDataHeartbeatClient) client).putBack();
-    }
-  }
-
-  public static void putBackSyncClient(Client client) {
-    if (client instanceof SyncDataClient) {
-      ((SyncDataClient) client).putBack();
-    } else if (client instanceof SyncMetaClient) {
-      ((SyncMetaClient) client).putBack();
+  public static int getPort(Node node, ClientCategory category) {
+    int port = -1;
+    if (category == ClientCategory.DATA) {
+      port = node.getDataPort();
+    } else if (ClientCategory.DATA_HEARTBEAT == category) {
+      port = node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET;
+    } else if (ClientCategory.META == category) {
+      port = node.getMetaPort();
+    } else if (ClientCategory.META_HEARTBEAT == category) {
+      port = node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET;
+    } else if (ClientCategory.SINGLE_MASTER == category) {
+      // special data port type
+      port = node.getMetaPort();

Review comment:
       should be `node.getDataPort()` ?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,201 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(
+            ClientCategory.SINGLE_MASTER,
+            ClientPoolFactory.getInstance().createSingleManagerAsyncDataPool());
+        break;
+      default:
+        break;
+    }
+  }
+
+  private void constructSyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.META,
+            ClientPoolFactory.getInstance().createSyncMetaPool(ClientCategory.META));
+        syncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            ClientPoolFactory.getInstance().createSyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA));
+        syncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        break;
+      default:
+        break;

Review comment:
       same as above

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
##########
@@ -19,64 +19,142 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends TSDataServiceClient {
+// TODO: Refine the interfaces of TSDataService. TSDataService interfaces doesn't need extends
+// TODO: RaftService interfaces
+public class SyncDataClient extends Client {

Review comment:
       Since you have done a great abstraction, can we merge `SyncDataClient` and `SyncMetaClient` into one class? It seems their difference only lies on `category` now?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java.orig
##########
@@ -0,0 +1,165 @@
+/*
+ *   * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing,  * software distributed under the License is distributed on an  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY  * KIND, either express or implied.  See the License for the  * specific language governing permissions and limitations  * under the License.
+ */
+
+package org.apache.iotdb.cluster.client.sync;
+
+<<<<<<< HEAD
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient.Factory;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.rpc.TSocketWrapper;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+=======
+import org.apache.iotdb.cluster.client.BaseClientTest;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.Before;
+>>>>>>> [cluster-refactor] refactor client pool
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.SocketException;
+
+import static org.junit.Assert.assertEquals;
+
+public class SyncMetaClientTest extends BaseClientTest {
+
+  private TProtocolFactory protocolFactory;
+
+  @Before
+  public void setUp() {
+    protocolFactory =
+        ClusterDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
+            ? new TCompactProtocol.Factory()
+            : new TBinaryProtocol.Factory();
+  }
+
+  @Test
+  public void testMetaClient() throws IOException, InterruptedException, TTransportException {
+    try {
+<<<<<<< HEAD
+      SyncClientPool syncClientPool =
+          new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
+      SyncMetaClient client;
+      client = (SyncMetaClient) syncClientPool.getClient(node);
+
+      assertEquals(node, client.getTarget());
+
+      client.putBack();
+      Client newClient = syncClientPool.getClient(node);
+      assertEquals(client, newClient);
+      assertTrue(client.getInputProtocol().getTransport().isOpen());
+
+      client =
+          new SyncMetaClient(
+              new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(), node.getDataPort())));
+      // client without a belong pool will be closed after putBack()
+      client.putBack();
+      assertFalse(client.getInputProtocol().getTransport().isOpen());
+=======
+      startMetaServer();
+      SyncMetaClient metaClient =
+          new SyncMetaClient(protocolFactory, defaultNode, ClientCategory.META);
+
+      assertEquals(
+          "SyncMetaClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+              + "dataPort:40010, clientPort:0, clientIp:localhost),port=9003}",
+          metaClient.toString());
+
+      assertCheck(metaClient);
+
+      metaClient =
+          new SyncMetaClient.SyncMetaClientFactory(protocolFactory, ClientCategory.META)
+              .makeObject(defaultNode)
+              .getObject();
+
+      assertEquals(
+          "SyncMetaClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+              + "dataPort:40010, clientPort:0, clientIp:localhost),port=9003}",
+          metaClient.toString());
+
+      assertCheck(metaClient);
+    } catch (Exception e) {
+      e.printStackTrace();
+>>>>>>> [cluster-refactor] refactor client pool
+    } finally {
+      stopMetaServer();
+    }
+  }
+
+  @Test
+  public void testDataHeartbeatClient()
+      throws IOException, InterruptedException, TTransportException {
+    try {
+<<<<<<< HEAD

Review comment:
       same

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
##########
@@ -19,64 +19,124 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
-import org.apache.thrift.protocol.TProtocol;
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncMetaClient extends TSMetaServiceClient {
+public class SyncMetaClient extends Client {
 
-  /** @param prot this constructor just create a new instance, but do not open the connection */
-  @TestOnly
-  SyncMetaClient(TProtocol prot) {
-    super(prot);
-  }
+  private Node node;
+  private ClientCategory category;
+  private IClientPool clientPool;
 
-  private SyncMetaClient(TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
+  public SyncMetaClient(TProtocolFactory protocolFactory, Node node, ClientCategory category)
       throws TTransportException {
     super(
-        protocolFactory,
-        target.getInternalIp(),
-        target.getMetaPort(),
-        ClusterConstant.getConnectionTimeoutInMS(),
-        target,
-        pool);
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    node.getInternalIp(),
+                    ClientUtils.getPort(node, category),
+                    ClusterConstant.getConnectionTimeoutInMS()))));
+    this.node = node;
+    this.category = category;
+    getInputProtocol().getTransport().open();
+  }
+
+  public void setClientPool(IClientPool clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  public void returnSelf() {
+    if (clientPool != null) clientPool.returnSyncClient(this, node, category);
+  }
+
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
+  @TestOnly
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+  }
+
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+
+  public Node getNode() {
+    return node;
   }
 
   @Override
   public String toString() {
-    return String.format(
-        "SyncMetaClient (ip = %s, port = %d, id = %d)",
-        target.getInternalIp(), target.getMetaPort(), target.getNodeIdentifier());
+    return "Sync"
+        + category.getName()
+        + "{"
+        + "node="
+        + node
+        + ","
+        + "port="
+        + ClientUtils.getPort(node, category)
+        + '}';
   }
 
-  public static class Factory implements SyncClientFactory {
+  public static class SyncMetaClientFactory
+      implements KeyedPooledObjectFactory<Node, SyncMetaClient> {
 
     private TProtocolFactory protocolFactory;
+    private ClientCategory category;
 
-    public Factory(TProtocolFactory protocolFactory) {
+    public SyncMetaClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
       this.protocolFactory = protocolFactory;
+      this.category = category;
     }
 
     @Override
-    public SyncMetaClient getSyncClient(Node node, SyncClientPool pool) throws TTransportException {
-      return new SyncMetaClient(protocolFactory, node, pool);
+    public void activateObject(Node node, PooledObject<SyncMetaClient> pooledObject)
+        throws Exception {}
+
+    @Override
+    public void destroyObject(Node node, PooledObject<SyncMetaClient> pooledObject)
+        throws Exception {
+      pooledObject.getObject().getInputProtocol().getTransport().close();

Review comment:
       why not `pooledObject.getObject().close();` ?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
##########
@@ -19,52 +19,42 @@
 
 package org.apache.iotdb.cluster.utils;
 
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaHeartbeatClient;
+import org.apache.iotdb.cluster.client.ClientCategory;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 
 public class ClientUtils {
 
   private ClientUtils() {
     // util class
   }
 
-  public static boolean isHeartbeatClientReady(AsyncClient client) {
-    if (client instanceof AsyncDataHeartbeatClient) {
-      return ((AsyncDataHeartbeatClient) client).isReady();
-    } else {
-      return ((AsyncMetaHeartbeatClient) client).isReady();
-    }
-  }
-
-  public static void putBackSyncHeartbeatClient(Client client) {
-    if (client instanceof SyncMetaHeartbeatClient) {
-      ((SyncMetaHeartbeatClient) client).putBack();
-    } else {
-      ((SyncDataHeartbeatClient) client).putBack();
-    }
-  }
-
-  public static void putBackSyncClient(Client client) {
-    if (client instanceof SyncDataClient) {
-      ((SyncDataClient) client).putBack();
-    } else if (client instanceof SyncMetaClient) {
-      ((SyncMetaClient) client).putBack();
+  public static int getPort(Node node, ClientCategory category) {
+    int port = -1;
+    if (category == ClientCategory.DATA) {

Review comment:
       Although it's trivial, I'm curious that why you do not use `switch`

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java.orig
##########
@@ -0,0 +1,165 @@
+/*
+ *   * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing,  * software distributed under the License is distributed on an  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY  * KIND, either express or implied.  See the License for the  * specific language governing permissions and limitations  * under the License.
+ */
+
+package org.apache.iotdb.cluster.client.sync;
+
+<<<<<<< HEAD
+import org.apache.iotdb.cluster.client.sync.SyncDataClient.Factory;
+import org.apache.iotdb.cluster.rpc.thrift.Node;

Review comment:
       same

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
##########
@@ -73,51 +62,55 @@ public void setUp() {
     batchUsed = false;
     metaGroupMember = new TestMetaGroupMember();
     // TODO fixme : 恢复正常的provider
-    ClusterIoTDB.getInstance()
-        .setClientProvider(
-            new DataClientProvider(new Factory()) {
-              @Override
-              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
-                return new AsyncDataClient(null, null, node, null) {
-                  @Override
-                  public void fetchSingleSeries(
-                      RaftNode header, long readerId, AsyncMethodCallback<ByteBuffer> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(
-                            () -> {
-                              if (batchUsed) {
-                                resultHandler.onComplete(ByteBuffer.allocate(0));
-                              } else {
-                                ByteArrayOutputStream byteArrayOutputStream =
-                                    new ByteArrayOutputStream();
-                                DataOutputStream dataOutputStream =
-                                    new DataOutputStream(byteArrayOutputStream);
-                                SerializeUtils.serializeBatchData(batchData, dataOutputStream);
-                                batchUsed = true;
-                                resultHandler.onComplete(
-                                    ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
-                              }
-                            })
-                        .start();
-                  }
-
-                  @Override
-                  public void querySingleSeries(
-                      SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(() -> resultHandler.onComplete(1L)).start();
-                  }
-                };
-              }
-            });
+    //    ClusterIoTDB.getInstance()

Review comment:
       Should we remove these codes? What does this mean?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
##########
@@ -129,58 +120,59 @@ public void testMultManagerMergeRemoteSeriesReader() throws IOException, Storage
   }
 
   private void setAsyncDataClient() {
-    ClusterIoTDB.getInstance()
-        .setClientProvider(
-            new DataClientProvider(new Factory()) {
-              @Override
-              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
-                return new AsyncDataClient(null, null, node, null) {
-                  @Override
-                  public void fetchMultSeries(
-                      RaftNode header,
-                      long readerId,
-                      List<String> paths,
-                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(
-                            () -> {
-                              Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
-                              if (batchUsed) {
-                                paths.forEach(
-                                    path -> {
-                                      stringByteBufferMap.put(path, ByteBuffer.allocate(0));
-                                    });
-                              } else {
-                                batchUsed = true;
-
-                                for (int i = 0; i < batchData.size(); i++) {
-                                  stringByteBufferMap.put(
-                                      paths.get(i), generateByteBuffer(batchData.get(i)));
-                                }
-
-                                resultHandler.onComplete(stringByteBufferMap);
-                              }
-                            })
-                        .start();
-                  }
-
-                  @Override
-                  public void queryMultSeries(
-                      MultSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(() -> resultHandler.onComplete(1L)).start();
-                  }
-                };
-              }
-            });
+    //    ClusterIoTDB.getInstance()

Review comment:
       same as above

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
##########
@@ -19,64 +19,142 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends TSDataServiceClient {
+// TODO: Refine the interfaces of TSDataService. TSDataService interfaces doesn't need extends
+// TODO: RaftService interfaces
+public class SyncDataClient extends Client {
+
+  private Node node;
+  private ClientCategory category;
+  private IClientPool clientPool;
 
-  /** @param prot this constructor just create a new instance, but do not open the connection */
   @TestOnly
   public SyncDataClient(TProtocol prot) {
     super(prot);
   }
 
-  SyncDataClient(TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
+  public SyncDataClient(TProtocolFactory protocolFactory, Node node, ClientCategory category)
       throws TTransportException {
+
+    // the difference of the two clients lies in the port
     super(
-        protocolFactory,
-        target.getInternalIp(),
-        target.getDataPort(),
-        ClusterConstant.getConnectionTimeoutInMS(),
-        target,
-        pool);
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    node.getInternalIp(),
+                    ClientUtils.getPort(node, category),
+                    ClusterConstant.getConnectionTimeoutInMS()))));
+    this.node = node;
+    this.category = category;
+    getInputProtocol().getTransport().open();
+  }
+
+  public void setClientPool(IClientPool clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  public void returnSelf() {
+    if (clientPool != null) clientPool.returnSyncClient(this, node, category);
+  }
+
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+
+  @TestOnly
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format(
-        "SyncDataClient (ip = %s, port = %d, id = %d)",
-        target.getInternalIp(), target.getDataPort(), target.getNodeIdentifier());
+    return "Sync"
+        + category.getName()
+        + "{"
+        + "node="
+        + node
+        + ","
+        + "port="
+        + ClientUtils.getPort(node, category)
+        + '}';
+  }
+
+  public Node getNode() {
+    return node;
   }
 
-  public static class Factory implements SyncClientFactory {
+  public static class SyncDataClientFactory
+      implements KeyedPooledObjectFactory<Node, SyncDataClient> {
 
     private TProtocolFactory protocolFactory;
+    private ClientCategory category;
 
-    public Factory(TProtocolFactory protocolFactory) {
+    public SyncDataClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
       this.protocolFactory = protocolFactory;
+      this.category = category;
+    }
+
+    //  public String nodeInfo(Node node) {

Review comment:
       just remove these codes

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java
##########
@@ -182,100 +174,103 @@ public void testDefaultBatchStrategySelect() {
   }
 
   private void setAsyncDataClient() {
-    ClusterIoTDB.getInstance()
-        .setClientProvider(
-            new DataClientProvider(new Factory()) {
-              @Override
-              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
-                return new AsyncDataClient(null, null, node, null) {
-                  @Override
-                  public void fetchMultSeries(
-                      RaftNode header,
-                      long readerId,
-                      List<String> paths,
-                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(
-                            () -> {
-                              Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
-                              if (batchUsed) {
-                                paths.forEach(
-                                    path -> {
-                                      stringByteBufferMap.put(path, ByteBuffer.allocate(0));
-                                    });
-                              } else {
-                                batchUsed = true;
-
-                                for (int i = 0; i < batchData.size(); i++) {
-                                  stringByteBufferMap.put(
-                                      paths.get(i), generateByteBuffer(batchData.get(i)));
-                                }
-
-                                resultHandler.onComplete(stringByteBufferMap);
-                              }
-                            })
-                        .start();
-                  }
-
-                  @Override
-                  public void queryMultSeries(
-                      MultSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(() -> resultHandler.onComplete(1L)).start();
-                  }
-                };
-              }
-            });
+    //    ClusterIoTDB.getInstance()
+    //        .setClientProvider(
+    //            new DataClientProvider(new Factory()) {
+    //              @Override
+    //              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws
+    // IOException {
+    //                return new AsyncDataClient(null, null, node, null) {
+    //                  @Override
+    //                  public void fetchMultSeries(
+    //                      RaftNode header,
+    //                      long readerId,
+    //                      List<String> paths,
+    //                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
+    //                      throws TException {
+    //                    if (failedNodes.contains(node)) {
+    //                      throw new TException("Node down.");
+    //                    }
+    //
+    //                    new Thread(
+    //                            () -> {

Review comment:
       same




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org