You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/10/27 06:53:35 UTC

[GitHub] [iotdb] jt2594838 commented on a change in pull request #4079: [IOTDB-1639] Refactoring the cluster class structure to make it consistent with the server module

jt2594838 commented on a change in pull request #4079:
URL: https://github.com/apache/iotdb/pull/4079#discussion_r735328532



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
##########
@@ -58,104 +62,161 @@ public AsyncDataClient(
 
   public AsyncDataClient(
       TProtocolFactory protocolFactory,
-      TAsyncClientManager clientManager,
+      TAsyncClientManager tClientManager,
       Node node,
-      AsyncClientPool pool)
+      ClientCategory category)
       throws IOException {
     // the difference of the two clients lies in the port
     super(
         protocolFactory,
-        clientManager,
+        tClientManager,
         TNonblockingSocketWrapper.wrap(
-            node.getInternalIp(), node.getDataPort(), RaftServer.getConnectionTimeoutInMS()));
+            node.getInternalIp(),
+            ClientUtils.getPort(node, category),
+            ClusterConstant.getConnectionTimeoutInMS()));
     this.node = node;
-    this.pool = pool;
+    this.category = category;
+  }
+
+  public AsyncDataClient(
+      TProtocolFactory protocolFactory,
+      TAsyncClientManager tClientManager,
+      Node node,
+      ClientCategory category,
+      IClientManager manager)
+      throws IOException {
+    this(protocolFactory, tClientManager, node, category);
+    this.clientManager = manager;
+  }
+
+  public void close() {
+    ___transport.close();
+    ___currentMethod = null;
+  }
+
+  public boolean isValid() {
+    return ___transport != null;
+  }
+
+  /**
+   * return self if clientPool is not null, the method doesn't need to call by user, it will trigger
+   * once client transport complete
+   */
+  private void returnSelf() {
+    logger.debug("return client: ", toString());
+    if (clientManager != null) clientManager.returnAsyncClient(this, node, category);

Review comment:
       Mind the code style here: wrap the body of the single-line `if` in braces.

##########
File path: .github/workflows/client-go.yml
##########
@@ -13,6 +13,8 @@ on:
     branches:
       - master
       - 'rel/*'
+      #remove me when cluster- branch is merged
+      - cluster-

Review comment:
       It says "remove me" here.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class ClientPoolFactoryTest {
+  private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+
+  private long mockMaxWaitTimeoutMs = 10 * 1000L;
+  private int mockMaxClientPerMember = 10;
+
+  private int maxClientPerNodePerMember = clusterConfig.getMaxClientPerNodePerMember();
+  private long waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
+
+  private ClientPoolFactory clientPoolFactory;
+  private MockClientManager mockClientManager;
+
+  @Before
+  public void setUp() {
+    clusterConfig.setMaxClientPerNodePerMember(mockMaxClientPerMember);
+    clusterConfig.setWaitClientTimeoutMS(mockMaxWaitTimeoutMs);
+    clientPoolFactory = new ClientPoolFactory();
+    mockClientManager =
+        new MockClientManager() {
+          @Override
+          public void returnAsyncClient(
+              RaftService.AsyncClient client, Node node, ClientCategory category) {
+            assert (client == asyncClient);
+          }
+
+          @Override
+          public void returnSyncClient(
+              RaftService.Client client, Node node, ClientCategory category) {
+            Assert.assertTrue(client == syncClient);
+          }
+        };
+    clientPoolFactory.setClientManager(mockClientManager);
+  }
+
+  @After
+  public void tearDown() {
+    clusterConfig.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
+    clusterConfig.setWaitClientTimeoutMS(waitClientTimeoutMS);
+  }
+
+  @Test
+  public void poolConfigTest() throws Exception {
+    GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+        clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+    Node node = constructDefaultNode();
+
+    for (int i = 0; i < mockMaxClientPerMember; i++) {
+      RaftService.AsyncClient client = pool.borrowObject(node);
+      Assert.assertNotNull(client);
+    }
+
+    long timeStart = System.currentTimeMillis();
+    try {
+      pool.borrowObject(node);
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof NoSuchElementException);
+    } finally {
+      Assert.assertTrue(System.currentTimeMillis() - timeStart + 10 > mockMaxWaitTimeoutMs);
+    }

Review comment:
       What if there is no exception thrown?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
##########
@@ -4,85 +4,76 @@
 
 package org.apache.iotdb.cluster.client.async;
 
-import org.apache.iotdb.cluster.client.async.AsyncDataClient.SingleManagerFactory;
-import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.client.BaseClientTest;
+import org.apache.iotdb.cluster.client.ClientCategory;
 import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.server.RaftServer;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.junit.After;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-public class AsyncDataClientTest {
+public class AsyncDataClientTest extends BaseClientTest {
 
   private final ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-  private boolean isAsyncServer;
+  private TProtocolFactory protocolFactory;
 
   @Before
   public void setUp() {
-    isAsyncServer = config.isUseAsyncServer();
     config.setUseAsyncServer(true);
+    protocolFactory =
+        config.isRpcThriftCompressionEnabled()
+            ? new TCompactProtocol.Factory()
+            : new TBinaryProtocol.Factory();
   }
 
-  @After
-  public void tearDown() {
-    config.setUseAsyncServer(isAsyncServer);
+  @Test
+  public void testDataClient() throws Exception {
+
+    AsyncDataClient.AsyncDataClientFactory factory =
+        new AsyncDataClient.AsyncDataClientFactory(protocolFactory, ClientCategory.DATA);
+
+    AsyncDataClient dataClient = factory.makeObject(defaultNode).getObject();
+
+    assertEquals(
+        "AsyncDataClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+            + "dataPort:40010, clientPort:0, clientIp:localhost),port=40010}",
+        dataClient.toString());
+    assertCheck(dataClient);
   }
 
   @Test
-  public void test() throws IOException, TException {
-    AsyncClientPool asyncClientPool = new AsyncClientPool(new SingleManagerFactory(new Factory()));
-    AsyncDataClient client;
-    Node node = TestUtils.getNode(0);
-    client =
-        new AsyncDataClient(
-            new Factory(),
-            new TAsyncClientManager(),
-            new TNonblockingSocket(
-                node.getInternalIp(), node.getDataPort(), RaftServer.getConnectionTimeoutInMS()));
-    assertTrue(client.isReady());
-
-    client = (AsyncDataClient) asyncClientPool.getClient(TestUtils.getNode(0));
-
-    assertEquals(TestUtils.getNode(0), client.getNode());
-
-    client.matchTerm(
-        0,
-        0,
-        TestUtils.getRaftNode(0, 0),
-        new AsyncMethodCallback<Boolean>() {
-          @Override
-          public void onComplete(Boolean aBoolean) {
-            // do nothing
-          }
-
-          @Override
-          public void onError(Exception e) {
-            // do nothing
-          }
-        });
-    assertFalse(client.isReady());
-
-    client.onError(new Exception());
-    assertNull(client.getCurrMethod());
-    assertFalse(client.isReady());
+  public void testMetaHeartbeatClient() throws Exception {
+
+    AsyncDataClient.AsyncDataClientFactory factory =
+        new AsyncDataClient.AsyncDataClientFactory(protocolFactory, ClientCategory.DATA_HEARTBEAT);
+
+    AsyncDataClient dataClient = factory.makeObject(defaultNode).getObject();
 
     assertEquals(
-        "DataClient{node=ClusterNode{ internalIp='192.168.0.0', metaPort=9003, nodeIdentifier=0, dataPort=40010, clientPort=6667, clientIp='0.0.0.0'}}",
-        client.toString());
+        "AsyncDataHeartbeatClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+            + "dataPort:40010, clientPort:0, clientIp:localhost),port=40011}",
+        dataClient.toString());
+    assertCheck(dataClient);
+  }

Review comment:
       The test name is inconsistent with its content: `testMetaHeartbeatClient` creates a `DATA_HEARTBEAT` client.

##########
File path: server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
##########
@@ -241,7 +241,9 @@ public static void cleanAllDir() throws IOException {
   }
 
   public static void cleanDir(String dir) throws IOException {
-    FileUtils.deleteDirectory(new File(dir));
+    synchronized (EnvironmentUtils.class) {
+      FileUtils.deleteDirectory(new File(dir));
+    }
   }

Review comment:
       How will `cleanDir` be called concurrently?

##########
File path: server/src/test/java/org/apache/iotdb/db/integration/IoTDBJMXTest.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class IoTDBJMXTest {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testThreadPool() {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement(); ) {
+      // make sure two storage groups having no conflict when registering their JMX info (for their
+      // thread pools)
+      statement.execute("set storage group to root.sg1");
+      statement.execute("set storage group to root.sg2");
+      statement.execute("insert into root.sg1.d1 (time, s1) values (1, 1)");
+      statement.execute("insert into root.sg2.d1 (time, s1) values (1, 1)");
+    } catch (SQLException throwables) {
+      throwables.printStackTrace();
+    }
+  }

Review comment:
       How is the result checked?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1441,31 +1446,33 @@ public TSStatus processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
-  /**
-   * Forward a non-query plan to the data port of "receiver"
-   *
-   * @param plan a non-query plan
-   * @param header to determine which DataGroupMember of "receiver" will process the request.
-   * @return a TSStatus indicating if the forwarding is successful.
-   */
-  private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, RaftNode header)
-      throws IOException {
-    RaftService.AsyncClient client =
-        getClientProvider().getAsyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
-    return forwardPlanAsync(plan, receiver, header, client);
-  }
-
-  private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, RaftNode header)
-      throws IOException {
-    Client client;
-    try {
-      client =
-          getClientProvider().getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
-    } catch (TException e) {
-      throw new IOException(e);
-    }
-    return forwardPlanSync(plan, receiver, header, client);
-  }
+  //  /**
+  //   * Forward a non-query plan to the data port of "receiver"
+  //   *
+  //   * @param plan a non-query plan
+  //   * @param header to determine which DataGroupMember of "receiver" will process the request.
+  //   * @return a TSStatus indicating if the forwarding is successful.
+  //   */
+  //  private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, RaftNode header)
+  //      throws IOException {
+  //    RaftService.AsyncClient client =
+  //        getClientProvider()
+  //            .getAsyncDataClient(receiver, ClusterConstant.getWriteOperationTimeoutMS());
+  //    return forwardPlanAsync(plan, receiver, header, client);
+  //  }
+  //
+  //  private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, RaftNode header)
+  //      throws IOException {
+  //    Client client;
+  //    try {
+  //      client =
+  //          getClientProvider()
+  //              .getSyncDataClient(receiver, ClusterConstant.getWriteOperationTimeoutMS());
+  //    } catch (TException e) {
+  //      throw new IOException(e);
+  //    }
+  //    return forwardPlanSync(plan, receiver, header, client);
+  //  }
 

