You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/09/01 11:20:48 UTC

[GitHub] [iotdb] chengjianyun opened a new pull request #3886: [cluster-refactor] refactor client pool

chengjianyun opened a new pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886


   This is a draft PR. 
   
   Here introduce the main idea of the refactor. 
   
   - Let client pool recycle client resource instead do it in client.
   - Extract some interfaces to make code clean.
   
   The PR will contain:
   
   - [ ] Close client means close the connection after refactor.
   - [x] extract interface for ClientFactory
   - [x] extract interface for ClientPool
   - [ ] fix the test cases error
   - [ ] self test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r709815273



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,203 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {

Review comment:
       That's not a good idea from my view. `ClientManager` should know nothing about `ClusterConfig` to make it reusable and write UT easily. Same for all of clients implementation, I didn't parameterize the config in clients to avoid the refactor breaking original logic too much. Ideally, all the config should pass into client via parameters.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r708791663



##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java.orig
##########
@@ -0,0 +1,143 @@
+/*
+ *   * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing,  * software distributed under the License is distributed on an  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY  * KIND, either express or implied.  See the License for the  * specific language governing permissions and limitations  * under the License.
+ */
+
+package org.apache.iotdb.cluster.client.async;
+
+<<<<<<< HEAD

Review comment:
       Sorry, sounds like the merge temp file, should be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] wangchao316 commented on pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#issuecomment-919734397


   Hi, Thanks your contribution.
   I have some question for this pr.
   
   1. Check whether the refactoring performance is tested because the pool directly affects the query and insertion performance.
   2. Whether rolling restart of a node is supported, which does not affect external query or write.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r709929250



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,203 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {

Review comment:
       OK, I have no problem with that




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r708800944



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
##########
@@ -19,64 +19,142 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends TSDataServiceClient {
+// TODO: Refine the interfaces of TSDataService. TSDataService interfaces doesn't need extends
+// TODO: RaftService interfaces
+public class SyncDataClient extends Client {

Review comment:
       I'd really like to do that. But DataClient and MetaClient need to extend different super class separately: TSDataService.(Async)Client and TSMetaService.(Async)Client. Another way is make Client is a wrapper of  TSDataService.(Async)Client and TSMetaService.(Async)Client. It's means we need to rewrite all methods in (Async)Client which I don't think a good way. Actually, we should reorg the client interfaces to make it much clear but I don't suggest we do in this refactor as it has been a big change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r708795665



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
##########
@@ -19,52 +19,42 @@
 
 package org.apache.iotdb.cluster.utils;
 
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaHeartbeatClient;
+import org.apache.iotdb.cluster.client.ClientCategory;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 
 public class ClientUtils {
 
   private ClientUtils() {
     // util class
   }
 
-  public static boolean isHeartbeatClientReady(AsyncClient client) {
-    if (client instanceof AsyncDataHeartbeatClient) {
-      return ((AsyncDataHeartbeatClient) client).isReady();
-    } else {
-      return ((AsyncMetaHeartbeatClient) client).isReady();
-    }
-  }
-
-  public static void putBackSyncHeartbeatClient(Client client) {
-    if (client instanceof SyncMetaHeartbeatClient) {
-      ((SyncMetaHeartbeatClient) client).putBack();
-    } else {
-      ((SyncDataHeartbeatClient) client).putBack();
-    }
-  }
-
-  public static void putBackSyncClient(Client client) {
-    if (client instanceof SyncDataClient) {
-      ((SyncDataClient) client).putBack();
-    } else if (client instanceof SyncMetaClient) {
-      ((SyncMetaClient) client).putBack();
+  public static int getPort(Node node, ClientCategory category) {
+    int port = -1;
+    if (category == ClientCategory.DATA) {
+      port = node.getDataPort();
+    } else if (ClientCategory.DATA_HEARTBEAT == category) {
+      port = node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET;
+    } else if (ClientCategory.META == category) {
+      port = node.getMetaPort();
+    } else if (ClientCategory.META_HEARTBEAT == category) {
+      port = node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET;
+    } else if (ClientCategory.SINGLE_MASTER == category) {
+      // special data port type
+      port = node.getMetaPort();

Review comment:
       My fault, it's data client. Fixing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r708795249



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
##########
@@ -19,52 +19,42 @@
 
 package org.apache.iotdb.cluster.utils;
 
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaHeartbeatClient;
+import org.apache.iotdb.cluster.client.ClientCategory;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 
 public class ClientUtils {
 
   private ClientUtils() {
     // util class
   }
 
-  public static boolean isHeartbeatClientReady(AsyncClient client) {
-    if (client instanceof AsyncDataHeartbeatClient) {
-      return ((AsyncDataHeartbeatClient) client).isReady();
-    } else {
-      return ((AsyncMetaHeartbeatClient) client).isReady();
-    }
-  }
-
-  public static void putBackSyncHeartbeatClient(Client client) {
-    if (client instanceof SyncMetaHeartbeatClient) {
-      ((SyncMetaHeartbeatClient) client).putBack();
-    } else {
-      ((SyncDataHeartbeatClient) client).putBack();
-    }
-  }
-
-  public static void putBackSyncClient(Client client) {
-    if (client instanceof SyncDataClient) {
-      ((SyncDataClient) client).putBack();
-    } else if (client instanceof SyncMetaClient) {
-      ((SyncMetaClient) client).putBack();
+  public static int getPort(Node node, ClientCategory category) {
+    int port = -1;
+    if (category == ClientCategory.DATA) {
+      port = node.getDataPort();
+    } else if (ClientCategory.DATA_HEARTBEAT == category) {
+      port = node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET;
+    } else if (ClientCategory.META == category) {
+      port = node.getMetaPort();
+    } else if (ClientCategory.META_HEARTBEAT == category) {
+      port = node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET;
+    } else if (ClientCategory.SINGLE_MASTER == category) {
+      // special data port type
+      port = node.getMetaPort();

Review comment:
       Really? I only see it in [DataGroupMember](https://github.com/apache/iotdb/blob/master/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java#L199).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r709864220



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,203 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(
+            ClientCategory.SINGLE_MASTER,
+            ClientPoolFactory.getInstance().createSingleManagerAsyncDataPool());
+        break;
+      default:
+        logger.warn("unsupported ClientManager type: {}", type);
+        break;
+    }
+  }
+
+  private void constructSyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.META,
+            ClientPoolFactory.getInstance().createSyncMetaPool(ClientCategory.META));
+        syncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            ClientPoolFactory.getInstance().createSyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA));
+        syncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        break;
+      default:
+        logger.warn("unsupported ClientManager type: {}", type);
+        break;
+    }
+  }
+
+  /**
+   * It's safe to convert: 1. RaftService.AsyncClient to TSDataService.AsyncClient when category is
+   * DATA or DATA_HEARTBEAT; 2. RaftService.AsyncClient to TSMetaService.AsyncClient when category
+   * is META or META_HEARTBEAT.
+   *
+   * @param category
+   * @return RaftService.AsyncClient
+   */
+  @Override
+  public RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category) {
+    try {
+      RaftService.AsyncClient client = asyncClientPoolMap.get(category).borrowObject(node);
+      if (ClientCategory.DATA == category
+          || ClientCategory.DATA_HEARTBEAT == category
+          || ClientCategory.SINGLE_MASTER == category) {
+        ((AsyncDataClient) client).setClientPool(this);
+      } else {
+        ((AsyncMetaClient) client).setClientPool(this);
+      }
+      return client;
+    } catch (NullPointerException e) {
+      logger.error("No AsyncClient pool found for {}", category, e);
+    } catch (TException e) {
+      logger.error("AsyncClient transport error for {}", category, e);
+    } catch (Exception e) {
+      logger.error("AsyncClient error for {}", category, e);
+    }
+    return null;
+  }
+
+  /**
+   * It's safe to convert: 1. RaftService.Client to TSDataService.Client when category is DATA or
+   * DATA_HEARTBEAT; 2. RaftService.Client to TSMetaService.Client when category is META or
+   * META_HEARTBEAT.
+   *
+   * @param category
+   * @return RaftService.Client
+   */
+  @Override
+  public RaftService.Client borrowSyncClient(Node node, ClientCategory category) {
+    try {
+      RaftService.Client client = syncClientPoolMap.get(category).borrowObject(node);
+      if (ClientCategory.DATA == category || ClientCategory.DATA_HEARTBEAT == category) {
+        ((SyncDataClient) client).setClientPool(this);

Review comment:
       It's good idea to move the setting into construction of client. The original design is remove pool from client, that's client doesn't need to know about pool. By the design, there are to many changes in the refactor. So I add the pool manager back by `ClientManager`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r708806492



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
##########
@@ -19,64 +19,142 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends TSDataServiceClient {
+// TODO: Refine the interfaces of TSDataService. TSDataService interfaces doesn't need extends
+// TODO: RaftService interfaces
+public class SyncDataClient extends Client {

Review comment:
       I see! These two `Client` are different! Then I'm ok with that. We can talk about refactoring it later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#issuecomment-920728476


   > Hi, Thanks your contribution.
   > I have some question for this pr.
   > 
   > Check whether the refactoring performance is tested because the pool directly affects the query and insertion performance.
   > Whether rolling restart of a node is supported, which does not affect external query or write.
   
   I have done some basic tests. For advanced tests(such as benchmark compare), I plan to do that once the `cluster-` branch is stable.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#issuecomment-921426259


   > The `File Changed` tab gets stuck...
   > 
   > In SyncDataClient.java, what does `activateObject()` means? why it just sets the timeout value? @LebronAl ?
   
   `activateObject()` will be revoke before return object to someone (try to borrow object).  Some user may set the customized timeout, so reset it when give the client out (Or a more reasonable place is `passivateObject()` which will be revoke before object is returned to pool). Actually, the user who is using the client should reset the value, object pool does this for double check.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#issuecomment-921430285


   Hi, Please add licenses for this files
   ![image](https://user-images.githubusercontent.com/32640567/133720406-18a40cb1-7c65-4b76-8a28-8d9116e65c90.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] neuyilan commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
neuyilan commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r710828756



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import com.sun.istack.Nullable;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);

Review comment:
       ```suggestion
     private static final Logger logger = LoggerFactory.getLogger(ClientManager.class);
   ```

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,180 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+  private ClientPoolFactory clientPoolFactory;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    clientPoolFactory = new ClientPoolFactory();
+    clientPoolFactory.setClientManager(this);
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            clientPoolFactory.createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(
+            ClientCategory.SINGLE_MASTER, clientPoolFactory.createSingleManagerAsyncDataPool());
+        break;
+      default:
+        logger.warn("unsupported ClientManager type: {}", type);
+        break;
+    }
+  }
+
+  private void constructSyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createSyncDataPool(ClientCategory.DATA));

Review comment:
       +1,
   The same as `constructAsyncClientMap`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r707980449



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,201 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(
+            ClientCategory.SINGLE_MASTER,
+            ClientPoolFactory.getInstance().createSingleManagerAsyncDataPool());
+        break;
+      default:
+        break;

Review comment:
       maybe a warn log?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -119,15 +123,13 @@
 
   private boolean allowReport = true;
 
-  /**
-   * hardLinkCleaner will periodically clean expired hardlinks created during snapshots
-   */
+  /** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
   private ScheduledExecutorService hardLinkCleanerThread;
 
   // currently, dataClientProvider is only used for those instances who do not belong to any

Review comment:
       update comments

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
##########
@@ -19,64 +19,142 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends TSDataServiceClient {
+// TODO: Refine the interfaces of TSDataService. TSDataService interfaces doesn't need extends
+// TODO: RaftService interfaces
+public class SyncDataClient extends Client {
+
+  private Node node;
+  private ClientCategory category;
+  private IClientPool clientPool;
 
-  /** @param prot this constructor just create a new instance, but do not open the connection */
   @TestOnly
   public SyncDataClient(TProtocol prot) {
     super(prot);
   }
 
-  SyncDataClient(TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
+  public SyncDataClient(TProtocolFactory protocolFactory, Node node, ClientCategory category)
       throws TTransportException {
+
+    // the difference of the two clients lies in the port
     super(
-        protocolFactory,
-        target.getInternalIp(),
-        target.getDataPort(),
-        ClusterConstant.getConnectionTimeoutInMS(),
-        target,
-        pool);
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    node.getInternalIp(),
+                    ClientUtils.getPort(node, category),
+                    ClusterConstant.getConnectionTimeoutInMS()))));
+    this.node = node;
+    this.category = category;
+    getInputProtocol().getTransport().open();
+  }
+
+  public void setClientPool(IClientPool clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  public void returnSelf() {
+    if (clientPool != null) clientPool.returnSyncClient(this, node, category);
+  }
+
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+
+  @TestOnly
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format(
-        "SyncDataClient (ip = %s, port = %d, id = %d)",
-        target.getInternalIp(), target.getDataPort(), target.getNodeIdentifier());
+    return "Sync"
+        + category.getName()
+        + "{"
+        + "node="
+        + node
+        + ","
+        + "port="
+        + ClientUtils.getPort(node, category)
+        + '}';
+  }
+
+  public Node getNode() {
+    return node;
   }
 
-  public static class Factory implements SyncClientFactory {
+  public static class SyncDataClientFactory
+      implements KeyedPooledObjectFactory<Node, SyncDataClient> {
 
     private TProtocolFactory protocolFactory;
+    private ClientCategory category;
 
-    public Factory(TProtocolFactory protocolFactory) {
+    public SyncDataClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
       this.protocolFactory = protocolFactory;
+      this.category = category;
+    }
+
+    //  public String nodeInfo(Node node) {
+    //    return String.format(
+    //        "MetaNode (listenIp = %s, HB port = %d, id = %d)",
+    //        node.getInternalIp(),
+    //        node.getMetaPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
+    //        node.getNodeIdentifier());
+    //  }
+
+    @Override
+    public void activateObject(Node node, PooledObject<SyncDataClient> pooledObject)

Review comment:
       Do we have to override these functions with empty implementation?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java
##########
@@ -0,0 +1,89 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.time.Duration;
+
+public class ClientPoolFactory {
+
+  protected long waitClientTimeoutMS;
+  protected int maxConnectionForEachNode;
+  private TProtocolFactory protocolFactory;
+
+  ClientPoolFactory() {
+    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+    this.waitClientTimeoutMS = config.getWaitClientTimeoutMS();
+    this.maxConnectionForEachNode = config.getMaxClientPerNodePerMember();
+    protocolFactory =
+        config.isRpcThriftCompressionEnabled()
+            ? new TCompactProtocol.Factory()
+            : new TBinaryProtocol.Factory();
+  }
+
+  public static ClientPoolFactory getInstance() {
+    return ClientPoolProviderHolder.INSTANCE;
+  }
+
+  public GenericKeyedObjectPool<Node, RaftService.Client> createSyncDataPool(
+      ClientCategory category) {
+    GenericKeyedObjectPoolConfig config = new GenericKeyedObjectPoolConfig();

Review comment:
       make this config a `field` of  `ClientPoolFactory.java`?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java
##########
@@ -182,100 +174,103 @@ public void testDefaultBatchStrategySelect() {
   }
 
   private void setAsyncDataClient() {
-    ClusterIoTDB.getInstance()
-        .setClientProvider(
-            new DataClientProvider(new Factory()) {
-              @Override
-              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
-                return new AsyncDataClient(null, null, node, null) {
-                  @Override
-                  public void fetchMultSeries(
-                      RaftNode header,
-                      long readerId,
-                      List<String> paths,
-                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(
-                            () -> {
-                              Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
-                              if (batchUsed) {
-                                paths.forEach(
-                                    path -> {
-                                      stringByteBufferMap.put(path, ByteBuffer.allocate(0));
-                                    });
-                              } else {
-                                batchUsed = true;
-
-                                for (int i = 0; i < batchData.size(); i++) {
-                                  stringByteBufferMap.put(
-                                      paths.get(i), generateByteBuffer(batchData.get(i)));
-                                }
-
-                                resultHandler.onComplete(stringByteBufferMap);
-                              }
-                            })
-                        .start();
-                  }
-
-                  @Override
-                  public void queryMultSeries(
-                      MultSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(() -> resultHandler.onComplete(1L)).start();
-                  }
-                };
-              }
-            });
+    //    ClusterIoTDB.getInstance()
+    //        .setClientProvider(
+    //            new DataClientProvider(new Factory()) {
+    //              @Override
+    //              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws
+    // IOException {
+    //                return new AsyncDataClient(null, null, node, null) {
+    //                  @Override
+    //                  public void fetchMultSeries(
+    //                      RaftNode header,
+    //                      long readerId,
+    //                      List<String> paths,
+    //                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
+    //                      throws TException {
+    //                    if (failedNodes.contains(node)) {
+    //                      throw new TException("Node down.");
+    //                    }
+    //
+    //                    new Thread(
+    //                            () -> {
+    //                              Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
+    //                              if (batchUsed) {
+    //                                paths.forEach(
+    //                                    path -> {
+    //                                      stringByteBufferMap.put(path, ByteBuffer.allocate(0));
+    //                                    });
+    //                              } else {
+    //                                batchUsed = true;
+    //
+    //                                for (int i = 0; i < batchData.size(); i++) {
+    //                                  stringByteBufferMap.put(
+    //                                      paths.get(i), generateByteBuffer(batchData.get(i)));
+    //                                }
+    //
+    //                                resultHandler.onComplete(stringByteBufferMap);
+    //                              }
+    //                            })
+    //                        .start();
+    //                  }
+    //
+    //                  @Override
+    //                  public void queryMultSeries(
+    //                      MultSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
+    //                      throws TException {
+    //                    if (failedNodes.contains(node)) {
+    //                      throw new TException("Node down.");
+    //                    }
+    //
+    //                    new Thread(() -> resultHandler.onComplete(1L)).start();
+    //                  }
+    //                };
+    //              }
+    //            });
   }
 
   private void setSyncDataClient() {
-    ClusterIoTDB.getInstance()
-        .setClientProvider(
-            new DataClientProvider(new Factory()) {
-              @Override
-              public SyncDataClient getSyncDataClient(Node node, int timeout) {
-                return new SyncDataClient(null) {
-                  @Override
-                  public Map<String, ByteBuffer> fetchMultSeries(
-                      RaftNode header, long readerId, List<String> paths) throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
-                    if (batchUsed) {
-                      paths.forEach(
-                          path -> {
-                            stringByteBufferMap.put(path, ByteBuffer.allocate(0));
-                          });
-                    } else {
-                      batchUsed = true;
-                      for (int i = 0; i < batchData.size(); i++) {
-                        stringByteBufferMap.put(paths.get(i), generateByteBuffer(batchData.get(i)));
-                      }
-                    }
-                    return stringByteBufferMap;
-                  }
-
-                  @Override
-                  public long queryMultSeries(MultSeriesQueryRequest request) throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    return 1L;
-                  }
-                };
-              }
-            });
+    //    ClusterIoTDB.getInstance()
+    //        .setClientProvider(
+    //            new DataClientProvider(new Factory()) {
+    //              @Override
+    //              public SyncDataClient getSyncDataClient(Node node, int timeout) {
+    //                return new SyncDataClient(null) {
+    //                  @Override
+    //                  public Map<String, ByteBuffer> fetchMultSeries(
+    //                      RaftNode header, long readerId, List<String> paths) throws TException {
+    //                    if (failedNodes.contains(node)) {
+    //                      throw new TException("Node down.");
+    //                    }
+    //
+    //                    Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
+    //                    if (batchUsed) {
+    //                      paths.forEach(
+    //                          path -> {
+    //                            stringByteBufferMap.put(path, ByteBuffer.allocate(0));
+    //                          });
+    //                    } else {
+    //                      batchUsed = true;
+    //                      for (int i = 0; i < batchData.size(); i++) {
+    //                        stringByteBufferMap.put(paths.get(i),
+    // generateByteBuffer(batchData.get(i)));
+    //                      }
+    //                    }

Review comment:
       same

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java.orig
##########
@@ -0,0 +1,143 @@
+/*
+ *   * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing,  * software distributed under the License is distributed on an  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY  * KIND, either express or implied.  See the License for the  * specific language governing permissions and limitations  * under the License.
+ */
+
+package org.apache.iotdb.cluster.client.async;
+
+<<<<<<< HEAD

Review comment:
       what's this?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
##########
@@ -19,52 +19,42 @@
 
 package org.apache.iotdb.cluster.utils;
 
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaHeartbeatClient;
+import org.apache.iotdb.cluster.client.ClientCategory;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 
 public class ClientUtils {
 
   private ClientUtils() {
     // util class
   }
 
-  public static boolean isHeartbeatClientReady(AsyncClient client) {
-    if (client instanceof AsyncDataHeartbeatClient) {
-      return ((AsyncDataHeartbeatClient) client).isReady();
-    } else {
-      return ((AsyncMetaHeartbeatClient) client).isReady();
-    }
-  }
-
-  public static void putBackSyncHeartbeatClient(Client client) {
-    if (client instanceof SyncMetaHeartbeatClient) {
-      ((SyncMetaHeartbeatClient) client).putBack();
-    } else {
-      ((SyncDataHeartbeatClient) client).putBack();
-    }
-  }
-
-  public static void putBackSyncClient(Client client) {
-    if (client instanceof SyncDataClient) {
-      ((SyncDataClient) client).putBack();
-    } else if (client instanceof SyncMetaClient) {
-      ((SyncMetaClient) client).putBack();
+  public static int getPort(Node node, ClientCategory category) {
+    int port = -1;
+    if (category == ClientCategory.DATA) {
+      port = node.getDataPort();
+    } else if (ClientCategory.DATA_HEARTBEAT == category) {
+      port = node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET;
+    } else if (ClientCategory.META == category) {
+      port = node.getMetaPort();
+    } else if (ClientCategory.META_HEARTBEAT == category) {
+      port = node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET;
+    } else if (ClientCategory.SINGLE_MASTER == category) {
+      // special data port type
+      port = node.getMetaPort();

Review comment:
       should be `node.getDataPort()` ?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,201 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(
+            ClientCategory.SINGLE_MASTER,
+            ClientPoolFactory.getInstance().createSingleManagerAsyncDataPool());
+        break;
+      default:
+        break;
+    }
+  }
+
+  private void constructSyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.META,
+            ClientPoolFactory.getInstance().createSyncMetaPool(ClientCategory.META));
+        syncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            ClientPoolFactory.getInstance().createSyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA));
+        syncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        break;
+      default:
+        break;

Review comment:
       same as above

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
##########
@@ -19,64 +19,142 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends TSDataServiceClient {
+// TODO: Refine the interfaces of TSDataService. TSDataService interfaces doesn't need extends
+// TODO: RaftService interfaces
+public class SyncDataClient extends Client {

Review comment:
       Since you have done a great abstraction, can we merge `SyncDataClient` and `SyncMetaClient` into one class? It seems their difference only lies on `category` now?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java.orig
##########
@@ -0,0 +1,165 @@
+/*
+ *   * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing,  * software distributed under the License is distributed on an  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY  * KIND, either express or implied.  See the License for the  * specific language governing permissions and limitations  * under the License.
+ */
+
+package org.apache.iotdb.cluster.client.sync;
+
+<<<<<<< HEAD
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient.Factory;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.rpc.TSocketWrapper;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+=======
+import org.apache.iotdb.cluster.client.BaseClientTest;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.Before;
+>>>>>>> [cluster-refactor] refactor client pool
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.SocketException;
+
+import static org.junit.Assert.assertEquals;
+
+public class SyncMetaClientTest extends BaseClientTest {
+
+  private TProtocolFactory protocolFactory;
+
+  @Before
+  public void setUp() {
+    protocolFactory =
+        ClusterDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
+            ? new TCompactProtocol.Factory()
+            : new TBinaryProtocol.Factory();
+  }
+
+  @Test
+  public void testMetaClient() throws IOException, InterruptedException, TTransportException {
+    try {
+<<<<<<< HEAD
+      SyncClientPool syncClientPool =
+          new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
+      SyncMetaClient client;
+      client = (SyncMetaClient) syncClientPool.getClient(node);
+
+      assertEquals(node, client.getTarget());
+
+      client.putBack();
+      Client newClient = syncClientPool.getClient(node);
+      assertEquals(client, newClient);
+      assertTrue(client.getInputProtocol().getTransport().isOpen());
+
+      client =
+          new SyncMetaClient(
+              new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(), node.getDataPort())));
+      // client without a belong pool will be closed after putBack()
+      client.putBack();
+      assertFalse(client.getInputProtocol().getTransport().isOpen());
+=======
+      startMetaServer();
+      SyncMetaClient metaClient =
+          new SyncMetaClient(protocolFactory, defaultNode, ClientCategory.META);
+
+      assertEquals(
+          "SyncMetaClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+              + "dataPort:40010, clientPort:0, clientIp:localhost),port=9003}",
+          metaClient.toString());
+
+      assertCheck(metaClient);
+
+      metaClient =
+          new SyncMetaClient.SyncMetaClientFactory(protocolFactory, ClientCategory.META)
+              .makeObject(defaultNode)
+              .getObject();
+
+      assertEquals(
+          "SyncMetaClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+              + "dataPort:40010, clientPort:0, clientIp:localhost),port=9003}",
+          metaClient.toString());
+
+      assertCheck(metaClient);
+    } catch (Exception e) {
+      e.printStackTrace();
+>>>>>>> [cluster-refactor] refactor client pool
+    } finally {
+      stopMetaServer();
+    }
+  }
+
+  @Test
+  public void testDataHeartbeatClient()
+      throws IOException, InterruptedException, TTransportException {
+    try {
+<<<<<<< HEAD

Review comment:
       same

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
##########
@@ -19,64 +19,124 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
-import org.apache.thrift.protocol.TProtocol;
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncMetaClient extends TSMetaServiceClient {
+public class SyncMetaClient extends Client {
 
-  /** @param prot this constructor just create a new instance, but do not open the connection */
-  @TestOnly
-  SyncMetaClient(TProtocol prot) {
-    super(prot);
-  }
+  private Node node;
+  private ClientCategory category;
+  private IClientPool clientPool;
 
-  private SyncMetaClient(TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
+  public SyncMetaClient(TProtocolFactory protocolFactory, Node node, ClientCategory category)
       throws TTransportException {
     super(
-        protocolFactory,
-        target.getInternalIp(),
-        target.getMetaPort(),
-        ClusterConstant.getConnectionTimeoutInMS(),
-        target,
-        pool);
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    node.getInternalIp(),
+                    ClientUtils.getPort(node, category),
+                    ClusterConstant.getConnectionTimeoutInMS()))));
+    this.node = node;
+    this.category = category;
+    getInputProtocol().getTransport().open();
+  }
+
+  public void setClientPool(IClientPool clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  public void returnSelf() {
+    if (clientPool != null) clientPool.returnSyncClient(this, node, category);
+  }
+
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
+  @TestOnly
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+  }
+
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+
+  public Node getNode() {
+    return node;
   }
 
   @Override
   public String toString() {
-    return String.format(
-        "SyncMetaClient (ip = %s, port = %d, id = %d)",
-        target.getInternalIp(), target.getMetaPort(), target.getNodeIdentifier());
+    return "Sync"
+        + category.getName()
+        + "{"
+        + "node="
+        + node
+        + ","
+        + "port="
+        + ClientUtils.getPort(node, category)
+        + '}';
   }
 
-  public static class Factory implements SyncClientFactory {
+  public static class SyncMetaClientFactory
+      implements KeyedPooledObjectFactory<Node, SyncMetaClient> {
 
     private TProtocolFactory protocolFactory;
+    private ClientCategory category;
 
-    public Factory(TProtocolFactory protocolFactory) {
+    public SyncMetaClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
       this.protocolFactory = protocolFactory;
+      this.category = category;
     }
 
     @Override
-    public SyncMetaClient getSyncClient(Node node, SyncClientPool pool) throws TTransportException {
-      return new SyncMetaClient(protocolFactory, node, pool);
+    public void activateObject(Node node, PooledObject<SyncMetaClient> pooledObject)
+        throws Exception {}
+
+    @Override
+    public void destroyObject(Node node, PooledObject<SyncMetaClient> pooledObject)
+        throws Exception {
+      pooledObject.getObject().getInputProtocol().getTransport().close();

Review comment:
       why not `pooledObject.getObject().close();` ?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
##########
@@ -19,52 +19,42 @@
 
 package org.apache.iotdb.cluster.utils;
 
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaHeartbeatClient;
+import org.apache.iotdb.cluster.client.ClientCategory;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 
 public class ClientUtils {
 
   private ClientUtils() {
     // util class
   }
 
-  public static boolean isHeartbeatClientReady(AsyncClient client) {
-    if (client instanceof AsyncDataHeartbeatClient) {
-      return ((AsyncDataHeartbeatClient) client).isReady();
-    } else {
-      return ((AsyncMetaHeartbeatClient) client).isReady();
-    }
-  }
-
-  public static void putBackSyncHeartbeatClient(Client client) {
-    if (client instanceof SyncMetaHeartbeatClient) {
-      ((SyncMetaHeartbeatClient) client).putBack();
-    } else {
-      ((SyncDataHeartbeatClient) client).putBack();
-    }
-  }
-
-  public static void putBackSyncClient(Client client) {
-    if (client instanceof SyncDataClient) {
-      ((SyncDataClient) client).putBack();
-    } else if (client instanceof SyncMetaClient) {
-      ((SyncMetaClient) client).putBack();
+  public static int getPort(Node node, ClientCategory category) {
+    int port = -1;
+    if (category == ClientCategory.DATA) {

Review comment:
       Although it's trivial, I'm curious that why you do not use `switch`

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java.orig
##########
@@ -0,0 +1,165 @@
+/*
+ *   * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing,  * software distributed under the License is distributed on an  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY  * KIND, either express or implied.  See the License for the  * specific language governing permissions and limitations  * under the License.
+ */
+
+package org.apache.iotdb.cluster.client.sync;
+
+<<<<<<< HEAD
+import org.apache.iotdb.cluster.client.sync.SyncDataClient.Factory;
+import org.apache.iotdb.cluster.rpc.thrift.Node;

Review comment:
       same

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
##########
@@ -73,51 +62,55 @@ public void setUp() {
     batchUsed = false;
     metaGroupMember = new TestMetaGroupMember();
     // TODO fixme : 恢复正常的provider
-    ClusterIoTDB.getInstance()
-        .setClientProvider(
-            new DataClientProvider(new Factory()) {
-              @Override
-              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
-                return new AsyncDataClient(null, null, node, null) {
-                  @Override
-                  public void fetchSingleSeries(
-                      RaftNode header, long readerId, AsyncMethodCallback<ByteBuffer> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(
-                            () -> {
-                              if (batchUsed) {
-                                resultHandler.onComplete(ByteBuffer.allocate(0));
-                              } else {
-                                ByteArrayOutputStream byteArrayOutputStream =
-                                    new ByteArrayOutputStream();
-                                DataOutputStream dataOutputStream =
-                                    new DataOutputStream(byteArrayOutputStream);
-                                SerializeUtils.serializeBatchData(batchData, dataOutputStream);
-                                batchUsed = true;
-                                resultHandler.onComplete(
-                                    ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
-                              }
-                            })
-                        .start();
-                  }
-
-                  @Override
-                  public void querySingleSeries(
-                      SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(() -> resultHandler.onComplete(1L)).start();
-                  }
-                };
-              }
-            });
+    //    ClusterIoTDB.getInstance()

Review comment:
       Should we remove these codes? What does this mean?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
##########
@@ -129,58 +120,59 @@ public void testMultManagerMergeRemoteSeriesReader() throws IOException, Storage
   }
 
   private void setAsyncDataClient() {
-    ClusterIoTDB.getInstance()
-        .setClientProvider(
-            new DataClientProvider(new Factory()) {
-              @Override
-              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
-                return new AsyncDataClient(null, null, node, null) {
-                  @Override
-                  public void fetchMultSeries(
-                      RaftNode header,
-                      long readerId,
-                      List<String> paths,
-                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(
-                            () -> {
-                              Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
-                              if (batchUsed) {
-                                paths.forEach(
-                                    path -> {
-                                      stringByteBufferMap.put(path, ByteBuffer.allocate(0));
-                                    });
-                              } else {
-                                batchUsed = true;
-
-                                for (int i = 0; i < batchData.size(); i++) {
-                                  stringByteBufferMap.put(
-                                      paths.get(i), generateByteBuffer(batchData.get(i)));
-                                }
-
-                                resultHandler.onComplete(stringByteBufferMap);
-                              }
-                            })
-                        .start();
-                  }
-
-                  @Override
-                  public void queryMultSeries(
-                      MultSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(() -> resultHandler.onComplete(1L)).start();
-                  }
-                };
-              }
-            });
+    //    ClusterIoTDB.getInstance()

Review comment:
       same as above

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
##########
@@ -19,64 +19,142 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends TSDataServiceClient {
+// TODO: Refine the interfaces of TSDataService. TSDataService interfaces doesn't need extends
+// TODO: RaftService interfaces
+public class SyncDataClient extends Client {
+
+  private Node node;
+  private ClientCategory category;
+  private IClientPool clientPool;
 
-  /** @param prot this constructor just create a new instance, but do not open the connection */
   @TestOnly
   public SyncDataClient(TProtocol prot) {
     super(prot);
   }
 
-  SyncDataClient(TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
+  public SyncDataClient(TProtocolFactory protocolFactory, Node node, ClientCategory category)
       throws TTransportException {
+
+    // the difference of the two clients lies in the port
     super(
-        protocolFactory,
-        target.getInternalIp(),
-        target.getDataPort(),
-        ClusterConstant.getConnectionTimeoutInMS(),
-        target,
-        pool);
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    node.getInternalIp(),
+                    ClientUtils.getPort(node, category),
+                    ClusterConstant.getConnectionTimeoutInMS()))));
+    this.node = node;
+    this.category = category;
+    getInputProtocol().getTransport().open();
+  }
+
+  public void setClientPool(IClientPool clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  public void returnSelf() {
+    if (clientPool != null) clientPool.returnSyncClient(this, node, category);
+  }
+
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+
+  @TestOnly
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format(
-        "SyncDataClient (ip = %s, port = %d, id = %d)",
-        target.getInternalIp(), target.getDataPort(), target.getNodeIdentifier());
+    return "Sync"
+        + category.getName()
+        + "{"
+        + "node="
+        + node
+        + ","
+        + "port="
+        + ClientUtils.getPort(node, category)
+        + '}';
+  }
+
+  public Node getNode() {
+    return node;
   }
 
-  public static class Factory implements SyncClientFactory {
+  public static class SyncDataClientFactory
+      implements KeyedPooledObjectFactory<Node, SyncDataClient> {
 
     private TProtocolFactory protocolFactory;
+    private ClientCategory category;
 
-    public Factory(TProtocolFactory protocolFactory) {
+    public SyncDataClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
       this.protocolFactory = protocolFactory;
+      this.category = category;
+    }
+
+    //  public String nodeInfo(Node node) {

Review comment:
       just remove these codes

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java
##########
@@ -182,100 +174,103 @@ public void testDefaultBatchStrategySelect() {
   }
 
   private void setAsyncDataClient() {
-    ClusterIoTDB.getInstance()
-        .setClientProvider(
-            new DataClientProvider(new Factory()) {
-              @Override
-              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
-                return new AsyncDataClient(null, null, node, null) {
-                  @Override
-                  public void fetchMultSeries(
-                      RaftNode header,
-                      long readerId,
-                      List<String> paths,
-                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(
-                            () -> {
-                              Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
-                              if (batchUsed) {
-                                paths.forEach(
-                                    path -> {
-                                      stringByteBufferMap.put(path, ByteBuffer.allocate(0));
-                                    });
-                              } else {
-                                batchUsed = true;
-
-                                for (int i = 0; i < batchData.size(); i++) {
-                                  stringByteBufferMap.put(
-                                      paths.get(i), generateByteBuffer(batchData.get(i)));
-                                }
-
-                                resultHandler.onComplete(stringByteBufferMap);
-                              }
-                            })
-                        .start();
-                  }
-
-                  @Override
-                  public void queryMultSeries(
-                      MultSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(() -> resultHandler.onComplete(1L)).start();
-                  }
-                };
-              }
-            });
+    //    ClusterIoTDB.getInstance()
+    //        .setClientProvider(
+    //            new DataClientProvider(new Factory()) {
+    //              @Override
+    //              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws
+    // IOException {
+    //                return new AsyncDataClient(null, null, node, null) {
+    //                  @Override
+    //                  public void fetchMultSeries(
+    //                      RaftNode header,
+    //                      long readerId,
+    //                      List<String> paths,
+    //                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
+    //                      throws TException {
+    //                    if (failedNodes.contains(node)) {
+    //                      throw new TException("Node down.");
+    //                    }
+    //
+    //                    new Thread(
+    //                            () -> {

Review comment:
       same




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r710825090



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -637,11 +631,33 @@ public void disablePrintClientConnectionErrorStack() {
     printClientConnectionErrorStack = false;
   }
 
+  public SyncDataClient getSyncDataClient(Node node, int readOperationTimeoutMS) {
+    SyncDataClient dataClient =
+        (SyncDataClient) clientManager.borrowSyncClient(node, ClientCategory.DATA);
+    if (dataClient != null) {
+      dataClient.setTimeout(readOperationTimeoutMS);
+    }
+    return dataClient;
+  }
+
+  public AsyncDataClient getAsyncDataClient(Node node, int readOperationTimeoutMS) {
+    try {
+      AsyncDataClient dataClient =
+          (AsyncDataClient) clientManager.borrowAsyncClient(node, ClientCategory.DATA);
+      if (dataClient != null) {
+        dataClient.setTimeout(readOperationTimeoutMS);
+      }
+      return dataClient;
+    } catch (IOException e) {
+      logger.warn("error when get async client", e);
+    }
+    return null;

Review comment:
       Instead of returning a NULL and cause a NPE, we might need to throw an exception to the upper layer?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r718077059



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this manager and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: RequestForwardClient, DataGroupClients, MetaGroupClients.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClientManager.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+  private ClientPoolFactory clientPoolFactory;
+
+  /**
+   * {@link ClientManager.Type#RequestForwardClient} represents the clients used to forward external
+   * client requests to proper node to handle such as query, insert request.
+   *
+   * <p>{@link ClientManager.Type#DataGroupClient} represents the clients used to appendEntry,
+   * appendEntries, sendHeartbeat, etc for data raft group.
+   *
+   * <p>{@link ClientManager.Type#MetaGroupClient} represents the clients used to appendEntry,
+   * appendEntries, sendHeartbeat, etc for meta raft group. *
+   */
+  public enum Type {
+    RequestForwardClient,

Review comment:
       add a comment?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientCategory.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+public enum ClientCategory {
+  META("MetaClient"),
+  META_HEARTBEAT("MetaHeartbeatClient"),
+  DATA("DataClient"),
+  DATA_HEARTBEAT("DataHeartbeatClient"),
+  SINGLE_MASTER("SingleMasterClient");

Review comment:
       change name and add some comments? such as `Data_Async_Append_Client`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r709816828



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/IClientPool.java
##########
@@ -0,0 +1,16 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import java.io.IOException;
+
+public interface IClientPool {

Review comment:
       Yes, the I named it `IClientManager` at first and change to `IClientPool` as I think the interfaces are more like a pool actions. Change to `IClientManager` to make it consistency.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r710239336



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientCategory.java
##########
@@ -0,0 +1,19 @@
+package org.apache.iotdb.cluster.client;
+
+public enum ClientCategory {
+  META("MetaClient"),
+  META_HEARTBEAT("MetaHeartbeatClient"),
+  DATA("DataClient"),
+  DATA_HEARTBEAT("DataHeartbeatClient"),
+  SINGLE_MASTER("SingleMasterClient");

Review comment:
       `SINGLE_MASTER` is a special async data client type in which all connection share a single selector and is only. As I don't intend to break original logic too much, so this one is kept.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#issuecomment-930946514


   Previously there were two main concerns about clientPool: concurrency and the release of failing clients. Since `GenericKeyedObjectPool` is thread-safe, the former is resolved. Moreover the use of `setTestOnReturn` and `setTestOnBorrow` in `GenericKeyedObjectPoolConfig` also guarantees the failing client will not be used again. So I think the external behavior of the PR and the previous version should be consistent in theory. Since the test results show almost no performance differences, I will merge the more readable clientPool implementation.
   
   The failed CI has nothing to do with the branch and will be fixed during a cluster- branch merge.
   
   Thanks @chengjianyun  for your excellent contribution!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] jixuan1989 commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r710059326



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,180 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+  private ClientPoolFactory clientPoolFactory;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    clientPoolFactory = new ClientPoolFactory();
+    clientPoolFactory.setClientManager(this);
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            clientPoolFactory.createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(
+            ClientCategory.SINGLE_MASTER, clientPoolFactory.createSingleManagerAsyncDataPool());
+        break;
+      default:
+        logger.warn("unsupported ClientManager type: {}", type);
+        break;
+    }
+  }
+
+  private void constructSyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createSyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.META, clientPoolFactory.createSyncMetaPool(ClientCategory.META));
+        syncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            clientPoolFactory.createSyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createSyncDataPool(ClientCategory.DATA));
+        syncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            clientPoolFactory.createSyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        break;
+      default:
+        logger.warn("unsupported ClientManager type: {}", type);
+        break;
+    }
+  }
+
+  /**
+   * It's safe to convert: 1. RaftService.AsyncClient to TSDataService.AsyncClient when category is
+   * DATA or DATA_HEARTBEAT; 2. RaftService.AsyncClient to TSMetaService.AsyncClient when category
+   * is META or META_HEARTBEAT.
+   *
+   * @param category
+   * @return RaftService.AsyncClient

Review comment:
       add NULL declaration

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,180 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+  private ClientPoolFactory clientPoolFactory;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    clientPoolFactory = new ClientPoolFactory();
+    clientPoolFactory.setClientManager(this);
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            clientPoolFactory.createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(
+            ClientCategory.SINGLE_MASTER, clientPoolFactory.createSingleManagerAsyncDataPool());
+        break;
+      default:
+        logger.warn("unsupported ClientManager type: {}", type);
+        break;
+    }
+  }
+
+  private void constructSyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createSyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.META, clientPoolFactory.createSyncMetaPool(ClientCategory.META));
+        syncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            clientPoolFactory.createSyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createSyncDataPool(ClientCategory.DATA));
+        syncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            clientPoolFactory.createSyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        break;
+      default:
+        logger.warn("unsupported ClientManager type: {}", type);
+        break;
+    }
+  }
+
+  /**
+   * It's safe to convert: 1. RaftService.AsyncClient to TSDataService.AsyncClient when category is
+   * DATA or DATA_HEARTBEAT; 2. RaftService.AsyncClient to TSMetaService.AsyncClient when category
+   * is META or META_HEARTBEAT.
+   *
+   * @param category
+   * @return RaftService.AsyncClient
+   */
+  @Override
+  public RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category) {
+    try {
+      return asyncClientPoolMap.get(category).borrowObject(node);
+    } catch (NullPointerException e) {
+      logger.error("No AsyncClient pool found for {}", category, e);
+    } catch (TException e) {
+      logger.error("AsyncClient transport error for {}", category, e);
+    } catch (Exception e) {
+      logger.error("AsyncClient error for {}", category, e);
+    }
+    return null;
+  }
+
+  /**
+   * It's safe to convert: 1. RaftService.Client to TSDataService.Client when category is DATA or
+   * DATA_HEARTBEAT; 2. RaftService.Client to TSMetaService.Client when category is META or
+   * META_HEARTBEAT.
+   *
+   * @param category
+   * @return RaftService.Client

Review comment:
       add NULL declaration




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r708797966



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java
##########
@@ -0,0 +1,89 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.time.Duration;
+
+public class ClientPoolFactory {
+
+  protected long waitClientTimeoutMS;
+  protected int maxConnectionForEachNode;
+  private TProtocolFactory protocolFactory;
+
+  ClientPoolFactory() {
+    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+    this.waitClientTimeoutMS = config.getWaitClientTimeoutMS();
+    this.maxConnectionForEachNode = config.getMaxClientPerNodePerMember();
+    protocolFactory =
+        config.isRpcThriftCompressionEnabled()
+            ? new TCompactProtocol.Factory()
+            : new TBinaryProtocol.Factory();
+  }
+
+  public static ClientPoolFactory getInstance() {
+    return ClientPoolProviderHolder.INSTANCE;
+  }
+
+  public GenericKeyedObjectPool<Node, RaftService.Client> createSyncDataPool(
+      ClientCategory category) {
+    GenericKeyedObjectPoolConfig config = new GenericKeyedObjectPoolConfig();

Review comment:
       Good idea.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#issuecomment-913975566


   reopen the PR after resolve conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] jixuan1989 commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r710052100



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientCategory.java
##########
@@ -0,0 +1,19 @@
+package org.apache.iotdb.cluster.client;
+
+public enum ClientCategory {
+  META("MetaClient"),
+  META_HEARTBEAT("MetaHeartbeatClient"),
+  DATA("DataClient"),
+  DATA_HEARTBEAT("DataHeartbeatClient"),
+  SINGLE_MASTER("SingleMasterClient");

Review comment:
       what is this for?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,180 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+  private ClientPoolFactory clientPoolFactory;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    clientPoolFactory = new ClientPoolFactory();
+    clientPoolFactory.setClientManager(this);
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            clientPoolFactory.createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(
+            ClientCategory.SINGLE_MASTER, clientPoolFactory.createSingleManagerAsyncDataPool());
+        break;
+      default:
+        logger.warn("unsupported ClientManager type: {}", type);
+        break;
+    }
+  }
+
+  private void constructSyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createSyncDataPool(ClientCategory.DATA));

Review comment:
       Why is  DATA Tyle.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r708791329



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
##########
@@ -19,52 +19,42 @@
 
 package org.apache.iotdb.cluster.utils;
 
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaHeartbeatClient;
+import org.apache.iotdb.cluster.client.ClientCategory;
 import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
 import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 
 public class ClientUtils {
 
   private ClientUtils() {
     // util class
   }
 
-  public static boolean isHeartbeatClientReady(AsyncClient client) {
-    if (client instanceof AsyncDataHeartbeatClient) {
-      return ((AsyncDataHeartbeatClient) client).isReady();
-    } else {
-      return ((AsyncMetaHeartbeatClient) client).isReady();
-    }
-  }
-
-  public static void putBackSyncHeartbeatClient(Client client) {
-    if (client instanceof SyncMetaHeartbeatClient) {
-      ((SyncMetaHeartbeatClient) client).putBack();
-    } else {
-      ((SyncDataHeartbeatClient) client).putBack();
-    }
-  }
-
-  public static void putBackSyncClient(Client client) {
-    if (client instanceof SyncDataClient) {
-      ((SyncDataClient) client).putBack();
-    } else if (client instanceof SyncMetaClient) {
-      ((SyncMetaClient) client).putBack();
+  public static int getPort(Node node, ClientCategory category) {
+    int port = -1;
+    if (category == ClientCategory.DATA) {
+      port = node.getDataPort();
+    } else if (ClientCategory.DATA_HEARTBEAT == category) {
+      port = node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET;
+    } else if (ClientCategory.META == category) {
+      port = node.getMetaPort();
+    } else if (ClientCategory.META_HEARTBEAT == category) {
+      port = node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET;
+    } else if (ClientCategory.SINGLE_MASTER == category) {
+      // special data port type
+      port = node.getMetaPort();

Review comment:
       Sorry for the wrong comment, SINGLE_MASTER is a special type of meta.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r709809035



##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
##########
@@ -0,0 +1,181 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+public class ClientPoolFactoryTest {
+  private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+  private int maxClientPerNodePerMember;
+  private long waitClientTimeoutMS;
+
+  @Before
+  public void setUp() {
+    maxClientPerNodePerMember = clusterConfig.getMaxClientPerNodePerMember();
+    waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
+    clusterConfig.setMaxClientPerNodePerMember(2);

Review comment:
       Sure. It waits until someone return the object to pool or timeout when reaches the wait timeout threshold.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] jixuan1989 commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r710055237



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,180 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+  private ClientPoolFactory clientPoolFactory;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    clientPoolFactory = new ClientPoolFactory();
+    clientPoolFactory.setClientManager(this);
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            clientPoolFactory.createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(

Review comment:
       Why SINGLE_MASTER here but no  SINGLE_MASTER in Sync Mode.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] wangchao316 removed a comment on pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
wangchao316 removed a comment on pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#issuecomment-919734397


   Hi, Thanks your contribution.
   I have some question for this pr.
   
   1. Check whether the refactoring performance is tested because the pool directly affects the query and insertion performance.
   2. Whether rolling restart of a node is supported, which does not affect external query or write.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] jixuan1989 commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r710052725



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,180 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+  private ClientPoolFactory clientPoolFactory;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    clientPoolFactory = new ClientPoolFactory();
+    clientPoolFactory.setClientManager(this);
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            clientPoolFactory.createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(
+            ClientCategory.SINGLE_MASTER, clientPoolFactory.createSingleManagerAsyncDataPool());
+        break;
+      default:
+        logger.warn("unsupported ClientManager type: {}", type);
+        break;
+    }
+  }
+
+  private void constructSyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createSyncDataPool(ClientCategory.DATA));

Review comment:
       Why is  DATA Type.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r710719501



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -119,15 +123,13 @@
 
   private boolean allowReport = true;
 
-  /**
-   * hardLinkCleaner will periodically clean expired hardlinks created during snapshots
-   */
+  /** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
   private ScheduledExecutorService hardLinkCleanerThread;
 
   // currently, dataClientProvider is only used for those instances who do not belong to any

Review comment:
       why still unchanged...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#issuecomment-921533889


   Please run `mvn spotless:apply` before you push your commit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r708943662



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/IClientPool.java
##########
@@ -0,0 +1,16 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import java.io.IOException;
+
+public interface IClientPool {

Review comment:
       We'd better unify the context. Either use `clientPool` or `clientManager`. Otherwise it would seem strange to have `ClientManager` inherit `IClientPool`

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,203 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {

Review comment:
       Maybe we don't need the `isAsyncMode` parameters here. We can judge this here by `ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()` ?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -637,11 +640,28 @@ public void disablePrintClientConnectionErrorStack() {
     printClientConnectionErrorStack = false;
   }
 
+  public SyncDataClient getSyncDataClient(Node node, int readOperationTimeoutMS) {
+    SyncDataClient dataClient =
+        (SyncDataClient) clientManager.borrowSyncClient(node, ClientCategory.DATA);
+    if (dataClient != null) {
+      dataClient.setTimeout(readOperationTimeoutMS);
+    }
+    return dataClient;
+  }
+
+  public AsyncDataClient getAsyncDataClient(Node node, int readOperationTimeoutMS) {
+    AsyncDataClient dataClient =
+        (AsyncDataClient) clientManager.borrowAsyncClient(node, ClientCategory.DATA);

Review comment:
       same

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,203 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            ClientPoolFactory.getInstance().createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(
+            ClientCategory.SINGLE_MASTER,
+            ClientPoolFactory.getInstance().createSingleManagerAsyncDataPool());
+        break;
+      default:
+        logger.warn("unsupported ClientManager type: {}", type);
+        break;
+    }
+  }
+
+  private void constructSyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.META,
+            ClientPoolFactory.getInstance().createSyncMetaPool(ClientCategory.META));
+        syncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            ClientPoolFactory.getInstance().createSyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        syncClientPoolMap.put(
+            ClientCategory.DATA,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA));
+        syncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            ClientPoolFactory.getInstance().createSyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        break;
+      default:
+        logger.warn("unsupported ClientManager type: {}", type);
+        break;
+    }
+  }
+
+  /**
+   * It's safe to convert: 1. RaftService.AsyncClient to TSDataService.AsyncClient when category is
+   * DATA or DATA_HEARTBEAT; 2. RaftService.AsyncClient to TSMetaService.AsyncClient when category
+   * is META or META_HEARTBEAT.
+   *
+   * @param category
+   * @return RaftService.AsyncClient
+   */
+  @Override
+  public RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category) {
+    try {
+      RaftService.AsyncClient client = asyncClientPoolMap.get(category).borrowObject(node);
+      if (ClientCategory.DATA == category
+          || ClientCategory.DATA_HEARTBEAT == category
+          || ClientCategory.SINGLE_MASTER == category) {
+        ((AsyncDataClient) client).setClientPool(this);
+      } else {
+        ((AsyncMetaClient) client).setClientPool(this);
+      }
+      return client;
+    } catch (NullPointerException e) {
+      logger.error("No AsyncClient pool found for {}", category, e);
+    } catch (TException e) {
+      logger.error("AsyncClient transport error for {}", category, e);
+    } catch (Exception e) {
+      logger.error("AsyncClient error for {}", category, e);
+    }
+    return null;
+  }
+
+  /**
+   * It's safe to convert: 1. RaftService.Client to TSDataService.Client when category is DATA or
+   * DATA_HEARTBEAT; 2. RaftService.Client to TSMetaService.Client when category is META or
+   * META_HEARTBEAT.
+   *
+   * @param category
+   * @return RaftService.Client
+   */
+  @Override
+  public RaftService.Client borrowSyncClient(Node node, ClientCategory category) {
+    try {
+      RaftService.Client client = syncClientPoolMap.get(category).borrowObject(node);
+      if (ClientCategory.DATA == category || ClientCategory.DATA_HEARTBEAT == category) {
+        ((SyncDataClient) client).setClientPool(this);

Review comment:
       Why are we setting clientPool all the times? Can we just set it once when creating?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
##########
@@ -1449,42 +1401,96 @@ public TSStatus forwardPlanSync(
    *     the node cannot be reached.
    */
   public AsyncClient getAsyncClient(Node node) {
-    return getAsyncClient(node, asyncClientPool, true);
+    if (ClientCategory.META == getClientCategory()) {
+      return clientManager.borrowAsyncClient(node, ClientCategory.META);

Review comment:
       Use ternary operators?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
##########
@@ -1449,42 +1401,96 @@ public TSStatus forwardPlanSync(
    *     the node cannot be reached.
    */
   public AsyncClient getAsyncClient(Node node) {
-    return getAsyncClient(node, asyncClientPool, true);
+    if (ClientCategory.META == getClientCategory()) {
+      return clientManager.borrowAsyncClient(node, ClientCategory.META);
+    } else {
+      return clientManager.borrowAsyncClient(node, ClientCategory.DATA);
+    }
   }
 
   public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
-    return getAsyncClient(node, asyncClientPool, activatedOnly);
+    if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
+      return null;
+    }
+
+    if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
+      return null;
+    }
+
+    return getAsyncClient(node);
   }
 
   public AsyncClient getSendLogAsyncClient(Node node) {
-    return getAsyncClient(node, asyncSendLogClientPool, true);
+    return clientManager.borrowAsyncClient(node, ClientCategory.SINGLE_MASTER);
+  }
+
+  /**
+   * NOTICE: ClientManager.returnClient() must be called after use. the caller needs to check to see
+   * if the return value is null
+   *
+   * @param node the node to connect
+   * @return the client if node is available, otherwise null
+   */
+  public Client getSyncClient(Node node) {
+    if (ClientCategory.META == getClientCategory()) {
+      return clientManager.borrowSyncClient(node, ClientCategory.META);

Review comment:
       Use ternary operators?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
##########
@@ -176,75 +167,97 @@ public void setUp()
     NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaGroupMember);
     partialWriteEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
     IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
-    //    //TODO fixme : 恢复正常的provider
+
+    // TODO fixme : 恢复正常的provider

Review comment:
       remove Chinese?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
##########
@@ -1449,42 +1401,96 @@ public TSStatus forwardPlanSync(
    *     the node cannot be reached.
    */
   public AsyncClient getAsyncClient(Node node) {
-    return getAsyncClient(node, asyncClientPool, true);
+    if (ClientCategory.META == getClientCategory()) {
+      return clientManager.borrowAsyncClient(node, ClientCategory.META);
+    } else {
+      return clientManager.borrowAsyncClient(node, ClientCategory.DATA);
+    }
   }
 
   public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
-    return getAsyncClient(node, asyncClientPool, activatedOnly);
+    if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
+      return null;
+    }
+
+    if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
+      return null;
+    }
+
+    return getAsyncClient(node);
   }
 
   public AsyncClient getSendLogAsyncClient(Node node) {
-    return getAsyncClient(node, asyncSendLogClientPool, true);
+    return clientManager.borrowAsyncClient(node, ClientCategory.SINGLE_MASTER);
+  }
+
+  /**
+   * NOTICE: ClientManager.returnClient() must be called after use. the caller needs to check to see
+   * if the return value is null
+   *
+   * @param node the node to connect
+   * @return the client if node is available, otherwise null
+   */
+  public Client getSyncClient(Node node) {
+    if (ClientCategory.META == getClientCategory()) {
+      return clientManager.borrowSyncClient(node, ClientCategory.META);
+    } else {
+      return clientManager.borrowSyncClient(node, ClientCategory.DATA);
+    }
   }
 
-  private AsyncClient getAsyncClient(Node node, AsyncClientPool pool, boolean activatedOnly) {
+  public Client getSyncClient(Node node, boolean activatedOnly) {
     if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
       return null;
     }
-    try {
-      return pool.getClient(node, activatedOnly);
-    } catch (IOException e) {
-      logger.warn("{} cannot connect to node {}", name, node, e);
+
+    if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
       return null;
     }
+
+    return getSyncClient(node);
   }
 
   /**
-   * NOTICE: client.putBack() must be called after use. the caller needs to check to see if the
-   * return value is null
+   * Get an asynchronous heartbeat thrift client to the given node.
    *
-   * @param node the node to connect
-   * @return the client if node is available, otherwise null
+   * @return an asynchronous thrift client or null if the caller tries to connect the local node.
    */
-  public Client getSyncClient(Node node) {
-    return getSyncClient(syncClientPool, node, true);
+  public AsyncClient getAsyncHeartbeatClient(Node node) {
+    if (ClientCategory.META == getClientCategory()) {
+      return clientManager.borrowAsyncClient(node, ClientCategory.META_HEARTBEAT);
+    } else {
+      return clientManager.borrowAsyncClient(node, ClientCategory.DATA_HEARTBEAT);
+    }
   }
 
-  public Client getSyncClient(Node node, boolean activatedOnly) {
-    return getSyncClient(syncClientPool, node, activatedOnly);
+  /**
+   * NOTICE: client.putBack() must be called after use.
+   *
+   * @return the heartbeat client for the node
+   */
+  public Client getSyncHeartbeatClient(Node node) {
+    if (ClientCategory.META == getClientCategory()) {
+      return clientManager.borrowSyncClient(node, ClientCategory.META_HEARTBEAT);

Review comment:
       Use ternary operators?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -119,15 +123,13 @@
 
   private boolean allowReport = true;
 
-  /**
-   * hardLinkCleaner will periodically clean expired hardlinks created during snapshots
-   */
+  /** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
   private ScheduledExecutorService hardLinkCleanerThread;
 
   // currently, dataClientProvider is only used for those instances who do not belong to any

Review comment:
       update `dataClientProvider`?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
##########
@@ -545,13 +542,10 @@ public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler
     metaGroupMember.setAppendLogThreadPool(testThreadPool);
     // TODO fixme : 恢复正常的provider

Review comment:
       remove Chinese?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
##########
@@ -906,16 +906,16 @@ public void setCoordinator(Coordinator coordinator) {
         if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
           AsyncDataClient client =
               ClusterIoTDB.getInstance()
-                  .getClientProvider()
                   .getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
           result =
               SyncClientAdaptor.getUnregisteredMeasurements(
                   client, partitionGroup.getHeader(), seriesList);
         } else {
-          try (SyncDataClient syncDataClient =
-              ClusterIoTDB.getInstance()
-                  .getClientProvider()
-                  .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
+          SyncDataClient syncDataClient = null;
+          try {
+            syncDataClient =
+                ClusterIoTDB.getInstance()
+                    .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
             try {

Review comment:
       We may merge these two `try` as long as `syncDataClient =
                   ClusterIoTDB.getInstance()
                       .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())` doesn't throw `TException `.
   
   There are so many similar places need be fixed as you abandon the `try-with-resource` grammer, you can refer to this [doc](https://shimo.im/docs/gWjV8HPJXJq9PdDr) to see why we need this judgement.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -637,11 +640,28 @@ public void disablePrintClientConnectionErrorStack() {
     printClientConnectionErrorStack = false;
   }
 
+  public SyncDataClient getSyncDataClient(Node node, int readOperationTimeoutMS) {
+    SyncDataClient dataClient =
+        (SyncDataClient) clientManager.borrowSyncClient(node, ClientCategory.DATA);

Review comment:
       As `clientManager.borrowSyncClient(node, ClientCategory.DATA);` may return `null`, I think we should do something here, otherwise we'll have an NPE on the upper level calling this function.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
##########
@@ -19,64 +19,142 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends TSDataServiceClient {
+// TODO: Refine the interfaces of TSDataService. TSDataService interfaces doesn't need extends
+// TODO: RaftService interfaces
+public class SyncDataClient extends Client {
+
+  private Node node;
+  private ClientCategory category;
+  private IClientPool clientPool;
 
-  /** @param prot this constructor just create a new instance, but do not open the connection */
   @TestOnly
   public SyncDataClient(TProtocol prot) {
     super(prot);
   }
 
-  SyncDataClient(TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
+  public SyncDataClient(TProtocolFactory protocolFactory, Node node, ClientCategory category)
       throws TTransportException {
+
+    // the difference of the two clients lies in the port
     super(
-        protocolFactory,
-        target.getInternalIp(),
-        target.getDataPort(),
-        ClusterConstant.getConnectionTimeoutInMS(),
-        target,
-        pool);
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    node.getInternalIp(),
+                    ClientUtils.getPort(node, category),
+                    ClusterConstant.getConnectionTimeoutInMS()))));
+    this.node = node;
+    this.category = category;
+    getInputProtocol().getTransport().open();
+  }
+
+  public void setClientPool(IClientPool clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  public void returnSelf() {
+    if (clientPool != null) clientPool.returnSyncClient(this, node, category);
+  }
+
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+
+  @TestOnly
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format(
-        "SyncDataClient (ip = %s, port = %d, id = %d)",
-        target.getInternalIp(), target.getDataPort(), target.getNodeIdentifier());
+    return "Sync"
+        + category.getName()
+        + "{"
+        + "node="
+        + node
+        + ","
+        + "port="
+        + ClientUtils.getPort(node, category)
+        + '}';
+  }
+
+  public Node getNode() {
+    return node;
   }
 
-  public static class Factory implements SyncClientFactory {
+  public static class SyncDataClientFactory
+      implements KeyedPooledObjectFactory<Node, SyncDataClient> {
 
     private TProtocolFactory protocolFactory;
+    private ClientCategory category;
 
-    public Factory(TProtocolFactory protocolFactory) {
+    public SyncDataClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
       this.protocolFactory = protocolFactory;
+      this.category = category;
+    }
+
+    //  public String nodeInfo(Node node) {
+    //    return String.format(
+    //        "MetaNode (listenIp = %s, HB port = %d, id = %d)",
+    //        node.getInternalIp(),
+    //        node.getMetaPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
+    //        node.getNodeIdentifier());
+    //  }
+
+    @Override
+    public void activateObject(Node node, PooledObject<SyncDataClient> pooledObject)

Review comment:
       I see~

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
##########
@@ -0,0 +1,181 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+public class ClientPoolFactoryTest {
+  private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+  private int maxClientPerNodePerMember;
+  private long waitClientTimeoutMS;
+
+  @Before
+  public void setUp() {
+    maxClientPerNodePerMember = clusterConfig.getMaxClientPerNodePerMember();
+    waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
+    clusterConfig.setMaxClientPerNodePerMember(2);

Review comment:
       Can you add some tests on the object pool parameters? What happens if the threshold is exceeded? Does it block, wait, throw, or return NULL?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
##########
@@ -1449,42 +1401,96 @@ public TSStatus forwardPlanSync(
    *     the node cannot be reached.
    */
   public AsyncClient getAsyncClient(Node node) {
-    return getAsyncClient(node, asyncClientPool, true);
+    if (ClientCategory.META == getClientCategory()) {
+      return clientManager.borrowAsyncClient(node, ClientCategory.META);
+    } else {
+      return clientManager.borrowAsyncClient(node, ClientCategory.DATA);
+    }
   }
 
   public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
-    return getAsyncClient(node, asyncClientPool, activatedOnly);
+    if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
+      return null;
+    }
+
+    if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
+      return null;
+    }
+
+    return getAsyncClient(node);
   }
 
   public AsyncClient getSendLogAsyncClient(Node node) {
-    return getAsyncClient(node, asyncSendLogClientPool, true);
+    return clientManager.borrowAsyncClient(node, ClientCategory.SINGLE_MASTER);
+  }
+
+  /**
+   * NOTICE: ClientManager.returnClient() must be called after use. the caller needs to check to see
+   * if the return value is null
+   *
+   * @param node the node to connect
+   * @return the client if node is available, otherwise null
+   */
+  public Client getSyncClient(Node node) {
+    if (ClientCategory.META == getClientCategory()) {
+      return clientManager.borrowSyncClient(node, ClientCategory.META);
+    } else {
+      return clientManager.borrowSyncClient(node, ClientCategory.DATA);
+    }
   }
 
-  private AsyncClient getAsyncClient(Node node, AsyncClientPool pool, boolean activatedOnly) {
+  public Client getSyncClient(Node node, boolean activatedOnly) {
     if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
       return null;
     }
-    try {
-      return pool.getClient(node, activatedOnly);
-    } catch (IOException e) {
-      logger.warn("{} cannot connect to node {}", name, node, e);
+
+    if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
       return null;
     }
+
+    return getSyncClient(node);
   }
 
   /**
-   * NOTICE: client.putBack() must be called after use. the caller needs to check to see if the
-   * return value is null
+   * Get an asynchronous heartbeat thrift client to the given node.
    *
-   * @param node the node to connect
-   * @return the client if node is available, otherwise null
+   * @return an asynchronous thrift client or null if the caller tries to connect the local node.
    */
-  public Client getSyncClient(Node node) {
-    return getSyncClient(syncClientPool, node, true);
+  public AsyncClient getAsyncHeartbeatClient(Node node) {
+    if (ClientCategory.META == getClientCategory()) {
+      return clientManager.borrowAsyncClient(node, ClientCategory.META_HEARTBEAT);

Review comment:
       Use ternary operators?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl merged pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl merged pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] jixuan1989 commented on pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#issuecomment-920932942


   The `File Changed` tab gets stuck...
   
   In SyncDataClient.java, what does `activateObject()`  means? why it just sets the timeout value? @LebronAl ?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r708806543



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
##########
@@ -19,64 +19,142 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientPool;
 import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
+import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
+import java.net.SocketException;
+
 /**
  * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
  */
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends TSDataServiceClient {
+// TODO: Refine the interfaces of TSDataService. TSDataService interfaces doesn't need extends
+// TODO: RaftService interfaces
+public class SyncDataClient extends Client {
+
+  private Node node;
+  private ClientCategory category;
+  private IClientPool clientPool;
 
-  /** @param prot this constructor just create a new instance, but do not open the connection */
   @TestOnly
   public SyncDataClient(TProtocol prot) {
     super(prot);
   }
 
-  SyncDataClient(TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
+  public SyncDataClient(TProtocolFactory protocolFactory, Node node, ClientCategory category)
       throws TTransportException {
+
+    // the difference of the two clients lies in the port
     super(
-        protocolFactory,
-        target.getInternalIp(),
-        target.getDataPort(),
-        ClusterConstant.getConnectionTimeoutInMS(),
-        target,
-        pool);
+        protocolFactory.getProtocol(
+            RpcTransportFactory.INSTANCE.getTransport(
+                new TSocket(
+                    TConfigurationConst.defaultTConfiguration,
+                    node.getInternalIp(),
+                    ClientUtils.getPort(node, category),
+                    ClusterConstant.getConnectionTimeoutInMS()))));
+    this.node = node;
+    this.category = category;
+    getInputProtocol().getTransport().open();
+  }
+
+  public void setClientPool(IClientPool clientPool) {
+    this.clientPool = clientPool;
+  }
+
+  public void returnSelf() {
+    if (clientPool != null) clientPool.returnSyncClient(this, node, category);
+  }
+
+  public void setTimeout(int timeout) {
+    // the same transport is used in both input and output
+    ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+  }
+
+  public void close() {
+    getInputProtocol().getTransport().close();
+  }
+
+  @TestOnly
+  public int getTimeout() throws SocketException {
+    return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
   }
 
   @Override
   public String toString() {
-    return String.format(
-        "SyncDataClient (ip = %s, port = %d, id = %d)",
-        target.getInternalIp(), target.getDataPort(), target.getNodeIdentifier());
+    return "Sync"
+        + category.getName()
+        + "{"
+        + "node="
+        + node
+        + ","
+        + "port="
+        + ClientUtils.getPort(node, category)
+        + '}';
+  }
+
+  public Node getNode() {
+    return node;
   }
 
-  public static class Factory implements SyncClientFactory {
+  public static class SyncDataClientFactory
+      implements KeyedPooledObjectFactory<Node, SyncDataClient> {
 
     private TProtocolFactory protocolFactory;
+    private ClientCategory category;
 
-    public Factory(TProtocolFactory protocolFactory) {
+    public SyncDataClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
       this.protocolFactory = protocolFactory;
+      this.category = category;
+    }
+
+    //  public String nodeInfo(Node node) {
+    //    return String.format(
+    //        "MetaNode (listenIp = %s, HB port = %d, id = %d)",
+    //        node.getInternalIp(),
+    //        node.getMetaPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
+    //        node.getNodeIdentifier());
+    //  }
+
+    @Override
+    public void activateObject(Node node, PooledObject<SyncDataClient> pooledObject)

Review comment:
       `activateObject` is called before the object is borrow by come application from pool to do some initialization. Leave it empty is ok for thrift client as we have open transport when do construct and make it validate when the object is returned. I will do some re-initialization here. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun closed pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun closed pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r708816387



##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java
##########
@@ -182,100 +174,103 @@ public void testDefaultBatchStrategySelect() {
   }
 
   private void setAsyncDataClient() {
-    ClusterIoTDB.getInstance()
-        .setClientProvider(
-            new DataClientProvider(new Factory()) {
-              @Override
-              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
-                return new AsyncDataClient(null, null, node, null) {
-                  @Override
-                  public void fetchMultSeries(
-                      RaftNode header,
-                      long readerId,
-                      List<String> paths,
-                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(
-                            () -> {
-                              Map<String, ByteBuffer> stringByteBufferMap = Maps.newHashMap();
-                              if (batchUsed) {
-                                paths.forEach(
-                                    path -> {
-                                      stringByteBufferMap.put(path, ByteBuffer.allocate(0));
-                                    });
-                              } else {
-                                batchUsed = true;
-
-                                for (int i = 0; i < batchData.size(); i++) {
-                                  stringByteBufferMap.put(
-                                      paths.get(i), generateByteBuffer(batchData.get(i)));
-                                }
-
-                                resultHandler.onComplete(stringByteBufferMap);
-                              }
-                            })
-                        .start();
-                  }
-
-                  @Override
-                  public void queryMultSeries(
-                      MultSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
-                      throws TException {
-                    if (failedNodes.contains(node)) {
-                      throw new TException("Node down.");
-                    }
-
-                    new Thread(() -> resultHandler.onComplete(1L)).start();
-                  }
-                };
-              }
-            });
+    //    ClusterIoTDB.getInstance()
+    //        .setClientProvider(
+    //            new DataClientProvider(new Factory()) {
+    //              @Override
+    //              public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws
+    // IOException {
+    //                return new AsyncDataClient(null, null, node, null) {
+    //                  @Override
+    //                  public void fetchMultSeries(
+    //                      RaftNode header,
+    //                      long readerId,
+    //                      List<String> paths,
+    //                      AsyncMethodCallback<Map<String, ByteBuffer>> resultHandler)
+    //                      throws TException {
+    //                    if (failedNodes.contains(node)) {
+    //                      throw new TException("Node down.");
+    //                    }
+    //
+    //                    new Thread(
+    //                            () -> {

Review comment:
       Should be replaced by setClientManager().




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl removed a comment on pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl removed a comment on pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#issuecomment-921533889


   Please run `mvn spotless:apply` before you push your commit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] jixuan1989 commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
jixuan1989 commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r710054912



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,180 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+  private ClientPoolFactory clientPoolFactory;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {
+    clientPoolFactory = new ClientPoolFactory();
+    clientPoolFactory.setClientManager(this);
+    if (isAsyncMode) {
+      asyncClientPoolMap = Maps.newHashMap();
+      constructAsyncClientMap(type);
+    } else {
+      syncClientPoolMap = Maps.newHashMap();
+      constructSyncClientMap(type);
+    }
+  }
+
+  private void constructAsyncClientMap(Type type) {
+    switch (type) {
+      case ClusterClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        break;
+      case MetaGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.META,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META));
+        asyncClientPoolMap.put(
+            ClientCategory.META_HEARTBEAT,
+            clientPoolFactory.createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+        break;
+      case DataGroupClient:
+        asyncClientPoolMap.put(
+            ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+        asyncClientPoolMap.put(
+            ClientCategory.DATA_HEARTBEAT,
+            clientPoolFactory.createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+        asyncClientPoolMap.put(

Review comment:
       Why SINGLE_MASTER here but no  SINGLE_MASTER in Sync Mode.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r709929250



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,203 @@
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.ClusterIoTDB;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this class and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: ClusterClients, DataGroupClients, MetaGroupClients.
+ *
+ * <p>ClusterClients implement the data query and insert interfaces such as query and non-query
+ * request
+ *
+ * <p>DataGroupClient implement the data group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>MetaGroupClient implement the meta group raft rpc interfaces such as appendEntry,
+ * appendEntries, sendHeartbeat, etc.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+
+  public enum Type {
+    ClusterClient,
+    DataGroupClient,
+    MetaGroupClient
+  }
+
+  public ClientManager(boolean isAsyncMode, Type type) {

Review comment:
       OK




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] chengjianyun commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
chengjianyun commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r710239336



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientCategory.java
##########
@@ -0,0 +1,19 @@
+package org.apache.iotdb.cluster.client;
+
+public enum ClientCategory {
+  META("MetaClient"),
+  META_HEARTBEAT("MetaHeartbeatClient"),
+  DATA("DataClient"),
+  DATA_HEARTBEAT("DataHeartbeatClient"),
+  SINGLE_MASTER("SingleMasterClient");

Review comment:
       `SINGLE_MASTER` is a special async data client type in which all connection share a single selector. As I don't intend to break original logic too much, so this one is kept.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] LebronAl commented on a change in pull request #3886: [cluster-refactor] refactor client pool

Posted by GitBox <gi...@apache.org>.
LebronAl commented on a change in pull request #3886:
URL: https://github.com/apache/iotdb/pull/3886#discussion_r718077059



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this manager and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. The class provided 3 default pool group
+ * according to current usage: RequestForwardClient, DataGroupClients, MetaGroupClients.
+ *
+ * <p>TODO: We can refine the client structure by reorg the interfaces defined in cluster-thrift.
+ */
+public class ClientManager implements IClientManager {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClientManager.class);
+
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+  private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+  private ClientPoolFactory clientPoolFactory;
+
+  /**
+   * {@link ClientManager.Type#RequestForwardClient} represents the clients used to forward external
+   * client requests to proper node to handle such as query, insert request.
+   *
+   * <p>{@link ClientManager.Type#DataGroupClient} represents the clients used to appendEntry,
+   * appendEntries, sendHeartbeat, etc for data raft group.
+   *
+   * <p>{@link ClientManager.Type#MetaGroupClient} represents the clients used to appendEntry,
+   * appendEntries, sendHeartbeat, etc for meta raft group. *
+   */
+  public enum Type {
+    RequestForwardClient,

Review comment:
       add a comment?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/ClientCategory.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+public enum ClientCategory {
+  META("MetaClient"),
+  META_HEARTBEAT("MetaHeartbeatClient"),
+  DATA("DataClient"),
+  DATA_HEARTBEAT("DataHeartbeatClient"),
+  SINGLE_MASTER("SingleMasterClient");

Review comment:
       change name and add some comments? such as `Data_Async_Append_Client`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org