Review comment:
       Remove this.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -326,59 +294,31 @@ public void start() {
   @Override
   void startBackGroundThreads() {
     super.startBackGroundThreads();
-    reportThread =
-        Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "NodeReportThread"));
-    hardLinkCleanerThread =
-        Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "HardLinkCleaner"));
   }
 
   /**
-   * Stop the heartbeat and catch-up thread pool, DataClusterServer, ClientServer and reportThread.
-   * Calling the method twice does not induce side effects.
+   * Stop the heartbeat and catch-up thread pool, DataClusterServer, ClusterTSServiceImpl and
+   * reportThread. Calling the method twice does not induce side effects.
    */
   @Override
   public void stop() {
     super.stop();
-    if (getDataClusterServer() != null) {
-      getDataClusterServer().stop();
-    }
-    if (getDataHeartbeatServer() != null) {
-      getDataHeartbeatServer().stop();
-    }
-    if (clientServer != null) {
-      clientServer.stop();
-    }
-    if (reportThread != null) {
-      reportThread.shutdownNow();
-      try {
-        reportThread.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME_S, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        logger.error("Unexpected interruption when waiting for reportThread to end", e);
-      }
-    }
-    if (hardLinkCleanerThread != null) {
-      hardLinkCleanerThread.shutdownNow();
-      try {
-        hardLinkCleanerThread.awaitTermination(
-            THREAD_POLL_WAIT_TERMINATION_TIME_S, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        logger.error("Unexpected interruption when waiting for hardlinkCleaner to end", e);
-      }
-    }
     logger.info("{}: stopped", name);
   }
 
+  @Override
+  public ServiceType getID() {
+    return ServiceType.CLUSTER_META_ENGINE;
+  }
+
   /**
-   * Start DataClusterServer and ClientServer so this node will be able to respond to other nodes
-   * and clients.
+   * Start DataClusterServer and ClusterTSServiceImpl so this node will be able to respond to other
+   * nodes and clients.
    */
   protected void initSubServers() throws TTransportException, StartupException {
-    getDataClusterServer().start();
-    getDataHeartbeatServer().start();
-    clientServer.setCoordinator(this.coordinator);
-    clientServer.start();
+    //    getDataClusterServer().start();
+    //    getDataHeartbeatServer().start();
+    // TODO FIXME
   }

Review comment:
       Maybe this should be removed?

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -582,8 +508,9 @@ private boolean joinCluster(Node node, StartUpStatus startUpStatus)
     } else if (resp.getRespNum() == Response.RESPONSE_AGREE) {
       logger.info("Node {} admitted this node into the cluster", node);
       ByteBuffer partitionTableBuffer = resp.partitionTableBytes;
-      acceptPartitionTable(partitionTableBuffer, true);
-      getDataClusterServer().pullSnapshots();
+      acceptVerifiedPartitionTable(partitionTableBuffer, true);
+      // this should be called in ClusterIoTDB TODO
+      // getDataGroupEngine().pullSnapshots();
       return true;

Review comment:
       Check and remove this.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class ClientPoolFactoryTest {
+  private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+
+  private long mockMaxWaitTimeoutMs = 10 * 1000L;
+  private int mockMaxClientPerMember = 10;
+
+  private int maxClientPerNodePerMember = clusterConfig.getMaxClientPerNodePerMember();
+  private long waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
+
+  private ClientPoolFactory clientPoolFactory;
+  private MockClientManager mockClientManager;
+
+  @Before
+  public void setUp() {
+    clusterConfig.setMaxClientPerNodePerMember(mockMaxClientPerMember);
+    clusterConfig.setWaitClientTimeoutMS(mockMaxWaitTimeoutMs);
+    clientPoolFactory = new ClientPoolFactory();
+    mockClientManager =
+        new MockClientManager() {
+          @Override
+          public void returnAsyncClient(
+              RaftService.AsyncClient client, Node node, ClientCategory category) {
+            assert (client == asyncClient);

Review comment:
       Change to JUnit assertion.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class ClientPoolFactoryTest {
+  private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+
+  private long mockMaxWaitTimeoutMs = 10 * 1000L;
+  private int mockMaxClientPerMember = 10;
+
+  private int maxClientPerNodePerMember = clusterConfig.getMaxClientPerNodePerMember();
+  private long waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
+
+  private ClientPoolFactory clientPoolFactory;
+  private MockClientManager mockClientManager;
+
+  @Before
+  public void setUp() {
+    clusterConfig.setMaxClientPerNodePerMember(mockMaxClientPerMember);
+    clusterConfig.setWaitClientTimeoutMS(mockMaxWaitTimeoutMs);
+    clientPoolFactory = new ClientPoolFactory();
+    mockClientManager =
+        new MockClientManager() {
+          @Override
+          public void returnAsyncClient(
+              RaftService.AsyncClient client, Node node, ClientCategory category) {
+            assert (client == asyncClient);
+          }
+
+          @Override
+          public void returnSyncClient(
+              RaftService.Client client, Node node, ClientCategory category) {
+            Assert.assertTrue(client == syncClient);
+          }
+        };
+    clientPoolFactory.setClientManager(mockClientManager);
+  }
+
+  @After
+  public void tearDown() {
+    clusterConfig.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
+    clusterConfig.setWaitClientTimeoutMS(waitClientTimeoutMS);
+  }
+
+  @Test
+  public void poolConfigTest() throws Exception {
+    GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+        clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+    Node node = constructDefaultNode();
+
+    for (int i = 0; i < mockMaxClientPerMember; i++) {
+      RaftService.AsyncClient client = pool.borrowObject(node);
+      Assert.assertNotNull(client);
+    }
+
+    long timeStart = System.currentTimeMillis();
+    try {
+      pool.borrowObject(node);
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof NoSuchElementException);
+    } finally {
+      Assert.assertTrue(System.currentTimeMillis() - timeStart + 10 > mockMaxWaitTimeoutMs);
+    }
+  }
+
+  @Test
+  public void poolRecycleTest() throws Exception {
+    GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+        clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+
+    Node node = constructDefaultNode();
+    List<RaftService.AsyncClient> clientList = new ArrayList<>();
+    for (int i = 0; i < pool.getMaxIdlePerKey(); i++) {
+      RaftService.AsyncClient client = pool.borrowObject(node);
+      Assert.assertNotNull(client);
+      clientList.add(client);
+    }
+
+    for (RaftService.AsyncClient client : clientList) {
+      pool.returnObject(node, client);
+    }
+
+    for (int i = 0; i < pool.getMaxIdlePerKey(); i++) {
+      RaftService.AsyncClient client = pool.borrowObject(node);
+      Assert.assertNotNull(client);
+      Assert.assertTrue(clientList.contains(client));
+    }
+  }

Review comment:
       If more clients than `maxIdlePerKey` are borrowed and then returned, will the pool destroy the surplus ones?

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
##########
@@ -44,23 +45,27 @@
   @Before
   public void setUp() throws Exception {
     initConfigs();
-    metaServer = new MetaClusterServer();
-    metaServer.start();
-    metaServer.buildCluster();
+    daemon = ClusterIoTDB.getInstance();
+    daemon.initLocalEngines();
+    DataGroupEngine.getInstance().resetFactory();
+    daemon.activeStartNodeMode();
   }
 
   @After
   public void tearDown() throws Exception {
-    metaServer.stop();
+    // TODO fixme
+    daemon.stop();

Review comment:
       Please make it clear what should be fixed.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1807,15 +1821,16 @@ public void applyRemoveNode(RemoveNodeLog removeNodeLog) {
         new Thread(
                 () -> {
                   try {
-                    Thread.sleep(RaftServer.getHeartbeatIntervalMs());
+                    Thread.sleep(ClusterConstant.getHeartbeatIntervalMs());
                   } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     // ignore
                   }
                   super.stop();
-                  if (clientServer != null) {
-                    clientServer.stop();
-                  }
+                  // TODO FIXME
+                  //                  if (clusterTSServiceImpl != null) {
+                  //                    clusterTSServiceImpl.stop();
+                  //                  }

Review comment:
       Check the TODOs.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -200,37 +181,22 @@
   private PartitionTable partitionTable;
   /** router calculates the partition groups that a partitioned plan should be sent to */
   private ClusterPlanRouter router;
-  /**
-   * each node contains multiple DataGroupMembers and they are managed by a DataClusterServer acting
-   * as a broker
-   */
-  private DataClusterServer dataClusterServer;
 
-  /** each node starts a data heartbeat server to transfer heartbeat requests */
-  private DataHeartbeatServer dataHeartbeatServer;
-
-  /**
-   * an override of TSServiceImpl, which redirect JDBC and Session requests to the MetaGroupMember
-   * so they can be processed cluster-wide
-   */
-  private ClientServer clientServer;
-
-  private DataClientProvider dataClientProvider;
-
-  /**
-   * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread" will print the status
-   * of all raft members in this node
-   */
-  private ScheduledExecutorService reportThread;
+  //  /**
+  //   * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread" will print the
+  // status
+  //   * of all raft members in this node
+  //   */
+  //  private ScheduledExecutorService reportThread;

Review comment:
       Remove them.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/ClientManagerTest.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class ClientManagerTest extends BaseClientTest {
+
+  @Before
+  public void setUp() throws IOException {
+    startDataServer();
+    startMetaServer();
+    startDataHeartbeatServer();
+    startMetaHeartbeatServer();
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException {
+    stopDataServer();
+    stopMetaServer();
+    stopDataHeartbeatServer();
+    stopMetaHeartbeatServer();
+  }
+
+  @Test
+  public void syncClientManagersTest() throws Exception {
+    // ---------Sync cluster clients manager test------------
+    ClientManager clusterManager =
+        new ClientManager(false, ClientManager.Type.RequestForwardClient);
+    RaftService.Client syncClusterClient =
+        clusterManager.borrowSyncClient(defaultNode, ClientCategory.DATA);
+
+    Assert.assertNotNull(syncClusterClient);
+    Assert.assertTrue(syncClusterClient instanceof SyncDataClient);
+    Assert.assertEquals(((SyncDataClient) syncClusterClient).getNode(), defaultNode);
+    Assert.assertTrue(syncClusterClient.getInputProtocol().getTransport().isOpen());
+    ((SyncDataClient) syncClusterClient).returnSelf();
+
+    // cluster test
+    Assert.assertNull(clusterManager.borrowSyncClient(defaultNode, ClientCategory.DATA_HEARTBEAT));
+    Assert.assertNull(clusterManager.borrowSyncClient(defaultNode, ClientCategory.META));
+    Assert.assertNull(clusterManager.borrowSyncClient(defaultNode, ClientCategory.META_HEARTBEAT));
+
+    Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode, ClientCategory.DATA));
+    Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode, ClientCategory.DATA_HEARTBEAT));
+    Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode, ClientCategory.META));
+    Assert.assertNull(clusterManager.borrowAsyncClient(defaultNode, ClientCategory.META_HEARTBEAT));
+
+    // ---------Sync meta(meta heartbeat) clients manager test------------
+    ClientManager metaManager = new ClientManager(false, ClientManager.Type.MetaGroupClient);
+    RaftService.Client metaClient = metaManager.borrowSyncClient(defaultNode, ClientCategory.META);
+    Assert.assertNotNull(metaClient);
+    Assert.assertTrue(metaClient instanceof SyncMetaClient);
+    Assert.assertEquals(((SyncMetaClient) metaClient).getNode(), defaultNode);
+    Assert.assertTrue(metaClient.getInputProtocol().getTransport().isOpen());
+    ((SyncMetaClient) metaClient).returnSelf();
+
+    RaftService.Client metaHeartClient =
+        metaManager.borrowSyncClient(defaultNode, ClientCategory.META_HEARTBEAT);
+    Assert.assertNotNull(metaHeartClient);
+    Assert.assertTrue(metaHeartClient instanceof SyncMetaClient);
+    Assert.assertEquals(((SyncMetaClient) metaHeartClient).getNode(), defaultNode);
+    Assert.assertTrue(metaHeartClient.getInputProtocol().getTransport().isOpen());
+    ((SyncMetaClient) metaHeartClient).returnSelf();
+
+    // cluster test
+    Assert.assertNull(metaManager.borrowSyncClient(defaultNode, ClientCategory.DATA));
+    Assert.assertNull(metaManager.borrowSyncClient(defaultNode, ClientCategory.DATA_HEARTBEAT));
+
+    Assert.assertNull(metaManager.borrowAsyncClient(defaultNode, ClientCategory.DATA));
+    Assert.assertNull(metaManager.borrowAsyncClient(defaultNode, ClientCategory.DATA_HEARTBEAT));
+    Assert.assertNull(metaManager.borrowAsyncClient(defaultNode, ClientCategory.META));
+    Assert.assertNull(metaManager.borrowAsyncClient(defaultNode, ClientCategory.META_HEARTBEAT));
+
+    // ---------Sync data(data heartbeat) clients manager test------------
+    ClientManager dataManager = new ClientManager(false, ClientManager.Type.DataGroupClient);
+
+    RaftService.Client dataClient = dataManager.borrowSyncClient(defaultNode, ClientCategory.DATA);
+    Assert.assertNotNull(dataClient);
+    Assert.assertTrue(dataClient instanceof SyncDataClient);
+    Assert.assertEquals(((SyncDataClient) dataClient).getNode(), defaultNode);
+    Assert.assertTrue(dataClient.getInputProtocol().getTransport().isOpen());
+    ((SyncDataClient) dataClient).returnSelf();
+
+    RaftService.Client dataHeartClient =
+        dataManager.borrowSyncClient(defaultNode, ClientCategory.DATA_HEARTBEAT);
+    Assert.assertNotNull(dataHeartClient);
+    Assert.assertTrue(dataHeartClient instanceof SyncDataClient);
+    Assert.assertEquals(((SyncDataClient) dataHeartClient).getNode(), defaultNode);
+    Assert.assertTrue(dataHeartClient.getInputProtocol().getTransport().isOpen());
+    ((SyncDataClient) dataHeartClient).returnSelf();
+
+    // cluster test
+    Assert.assertNull(dataManager.borrowSyncClient(defaultNode, ClientCategory.META));
+    Assert.assertNull(dataManager.borrowSyncClient(defaultNode, ClientCategory.META_HEARTBEAT));
+
+    Assert.assertNull(dataManager.borrowAsyncClient(defaultNode, ClientCategory.DATA));
+    Assert.assertNull(dataManager.borrowAsyncClient(defaultNode, ClientCategory.DATA_HEARTBEAT));
+    Assert.assertNull(dataManager.borrowAsyncClient(defaultNode, ClientCategory.META));
+    Assert.assertNull(dataManager.borrowAsyncClient(defaultNode, ClientCategory.META_HEARTBEAT));
+  }
+
+  @Test
+  public void asyncClientManagersTest() throws Exception {
+    // ---------async cluster clients manager test------------
+    ClientManager clusterManager = new ClientManager(true, ClientManager.Type.RequestForwardClient);
+    RaftService.AsyncClient clusterClient =
+        clusterManager.borrowAsyncClient(defaultNode, ClientCategory.DATA);
+
+    Assert.assertNotNull(clusterClient);
+    Assert.assertTrue(clusterClient instanceof AsyncDataClient);
+    Assert.assertEquals(((AsyncDataClient) clusterClient).getNode(), defaultNode);
+    Assert.assertTrue(((AsyncDataClient) clusterClient).isValid());
+    Assert.assertTrue(((AsyncDataClient) clusterClient).isReady());

Review comment:
       Maybe you can invalidate the client and test that we can no longer get it from the manager.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
##########
@@ -120,7 +121,9 @@ private TimeValuePair performPreviousFill(
     }
     CountDownLatch latch = new CountDownLatch(partitionGroups.size());
     PreviousFillHandler handler = new PreviousFillHandler(latch);
-
+    // TODO it is not suitable for register and deregister an Object to JMX to such a frequent
+    // function call.
+    // BUT is it suitable to create a thread pool for each calling??
     ExecutorService fillService = Executors.newFixedThreadPool(partitionGroups.size());

Review comment:
       These were temporary solutions, and I expected you to fix them within this PR. However, they were initially introduced for full concurrency: we did not want some failing connections to block others.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
##########
@@ -4,123 +4,107 @@
 
 package org.apache.iotdb.cluster.client.sync;
 
-import org.apache.iotdb.cluster.client.sync.SyncDataClient.FactorySync;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-import org.apache.iotdb.rpc.TSocketWrapper;
+import org.apache.iotdb.cluster.client.BaseClientTest;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.net.ServerSocket;
+import java.net.SocketException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
-public class SyncDataClientTest {
+public class SyncDataClientTest extends BaseClientTest {
 
-  @Test
-  public void test() throws IOException, InterruptedException {
-    Node node = new Node();
-    node.setDataPort(40010).setInternalIp("localhost").setClientIp("localhost");
-    ServerSocket serverSocket = new ServerSocket(node.getDataPort());
-    Thread listenThread =
-        new Thread(
-            () -> {
-              while (!Thread.interrupted()) {
-                try {
-                  serverSocket.accept();
-                } catch (IOException e) {
-                  return;
-                }
-              }
-            });
-    listenThread.start();
+  private TProtocolFactory protocolFactory;
+
+  @Before
+  public void setUp() {
+    protocolFactory =
+        ClusterDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
+            ? new TCompactProtocol.Factory()
+            : new TBinaryProtocol.Factory();
+  }
 
+  @Test
+  public void testDataClient() throws IOException, InterruptedException, TTransportException {
     try {
-      SyncClientPool syncClientPool = new SyncClientPool(new FactorySync(new Factory()));
-      SyncDataClient client;
-      client = (SyncDataClient) syncClientPool.getClient(node);
+      startDataServer();
+      SyncDataClient dataClient =
+          new SyncDataClient(protocolFactory, defaultNode, ClientCategory.DATA);
 
-      assertEquals(node, client.getNode());
+      assertEquals(
+          "SyncDataClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+              + "dataPort:40010, clientPort:0, clientIp:localhost),port=40010}",
+          dataClient.toString());
 
-      client.setTimeout(1000);
-      assertEquals(1000, client.getTimeout());
+      assertCheck(dataClient);
 
-      client.putBack();
-      Client newClient = syncClientPool.getClient(node);
-      assertEquals(client, newClient);
-      assertTrue(client.getInputProtocol().getTransport().isOpen());
+      dataClient =
+          new SyncDataClient.SyncDataClientFactory(protocolFactory, ClientCategory.DATA)
+              .makeObject(defaultNode)
+              .getObject();
 
       assertEquals(
-          "DataClient{node=ClusterNode{ internalIp='localhost', metaPort=0, nodeIdentifier=0,"
-              + " dataPort=40010, clientPort=0, clientIp='localhost'}}",
-          client.toString());
-
-      client =
-          new SyncDataClient(
-              new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(), node.getDataPort())));
-      // client without a belong pool will be closed after putBack()
-      client.putBack();
-      assertFalse(client.getInputProtocol().getTransport().isOpen());
+          "SyncDataClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+              + "dataPort:40010, clientPort:0, clientIp:localhost),port=40010}",
+          dataClient.toString());
+
+      assertCheck(dataClient);
+    } catch (Exception e) {
+      e.printStackTrace();

Review comment:
       Maybe the test should fail here.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/ClientManagerTest.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class ClientManagerTest extends BaseClientTest {
+
+  @Before
+  public void setUp() throws IOException {
+    startDataServer();
+    startMetaServer();
+    startDataHeartbeatServer();
+    startMetaHeartbeatServer();
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException {
+    stopDataServer();
+    stopMetaServer();
+    stopDataHeartbeatServer();
+    stopMetaHeartbeatServer();
+  }
+
+  @Test
+  public void syncClientManagersTest() throws Exception {
+    // ---------Sync cluster clients manager test------------
+    ClientManager clusterManager =
+        new ClientManager(false, ClientManager.Type.RequestForwardClient);
+    RaftService.Client syncClusterClient =
+        clusterManager.borrowSyncClient(defaultNode, ClientCategory.DATA);
+
+    Assert.assertNotNull(syncClusterClient);
+    Assert.assertTrue(syncClusterClient instanceof SyncDataClient);
+    Assert.assertEquals(((SyncDataClient) syncClusterClient).getNode(), defaultNode);
+    Assert.assertTrue(syncClusterClient.getInputProtocol().getTransport().isOpen());
+    ((SyncDataClient) syncClusterClient).returnSelf();
+

Review comment:
       Is there any method to check whether the client is returned? Maybe you can get the client again after it is returned and check that they are the same reference.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java
##########
@@ -48,20 +50,35 @@
   @Before
   public void setUp() {
     metaGroupMember = new TestMetaGroupMember();
-    metaGroupMember.setClientProvider(
-        new DataClientProvider(new Factory()) {
-          @Override
-          public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
-            return new AsyncDataClient(null, null, TestUtils.getNode(0), null) {
+    ClusterIoTDB.getInstance()
+        .setClientManager(
+            new IClientManager() {

Review comment:
       Static fields shared between tests should be restored after each test, or some unexpected results may occur when adding further tests.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster;
+
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.metadata.MetaPuller;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
+import org.apache.iotdb.cluster.server.HardLinkCleaner;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.raft.DataRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.DataRaftService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
+import org.apache.iotdb.cluster.server.raft.MetaRaftService;
+import org.apache.iotdb.cluster.server.service.DataGroupEngine;
+import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
+import org.apache.iotdb.cluster.server.service.MetaAsyncService;
+import org.apache.iotdb.cluster.server.service.MetaSyncService;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBConfigCheck;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
+import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.iotdb.db.utils.TestOnly;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
+import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
+
+// we do not inherent IoTDB instance, as it may break the singleton mode of IoTDB.
+public class ClusterIoTDB implements ClusterIoTDBMBean {
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+  private final String mbeanName =
+      String.format(
+          "%s:%s=%s", "org.apache.iotdb.cluster.service", IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
+
+  // TODO fix me: better to throw exception if the client can not be get. Then we can remove this
+  // field.
+  public static boolean printClientConnectionErrorStack = false;
+
+  // establish the cluster as a seed
+  private static final String MODE_START = "-s";
+  // join an established cluster
+  private static final String MODE_ADD = "-a";
+  // send a request to remove a node, more arguments: ip-of-removed-node
+  // metaport-of-removed-node
+  private static final String MODE_REMOVE = "-r";
+
+  private MetaGroupMember metaGroupEngine;
+
+  // split DataGroupServiceImpls into engine and impls
+  private DataGroupEngine dataGroupEngine;
+
+  private Node thisNode;
+  private Coordinator coordinator;
+
+  private final IoTDB iotdb = IoTDB.getInstance();
+
+  // Cluster IoTDB uses a individual registerManager with its parent.
+  private RegisterManager registerManager = new RegisterManager();
+
+  /**
+   * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread" will print the status
+   * of all raft members in this node
+   */
+  private ScheduledExecutorService reportThread;
+
+  private boolean allowReport = true;
+
+  /** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
+  private ScheduledExecutorService hardLinkCleanerThread;
+
+  // currently, clientManager is only used for those instances who do not belong to any
+  // DataGroup..
+  private IClientManager clientManager;
+
+  private ClusterIoTDB() {
+    // we do not init anything here, so that we can re-initialize the instance in IT.
+  }
+
+  public void initLocalEngines() {
+    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+    thisNode = new Node();
+    // set internal rpc ip and ports
+    thisNode.setInternalIp(config.getInternalIp());
+    thisNode.setMetaPort(config.getInternalMetaPort());
+    thisNode.setDataPort(config.getInternalDataPort());
+    // set client rpc ip and ports
+    thisNode.setClientPort(config.getClusterRpcPort());
+    thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
+    coordinator = new Coordinator();
+    // local engine
+    TProtocolFactory protocolFactory =
+        ThriftServiceThread.getProtocolFactory(
+            IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
+    metaGroupEngine = new MetaGroupMember(protocolFactory, thisNode, coordinator);
+    IoTDB.setClusterMode();
+    IoTDB.setMetaManager(CMManager.getInstance());
+    ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
+    ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
+    MetaPuller.getInstance().init(metaGroupEngine);
+
+    // from the scope of the DataGroupEngine,it should be singleton pattern
+    // the way of setting MetaGroupMember in DataGroupEngine may need a better modification in
+    // future commit.
+    DataGroupEngine.setProtocolFactory(protocolFactory);
+    DataGroupEngine.setMetaGroupMember(metaGroupEngine);
+    dataGroupEngine = DataGroupEngine.getInstance();
+    clientManager =
+        new ClientManager(
+            ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+            ClientManager.Type.RequestForwardClient);
+    initTasks();
+    try {
+      // we need to check config after initLocalEngines.
+      startServerCheck();
+    } catch (StartupException e) {
+      logger.error("Failed to check cluster config.", e);
+      stop();
+    }
+    JMXService.registerMBean(metaGroupEngine, metaGroupEngine.getMBeanName());
+  }
+
+  private void initTasks() {
+    reportThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
+    reportThread.scheduleAtFixedRate(
+        this::generateNodeReport,
+        ClusterConstant.REPORT_INTERVAL_SEC,
+        ClusterConstant.REPORT_INTERVAL_SEC,
+        TimeUnit.SECONDS);

Review comment:
       I guess it is because we can re-enable the report during runtime, and it is not convenient to perform concurrency control when re-enabling it.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
##########
@@ -179,75 +180,101 @@ public void setUp()
     IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
     isPartitionEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
     IoTDBDescriptor.getInstance().getConfig().setEnablePartition(true);
-    testMetaGroupMember.setClientProvider(
-        new DataClientProvider(new Factory()) {
-          @Override
-          public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
-            return new AsyncDataClient(null, null, node, null) {
+    // TODO fixme: restore normal provider
+    ClusterIoTDB.getInstance()
+        .setClientManager(
+            new IClientManager() {

Review comment:
       I suggest fixing it now; otherwise, further tests may be interfered with.

##########
File path: server/src/test/java/org/apache/iotdb/db/integration/IoTDBCheckConfigIT.java
##########
@@ -67,15 +67,15 @@ public void setUp() {
     EnvironmentUtils.closeStatMonitor();
     EnvironmentUtils.envSetUp();
 
-    final SecurityManager securityManager =
-        new SecurityManager() {
-          public void checkPermission(Permission permission) {
-            if (permission.getName().startsWith("exitVM")) {
-              throw new AccessControlException("Wrong system config");
-            }
-          }
-        };
-    System.setSecurityManager(securityManager);
+    //    final SecurityManager securityManager =
+    //        new SecurityManager() {
+    //          public void checkPermission(Permission permission) {
+    //            if (permission.getName().startsWith("exitVM")) {
+    //              throw new AccessControlException("Wrong system config");
+    //            }
+    //          }
+    //        };
+    //    System.setSecurityManager(securityManager);

Review comment:
       Remove commented code blocks.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
##########
@@ -338,19 +343,20 @@ public void applyRemoveNode(RemoveNodeLog removeNodeLog) {
           }
 
           @Override
-          public DataClusterServer getDataClusterServer() {
+          public DataGroupEngine getDataGroupEngine() {
             return mockDataClusterServer
-                ? MetaGroupMemberTest.this.dataClusterServer
-                : super.getDataClusterServer();
+                ? MetaGroupMemberTest.this.dataGroupEngine
+                : ClusterIoTDB.getInstance().getDataGroupEngine();
           }
 
-          @Override
-          public DataHeartbeatServer getDataHeartbeatServer() {
-            return new DataHeartbeatServer(thisNode, dataClusterServer) {
-              @Override
-              public void start() {}
-            };
-          }
+          // TODO we remove a do-nothing DataHeartbeat here.
+          //          @Override
+          //          public DataHeartbeatServer getDataHeartbeatServer() {
+          //            return new DataHeartbeatServer(thisNode, dataGroupServiceImpls) {
+          //              @Override
+          //              public void start() {}
+          //            };
+          //          }

Review comment:
       Remove it if necessary.

##########
File path: cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class ClientPoolFactoryTest {
+  private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+
+  private long mockMaxWaitTimeoutMs = 10 * 1000L;
+  private int mockMaxClientPerMember = 10;
+
+  private int maxClientPerNodePerMember = clusterConfig.getMaxClientPerNodePerMember();
+  private long waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
+
+  private ClientPoolFactory clientPoolFactory;
+  private MockClientManager mockClientManager;
+
+  @Before
+  public void setUp() {
+    clusterConfig.setMaxClientPerNodePerMember(mockMaxClientPerMember);
+    clusterConfig.setWaitClientTimeoutMS(mockMaxWaitTimeoutMs);
+    clientPoolFactory = new ClientPoolFactory();
+    mockClientManager =
+        new MockClientManager() {
+          @Override
+          public void returnAsyncClient(
+              RaftService.AsyncClient client, Node node, ClientCategory category) {
+            assert (client == asyncClient);
+          }
+
+          @Override
+          public void returnSyncClient(
+              RaftService.Client client, Node node, ClientCategory category) {
+            Assert.assertTrue(client == syncClient);
+          }
+        };
+    clientPoolFactory.setClientManager(mockClientManager);
+  }
+
+  @After
+  public void tearDown() {
+    clusterConfig.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
+    clusterConfig.setWaitClientTimeoutMS(waitClientTimeoutMS);
+  }
+
+  @Test
+  public void poolConfigTest() throws Exception {
+    GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+        clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+    Node node = constructDefaultNode();
+
+    for (int i = 0; i < mockMaxClientPerMember; i++) {
+      RaftService.AsyncClient client = pool.borrowObject(node);
+      Assert.assertNotNull(client);
+    }
+
+    long timeStart = System.currentTimeMillis();
+    try {
+      pool.borrowObject(node);
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof NoSuchElementException);
+    } finally {
+      Assert.assertTrue(System.currentTimeMillis() - timeStart + 10 > mockMaxWaitTimeoutMs);
+    }
+  }
+
+  @Test
+  public void poolRecycleTest() throws Exception {
+    GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+        clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+
+    Node node = constructDefaultNode();
+    List<RaftService.AsyncClient> clientList = new ArrayList<>();
+    for (int i = 0; i < pool.getMaxIdlePerKey(); i++) {
+      RaftService.AsyncClient client = pool.borrowObject(node);
+      Assert.assertNotNull(client);
+      clientList.add(client);
+    }
+
+    for (RaftService.AsyncClient client : clientList) {
+      pool.returnObject(node, client);
+    }
+
+    for (int i = 0; i < pool.getMaxIdlePerKey(); i++) {
+      RaftService.AsyncClient client = pool.borrowObject(node);
+      Assert.assertNotNull(client);
+      Assert.assertTrue(clientList.contains(client));
+    }
+  }
+
+  @Test
+  public void createAsyncDataClientTest() throws Exception {
+    GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+        clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+
+    Assert.assertEquals(pool.getMaxTotalPerKey(), mockMaxClientPerMember);
+    Assert.assertEquals(pool.getMaxWaitDuration(), Duration.ofMillis(mockMaxWaitTimeoutMs));
+
+    RaftService.AsyncClient asyncClient = null;
+
+    Node node = constructDefaultNode();
+
+    asyncClient = pool.borrowObject(node);
+    mockClientManager.setAsyncClient(asyncClient);
+    Assert.assertNotNull(asyncClient);
+    Assert.assertTrue(asyncClient instanceof AsyncDataClient);

Review comment:
       What is the meaning of `mockClientManager.setAsyncClient(asyncClient);` here?

##########
File path: server/src/test/java/org/apache/iotdb/db/integration/IoTDBCheckConfigIT.java
##########
@@ -145,9 +140,7 @@ public void testSameTimeEncoderAfterStartService() throws Exception {
     try {
       IoTDBConfigCheck.getInstance().checkConfig();
     } catch (Throwable t) {
-      assertTrue(false);
-    } finally {
-      System.setSecurityManager(null);
+      fail("should have no configration errors");

Review comment:
       It would be better to provide the caught exception in the failure message.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org