You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/26 08:33:34 UTC

[iotdb] branch stable-mpp created (now 115c832375)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 115c832375 [IOTDB-3013] Using Client Pool to replace previous DataBlockServiceClientFactory and InternalServiceClientFactory

This branch includes the following new commits:

     new 115c832375 [IOTDB-3013] Using Client Pool to replace previous DataBlockServiceClientFactory and InternalServiceClientFactory

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-3013] Using Client Pool to replace previous DataBlockServiceClientFactory and InternalServiceClientFactory

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 115c8323758a14ec054c1e60252f3977f25402c4
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Apr 26 16:31:58 2022 +0800

    [IOTDB-3013] Using Client Pool to replace previous DataBlockServiceClientFactory and InternalServiceClientFactory
---
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |  17 +--
 .../iotdb/db/mpp/buffer/DataBlockService.java      |   8 +-
 .../mpp/buffer/DataBlockServiceClientFactory.java  | 117 ---------------------
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  31 ++++--
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |  30 ++++--
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  19 +++-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  15 ++-
 .../scheduler/AbstractFragInsStateTracker.java     |  39 ++++---
 .../mpp/execution/scheduler/ClusterScheduler.java  |  14 ++-
 .../scheduler/FixedRateFragInsStateTracker.java    |  11 +-
 .../scheduler/InternalServiceClientFactory.java    | 112 --------------------
 .../scheduler/SimpleFragInstanceDispatcher.java    |  50 +++++----
 .../execution/scheduler/SimpleQueryTerminator.java |  31 ++++--
 .../execution/scheduler/StandaloneScheduler.java   |  12 ++-
 .../apache/iotdb/db/mpp/buffer/SinkHandleTest.java |  51 ++++++---
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      |  65 ++++++++----
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  24 ++++-
 17 files changed, 300 insertions(+), 346 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index d55de6f780..e4d38df2f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
@@ -259,7 +261,8 @@ public class DataBlockManager implements IDataBlockManager {
   private final LocalMemoryManager localMemoryManager;
   private final Supplier<TsBlockSerde> tsBlockSerdeFactory;
   private final ExecutorService executorService;
-  private final DataBlockServiceClientFactory clientFactory;
+  private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
+      dataBlockServiceClientManager;
   private final Map<TFragmentInstanceId, Map<String, SourceHandle>> sourceHandles;
   private final Map<TFragmentInstanceId, SinkHandle> sinkHandles;
 
@@ -269,11 +272,11 @@ public class DataBlockManager implements IDataBlockManager {
       LocalMemoryManager localMemoryManager,
       Supplier<TsBlockSerde> tsBlockSerdeFactory,
       ExecutorService executorService,
-      DataBlockServiceClientFactory clientFactory) {
+      IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> dataBlockServiceClientManager) {
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory);
     this.executorService = Validate.notNull(executorService);
-    this.clientFactory = Validate.notNull(clientFactory);
+    this.dataBlockServiceClientManager = Validate.notNull(dataBlockServiceClientManager);
     sourceHandles = new ConcurrentHashMap<>();
     sinkHandles = new ConcurrentHashMap<>();
   }
@@ -311,9 +314,9 @@ public class DataBlockManager implements IDataBlockManager {
             localFragmentInstanceId,
             localMemoryManager,
             executorService,
-            clientFactory.getDataBlockServiceClient(remoteEndpoint),
             tsBlockSerdeFactory.get(),
-            new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
+            new SinkHandleListenerImpl(instanceContext, instanceContext::failed),
+            dataBlockServiceClientManager);
     sinkHandles.put(localFragmentInstanceId, sinkHandle);
     return sinkHandle;
   }
@@ -349,9 +352,9 @@ public class DataBlockManager implements IDataBlockManager {
             localPlanNodeId,
             localMemoryManager,
             executorService,
-            clientFactory.getDataBlockServiceClient(remoteEndpoint),
             tsBlockSerdeFactory.get(),
-            new SourceHandleListenerImpl(onFailureCallback));
+            new SourceHandleListenerImpl(onFailureCallback),
+            dataBlockServiceClientManager);
     sourceHandles
         .computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>())
         .put(localPlanNodeId, sourceHandle);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
index cabb725b95..7f8c6a6f5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -26,6 +29,7 @@ import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.ThriftService;
 import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
@@ -67,7 +71,9 @@ public class DataBlockService extends ThriftService implements DataBlockServiceM
             new LocalMemoryManager(),
             new TsBlockSerdeFactory(),
             executorService,
-            new DataBlockServiceClientFactory());
+            new IClientManager.Factory<TEndPoint, SyncDataNodeDataBlockServiceClient>()
+                .createClientManager(
+                    new DataNodeClientPoolFactory.SyncDataNodeDataBlockServiceClientPoolFactory()));
     processor = new Processor<>(dataBlockManager.getOrCreateDataBlockServiceImpl());
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
deleted file mode 100644
index a363fed2bb..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.buffer;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class DataBlockServiceClientFactory {
-
-  private static final int TIMEOUT_MS = 30_000;
-
-  private static final Logger logger = LoggerFactory.getLogger(DataBlockServiceClientFactory.class);
-
-  // TODO need to be replaced by mature client pool in the future
-  private static final Map<TEndPoint, DataBlockService.Iface> dataBlockServiceClientMap =
-      new ConcurrentHashMap<>();
-
-  public DataBlockService.Iface getDataBlockServiceClient(TEndPoint endpoint) {
-
-    return dataBlockServiceClientMap.computeIfAbsent(
-        endpoint,
-        address -> {
-          TTransport transport;
-          try {
-            transport =
-                RpcTransportFactory.INSTANCE.getTransport(
-                    // as there is a try-catch already, we do not need to use TSocket.wrap
-                    address.getIp(), address.getPort(), TIMEOUT_MS);
-            transport.open();
-            TProtocol protocol =
-                IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
-                    ? new TCompactProtocol(transport)
-                    : new TBinaryProtocol(transport);
-            return newSynchronizedClient(new DataBlockService.Client(protocol));
-          } catch (TTransportException e) {
-            logger.error(
-                "error happened while creating mpp service client for {}:{}",
-                endpoint.getIp(),
-                endpoint.getPort(),
-                e);
-            throw new RuntimeException(e);
-          }
-        });
-  }
-
-  public static DataBlockService.Iface newSynchronizedClient(DataBlockService.Iface client) {
-    return (DataBlockService.Iface)
-        Proxy.newProxyInstance(
-            DataBlockServiceClientFactory.class.getClassLoader(),
-            new Class[] {DataBlockService.Iface.class},
-            new SynchronizedHandler(client));
-  }
-
-  private static class SynchronizedHandler implements InvocationHandler {
-
-    private final DataBlockService.Iface client;
-
-    public SynchronizedHandler(DataBlockService.Iface client) {
-      this.client = client;
-    }
-
-    @Override
-    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-      try {
-        synchronized (client) {
-          return method.invoke(client, args);
-        }
-      } catch (InvocationTargetException e) {
-        // all IFace APIs throw TException
-        if (e.getTargetException() instanceof TException) {
-          throw e.getTargetException();
-        } else {
-          // should not happen
-          throw new TException(
-              "Error in calling method " + method.getName(), e.getTargetException());
-        }
-      } catch (Exception e) {
-        throw new TException("Error in calling method " + method.getName(), e);
-      }
-    }
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 5f734747b5..18736727ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
 import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
@@ -60,7 +61,6 @@ public class SinkHandle implements ISinkHandle {
   private final TFragmentInstanceId localFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
-  private final DataBlockService.Iface client;
   private final TsBlockSerde serde;
   private final SinkHandleListener sinkHandleListener;
 
@@ -69,6 +69,9 @@ public class SinkHandle implements ISinkHandle {
   //   2. Fast lookup.
   private final LinkedHashMap<Integer, TsBlock> sequenceIdToTsBlock = new LinkedHashMap<>();
 
+  private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
+      dataBlockServiceClientManager;
+
   private volatile ListenableFuture<Void> blocked = immediateFuture(null);
   private int nextSequenceId = 0;
   private long bufferRetainedSizeInBytes = 0;
@@ -82,18 +85,18 @@ public class SinkHandle implements ISinkHandle {
       TFragmentInstanceId localFragmentInstanceId,
       LocalMemoryManager localMemoryManager,
       ExecutorService executorService,
-      DataBlockService.Iface client,
       TsBlockSerde serde,
-      SinkHandleListener sinkHandleListener) {
+      SinkHandleListener sinkHandleListener,
+      IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> dataBlockServiceClientManager) {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
     this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
-    this.client = Validate.notNull(client);
     this.serde = Validate.notNull(serde);
     this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+    this.dataBlockServiceClientManager = dataBlockServiceClientManager;
   }
 
   @Override
@@ -152,7 +155,7 @@ public class SinkHandle implements ISinkHandle {
     throw new UnsupportedOperationException();
   }
 
-  private void sendEndOfDataBlockEvent() throws TException {
+  private void sendEndOfDataBlockEvent() throws Exception {
     logger.debug(
         "Send end of data block event to plan node {} of {}. {}",
         remotePlanNodeId,
@@ -167,10 +170,12 @@ public class SinkHandle implements ISinkHandle {
             nextSequenceId - 1);
     while (attempt < MAX_ATTEMPT_TIMES) {
       attempt += 1;
+      SyncDataNodeDataBlockServiceClient client = null;
       try {
+        client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
         client.onEndOfDataBlockEvent(endOfDataBlockEvent);
         break;
-      } catch (TException e) {
+      } catch (TException | IOException e) {
         logger.error(
             "Failed to send end of data block event to plan node {} of {} due to {}, attempt times: {}",
             remotePlanNodeId,
@@ -181,6 +186,10 @@ public class SinkHandle implements ISinkHandle {
         if (attempt == MAX_ATTEMPT_TIMES) {
           throw e;
         }
+      } finally {
+        if (client != null) {
+          client.returnSelf();
+        }
       }
     }
   }
@@ -193,7 +202,7 @@ public class SinkHandle implements ISinkHandle {
     }
     try {
       sendEndOfDataBlockEvent();
-    } catch (TException e) {
+    } catch (Exception e) {
       throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
     }
     synchronized (this) {
@@ -356,7 +365,9 @@ public class SinkHandle implements ISinkHandle {
               blockSizes);
       while (attempt < MAX_ATTEMPT_TIMES) {
         attempt += 1;
+        SyncDataNodeDataBlockServiceClient client = null;
         try {
+          client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
           client.onNewDataBlockEvent(newDataBlockEvent);
           break;
         } catch (Throwable e) {
@@ -370,6 +381,10 @@ public class SinkHandle implements ISinkHandle {
           if (attempt == MAX_ATTEMPT_TIMES) {
             sinkHandleListener.onFailure(SinkHandle.this, e);
           }
+        } finally {
+          if (client != null) {
+            client.returnSelf();
+          }
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index cbe5e9d540..10aaf4ee33 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
 import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
@@ -33,7 +34,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang3.Validate;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,13 +59,15 @@ public class SourceHandle implements ISourceHandle {
   private final String localPlanNodeId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
-  private final DataBlockService.Iface client;
   private final TsBlockSerde serde;
   private final SourceHandleListener sourceHandleListener;
 
   private final Map<Integer, TsBlock> sequenceIdToTsBlock = new HashMap<>();
   private final Map<Integer, Long> sequenceIdToDataBlockSize = new HashMap<>();
 
+  private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
+      dataBlockServiceClientManager;
+
   private volatile SettableFuture<Void> blocked = SettableFuture.create();
   private long bufferRetainedSizeInBytes = 0L;
   private int currSequenceId = 0;
@@ -80,19 +82,19 @@ public class SourceHandle implements ISourceHandle {
       String localPlanNodeId,
       LocalMemoryManager localMemoryManager,
       ExecutorService executorService,
-      DataBlockService.Iface client,
       TsBlockSerde serde,
-      SourceHandleListener sourceHandleListener) {
+      SourceHandleListener sourceHandleListener,
+      IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> dataBlockServiceClientManager) {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
-    this.client = Validate.notNull(client);
     this.serde = Validate.notNull(serde);
     this.sourceHandleListener = Validate.notNull(sourceHandleListener);
     bufferRetainedSizeInBytes = 0L;
+    this.dataBlockServiceClientManager = dataBlockServiceClientManager;
   }
 
   @Override
@@ -315,7 +317,9 @@ public class SourceHandle implements ISourceHandle {
       int attempt = 0;
       while (attempt < MAX_ATTEMPT_TIMES) {
         attempt += 1;
+        SyncDataNodeDataBlockServiceClient client = null;
         try {
+          client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
           TGetDataBlockResponse resp = client.getDataBlock(req);
           List<TsBlock> tsBlocks = new ArrayList<>(resp.getTsBlocks().size());
           for (ByteBuffer byteBuffer : resp.getTsBlocks()) {
@@ -336,7 +340,7 @@ public class SourceHandle implements ISourceHandle {
           executorService.submit(
               new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
           break;
-        } catch (TException e) {
+        } catch (Throwable e) {
           logger.error(
               "Failed to get data block from {} due to {}, attempt times: {}",
               remoteFragmentInstanceId,
@@ -351,6 +355,10 @@ public class SourceHandle implements ISourceHandle {
               sourceHandleListener.onFailure(SourceHandle.this, e);
             }
           }
+        } finally {
+          if (client != null) {
+            client.returnSelf();
+          }
         }
       }
       // TODO: try to issue another GetDataBlocksTask to make the query run faster.
@@ -379,10 +387,12 @@ public class SourceHandle implements ISourceHandle {
           new TAcknowledgeDataBlockEvent(remoteFragmentInstanceId, startSequenceId, endSequenceId);
       while (attempt < MAX_ATTEMPT_TIMES) {
         attempt += 1;
+        SyncDataNodeDataBlockServiceClient client = null;
         try {
+          client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
           client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent);
           break;
-        } catch (TException e) {
+        } catch (Throwable e) {
           logger.error(
               "Failed to send ack data block event [{}, {}) to {} due to {}, attempt times: {}",
               startSequenceId,
@@ -395,6 +405,10 @@ public class SourceHandle implements ISourceHandle {
               sourceHandleListener.onFailure(SourceHandle.this, e);
             }
           }
+        } finally {
+          if (client != null) {
+            client.returnSelf();
+          }
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index a11103df49..3a7c367317 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -19,7 +19,10 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -43,7 +46,7 @@ import java.util.concurrent.ScheduledExecutorService;
  * QueryExecution.
  */
 public class Coordinator {
-  private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class);
 
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
   private static final int COORDINATOR_EXECUTOR_SIZE = 1;
@@ -55,6 +58,12 @@ public class Coordinator {
           IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
           IoTDBDescriptor.getInstance().getConfig().getInternalPort());
 
+  private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      INTERNAL_SERVICE_CLIENT_MANAGER =
+          new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+              .createClientManager(
+                  new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+
   private final ExecutorService executor;
   private final ScheduledExecutorService scheduledExecutor;
 
@@ -78,7 +87,13 @@ public class Coordinator {
       return new ConfigExecution(queryContext, statement, executor);
     }
     return new QueryExecution(
-        statement, queryContext, executor, scheduledExecutor, partitionFetcher, schemaFetcher);
+        statement,
+        queryContext,
+        executor,
+        scheduledExecutor,
+        partitionFetcher,
+        schemaFetcher,
+        INTERNAL_SERVICE_CLIENT_MANAGER);
   }
 
   public ExecutionResult execute(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 67b4be045c..f0960163dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
@@ -90,13 +92,17 @@ public class QueryExecution implements IQueryExecution {
   // We use this SourceHandle to fetch the TsBlock from it.
   private ISourceHandle resultHandle;
 
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
   public QueryExecution(
       Statement statement,
       MPPQueryContext context,
       ExecutorService executor,
       ScheduledExecutorService scheduledExecutor,
       IPartitionFetcher partitionFetcher,
-      ISchemaFetcher schemaFetcher) {
+      ISchemaFetcher schemaFetcher,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
     this.context = context;
@@ -105,6 +111,7 @@ public class QueryExecution implements IQueryExecution {
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.partitionFetcher = partitionFetcher;
     this.schemaFetcher = schemaFetcher;
+    this.internalServiceClientManager = internalServiceClientManager;
 
     // We add the abort logic inside the QueryExecution.
     // So that the other components can only focus on the state change.
@@ -153,14 +160,16 @@ public class QueryExecution implements IQueryExecution {
                 distributedPlan.getInstances(),
                 context.getQueryType(),
                 executor,
-                scheduledExecutor)
+                scheduledExecutor,
+                internalServiceClientManager)
             : new StandaloneScheduler(
                 context,
                 stateMachine,
                 distributedPlan.getInstances(),
                 context.getQueryType(),
                 executor,
-                scheduledExecutor);
+                scheduledExecutor,
+                internalServiceClientManager);
     this.scheduler.start();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index 71706b4857..89e48860fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -20,17 +20,19 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
 
 import org.apache.thrift.TException;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -42,31 +44,44 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   protected ScheduledExecutorService scheduledExecutor;
   protected List<FragmentInstance> instances;
 
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
   public AbstractFragInsStateTracker(
       QueryStateMachine stateMachine,
       ExecutorService executor,
       ScheduledExecutorService scheduledExecutor,
-      List<FragmentInstance> instances) {
+      List<FragmentInstance> instances,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.stateMachine = stateMachine;
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
     this.instances = instances;
+    this.internalServiceClientManager = internalServiceClientManager;
   }
 
   public abstract void start();
 
   public abstract void abort();
 
-  protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
-    // TODO (jackie tien) change the port
-    InternalService.Iface client =
-        InternalServiceClientFactory.getInternalServiceClient(
-            new TEndPoint(
-                instance.getHostEndpoint().getIp(),
-                IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
-    TFragmentInstanceStateResp resp =
-        client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
-    return FragmentInstanceState.valueOf(resp.state);
+  protected FragmentInstanceState fetchState(FragmentInstance instance)
+      throws TException, IOException {
+    SyncDataNodeInternalServiceClient client = null;
+    try {
+      // TODO: (jackie tien) change the port
+      TEndPoint endPoint =
+          new TEndPoint(
+              instance.getHostEndpoint().getIp(),
+              IoTDBDescriptor.getInstance().getConfig().getInternalPort());
+      client = internalServiceClientManager.borrowClient(endPoint);
+      TFragmentInstanceStateResp resp =
+          client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+      return FragmentInstanceState.valueOf(resp.state);
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
+    }
   }
 
   private TFragmentInstanceId getTId(FragmentInstance instance) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 18c9b47367..1ec7398d23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -67,18 +70,21 @@ public class ClusterScheduler implements IScheduler {
       List<FragmentInstance> instances,
       QueryType queryType,
       ExecutorService executor,
-      ScheduledExecutorService scheduledExecutor) {
+      ScheduledExecutorService scheduledExecutor,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.queryContext = queryContext;
     this.stateMachine = stateMachine;
     this.instances = instances;
     this.queryType = queryType;
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
-    this.dispatcher = new SimpleFragInstanceDispatcher(executor);
+    this.dispatcher = new SimpleFragInstanceDispatcher(executor, internalServiceClientManager);
     this.stateTracker =
-        new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+        new FixedRateFragInsStateTracker(
+            stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
     this.queryTerminator =
-        new SimpleQueryTerminator(executor, queryContext.getQueryId(), instances);
+        new SimpleQueryTerminator(
+            executor, queryContext.getQueryId(), instances, internalServiceClientManager);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
index b9336f350e..4608b45e4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -27,6 +30,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -45,8 +49,9 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
       QueryStateMachine stateMachine,
       ExecutorService executor,
       ScheduledExecutorService scheduledExecutor,
-      List<FragmentInstance> instances) {
-    super(stateMachine, executor, scheduledExecutor, instances);
+      List<FragmentInstance> instances,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+    super(stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
   }
 
   @Override
@@ -72,7 +77,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
         if (state != null) {
           stateMachine.updateFragInstanceState(instance.getId(), state);
         }
-      } catch (TException e) {
+      } catch (TException | IOException e) {
         // TODO: do nothing ?
         logger.error("error happened while fetching query state", e);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
deleted file mode 100644
index aa35c3854b..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.execution.scheduler;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class InternalServiceClientFactory {
-  private static final int TIMEOUT_MS = 10_000;
-
-  private static final Logger logger = LoggerFactory.getLogger(InternalServiceClientFactory.class);
-
-  // TODO need to be replaced by mature client pool in the future
-  private static final Map<TEndPoint, InternalService.Iface> internalServiceClientMap =
-      new ConcurrentHashMap<>();
-
-  public static InternalService.Iface getInternalServiceClient(TEndPoint endpoint)
-      throws TTransportException {
-    return internalServiceClientMap.computeIfAbsent(
-        endpoint,
-        address -> {
-          TTransport transport;
-          try {
-            transport =
-                RpcTransportFactory.INSTANCE.getTransport(
-                    // as there is a try-catch already, we do not need to use TSocket.wrap
-                    address.getIp(), address.getPort(), TIMEOUT_MS);
-            transport.open();
-
-          } catch (TTransportException e) {
-            logger.error(
-                "error happened while creating mpp service client for {}:{}",
-                endpoint.getIp(),
-                endpoint.getPort(),
-                e);
-            throw new RuntimeException(e);
-          }
-          TProtocol protocol = new TBinaryProtocol(transport);
-          return newSynchronizedClient(new InternalService.Client(protocol));
-        });
-  }
-
-  public static InternalService.Iface newSynchronizedClient(InternalService.Iface client) {
-    return (InternalService.Iface)
-        Proxy.newProxyInstance(
-            InternalServiceClientFactory.class.getClassLoader(),
-            new Class[] {InternalService.Iface.class},
-            new SynchronizedHandler(client));
-  }
-
-  private static class SynchronizedHandler implements InvocationHandler {
-
-    private final InternalService.Iface client;
-
-    public SynchronizedHandler(InternalService.Iface client) {
-      this.client = client;
-    }
-
-    @Override
-    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-      try {
-        synchronized (client) {
-          return method.invoke(client, args);
-        }
-      } catch (InvocationTargetException e) {
-        // all IFace APIs throw TException
-        if (e.getTargetException() instanceof TException) {
-          throw e.getTargetException();
-        } else {
-          // should not happen
-          throw new TException(
-              "Error in calling method " + method.getName(), e.getTargetException());
-        }
-      } catch (Exception e) {
-        throw new TException("Error in calling method " + method.getName(), e);
-      }
-    }
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index b2e9252f93..e70addd9f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
@@ -40,8 +41,14 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
   private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFragInstanceDispatcher.class);
   private final ExecutorService executor;
 
-  public SimpleFragInstanceDispatcher(ExecutorService exeutor) {
-    this.executor = exeutor;
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
+  public SimpleFragInstanceDispatcher(
+      ExecutorService executor,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+    this.executor = executor;
+    this.internalServiceClientManager = internalServiceClientManager;
   }
 
   @Override
@@ -50,21 +57,28 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
         () -> {
           TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
           for (FragmentInstance instance : instances) {
-            // TODO: (jackie tien) change the port
-            InternalService.Iface client =
-                InternalServiceClientFactory.getInternalServiceClient(
-                    new TEndPoint(
-                        instance.getHostEndpoint().getIp(),
-                        IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
-            // TODO: (xingtanzjr) consider how to handle the buffer here
-            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
-            instance.serializeRequest(buffer);
-            buffer.flip();
-            TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
-            TSendFragmentInstanceReq req =
-                new TSendFragmentInstanceReq(
-                    new TFragmentInstance(buffer), groupId, instance.getType().toString());
-            resp = client.sendFragmentInstance(req);
+            SyncDataNodeInternalServiceClient client = null;
+            try {
+              // TODO: (jackie tien) change the port
+              client =
+                  internalServiceClientManager.borrowClient(
+                      new TEndPoint(
+                          instance.getHostEndpoint().getIp(),
+                          IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+              // TODO: (xingtanzjr) consider how to handle the buffer here
+              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+              instance.serializeRequest(buffer);
+              buffer.flip();
+              TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
+              TSendFragmentInstanceReq req =
+                  new TSendFragmentInstanceReq(
+                      new TFragmentInstance(buffer), groupId, instance.getType().toString());
+              resp = client.sendFragmentInstance(req);
+            } finally {
+              if (client != null) {
+                client.returnSelf();
+              }
+            }
             if (!resp.accepted) {
               break;
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 856d8d6e6d..abcaf1bfe9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -20,10 +20,11 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 
 import org.apache.thrift.TException;
@@ -41,11 +42,18 @@ public class SimpleQueryTerminator implements IQueryTerminator {
   private final QueryId queryId;
   private final List<FragmentInstance> fragmentInstances;
 
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
   public SimpleQueryTerminator(
-      ExecutorService executor, QueryId queryId, List<FragmentInstance> fragmentInstances) {
+      ExecutorService executor,
+      QueryId queryId,
+      List<FragmentInstance> fragmentInstances,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.executor = executor;
     this.queryId = queryId;
     this.fragmentInstances = fragmentInstances;
+    this.internalServiceClientManager = internalServiceClientManager;
   }
 
   @Override
@@ -57,12 +65,19 @@ public class SimpleQueryTerminator implements IQueryTerminator {
           try {
             for (TEndPoint endpoint : relatedHost) {
               // TODO (jackie tien) change the port
-              InternalService.Iface client =
-                  InternalServiceClientFactory.getInternalServiceClient(
-                      new TEndPoint(
-                          endpoint.getIp(),
-                          IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
-              client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+              SyncDataNodeInternalServiceClient client = null;
+              try {
+                client =
+                    internalServiceClientManager.borrowClient(
+                        new TEndPoint(
+                            endpoint.getIp(),
+                            IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+                client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+              } finally {
+                if (client != null) {
+                  client.returnSelf();
+                }
+              }
             }
           } catch (TException e) {
             return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index a1c31c99ab..8589f535d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -64,16 +67,19 @@ public class StandaloneScheduler implements IScheduler {
       List<FragmentInstance> instances,
       QueryType queryType,
       ExecutorService executor,
-      ScheduledExecutorService scheduledExecutor) {
+      ScheduledExecutorService scheduledExecutor,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.queryContext = queryContext;
     this.instances = instances;
     this.queryType = queryType;
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
     this.stateTracker =
-        new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+        new FixedRateFragInsStateTracker(
+            stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
     this.queryTerminator =
-        new SimpleQueryTerminator(executor, queryContext.getQueryId(), instances);
+        new SimpleQueryTerminator(
+            executor, queryContext.getQueryId(), instances, internalServiceClientManager);
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index d45eae5db9..8ac22daac7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -20,11 +20,12 @@
 package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.memory.MemoryPool;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Client;
 import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
@@ -58,16 +59,20 @@ public class SinkHandleTest {
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doNothing()
           .when(mockClient)
           .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
       Mockito.doNothing()
           .when(mockClient)
           .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -85,9 +90,9 @@ public class SinkHandleTest {
             localFragmentInstanceId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             Utils.createMockTsBlockSerde(mockTsBlockSize),
-            mockSinkHandleListener);
+            mockSinkHandleListener,
+            mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
@@ -185,16 +190,20 @@ public class SinkHandleTest {
     SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
     // Construct several mock TsBlock(s).
     List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doNothing()
           .when(mockClient)
           .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
       Mockito.doNothing()
           .when(mockClient)
           .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -208,9 +217,9 @@ public class SinkHandleTest {
             localFragmentInstanceId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             Utils.createMockTsBlockSerde(mockTsBlockSize),
-            mockSinkHandleListener);
+            mockSinkHandleListener,
+            mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
@@ -353,17 +362,21 @@ public class SinkHandleTest {
     SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
     // Construct several mock TsBlock(s).
     List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     TException mockException = new TException("Mock exception");
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doThrow(mockException)
           .when(mockClient)
           .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
       Mockito.doThrow(mockException)
           .when(mockClient)
           .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -377,9 +390,9 @@ public class SinkHandleTest {
             localFragmentInstanceId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             Utils.createMockTsBlockSerde(mockTsBlockSize),
-            mockSinkHandleListener);
+            mockSinkHandleListener,
+            mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
@@ -454,16 +467,20 @@ public class SinkHandleTest {
     SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
     // Construct several mock TsBlock(s).
     List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doNothing()
           .when(mockClient)
           .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
       Mockito.doNothing()
           .when(mockClient)
           .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -477,9 +494,9 @@ public class SinkHandleTest {
             localFragmentInstanceId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             Utils.createMockTsBlockSerde(mockTsBlockSize),
-            mockSinkHandleListener);
+            mockSinkHandleListener,
+            mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 7f53a2f4ab..87d76463db 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -20,11 +20,12 @@
 package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.memory.MemoryPool;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Client;
 import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
@@ -36,6 +37,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -45,7 +47,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class SourceHandleTest {
-
   @Test
   public void testNonBlockedOneTimeReceive() {
     final String queryId = "q0";
@@ -62,9 +63,13 @@ public class SourceHandleTest {
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
               invocation -> {
                 TGetDataBlockRequest req = invocation.getArgument(0);
@@ -77,7 +82,7 @@ public class SourceHandleTest {
               })
           .when(mockClient)
           .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -94,9 +99,9 @@ public class SourceHandleTest {
             localPlanNodeId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             mockTsBlockSerde,
-            mockSourceHandleListener);
+            mockSourceHandleListener,
+            mockClientManager);
     Assert.assertFalse(sourceHandle.isBlocked().isDone());
     Assert.assertFalse(sourceHandle.isClosed());
     Assert.assertFalse(sourceHandle.isFinished());
@@ -181,9 +186,13 @@ public class SourceHandleTest {
     MemoryPool spyMemoryPool =
         Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
               invocation -> {
                 TGetDataBlockRequest req = invocation.getArgument(0);
@@ -196,7 +205,7 @@ public class SourceHandleTest {
               })
           .when(mockClient)
           .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -213,9 +222,9 @@ public class SourceHandleTest {
             localPlanNodeId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             mockTsBlockSerde,
-            mockSourceHandleListener);
+            mockSourceHandleListener,
+            mockClientManager);
     Assert.assertFalse(sourceHandle.isBlocked().isDone());
     Assert.assertFalse(sourceHandle.isClosed());
     Assert.assertFalse(sourceHandle.isFinished());
@@ -326,9 +335,13 @@ public class SourceHandleTest {
     SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
     // Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
     TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
               invocation -> {
                 TGetDataBlockRequest req = invocation.getArgument(0);
@@ -341,7 +354,7 @@ public class SourceHandleTest {
               })
           .when(mockClient)
           .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -354,9 +367,9 @@ public class SourceHandleTest {
             localPlanNodeId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             mockTsBlockSerde,
-            mockSourceHandleListener);
+            mockSourceHandleListener,
+            mockClientManager);
     Assert.assertFalse(sourceHandle.isBlocked().isDone());
     Assert.assertFalse(sourceHandle.isClosed());
     Assert.assertFalse(sourceHandle.isFinished());
@@ -509,14 +522,18 @@ public class SourceHandleTest {
     SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
     // Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
     TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     TException mockException = new TException("Mock exception");
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doThrow(mockException)
           .when(mockClient)
           .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -529,9 +546,9 @@ public class SourceHandleTest {
             localPlanNodeId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             mockTsBlockSerde,
-            mockSourceHandleListener);
+            mockSourceHandleListener,
+            mockClientManager);
     Future<?> blocked = sourceHandle.isBlocked();
     Assert.assertFalse(blocked.isDone());
     Assert.assertFalse(sourceHandle.isClosed());
@@ -579,9 +596,13 @@ public class SourceHandleTest {
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
               invocation -> {
                 TGetDataBlockRequest req = invocation.getArgument(0);
@@ -594,7 +615,7 @@ public class SourceHandleTest {
               })
           .when(mockClient)
           .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -611,9 +632,9 @@ public class SourceHandleTest {
             localPlanNodeId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             mockTsBlockSerde,
-            mockSourceHandleListener);
+            mockSourceHandleListener,
+            mockClientManager);
     Future<?> blocked = sourceHandle.isBlocked();
     Assert.assertFalse(blocked.isDone());
     Assert.assertFalse(blocked.isCancelled());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index f10596f775..44e75fbbd7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -20,7 +20,10 @@
 package org.apache.iotdb.db.mpp.sql.plan;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -32,6 +35,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -39,6 +44,22 @@ import java.time.ZoneId;
 
 public class QueryPlannerTest {
 
+  private static IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
+  @BeforeClass
+  public static void setUp() {
+    internalServiceClientManager =
+        new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+  }
+
+  @AfterClass
+  public static void destroy() {
+    internalServiceClientManager.close();
+  }
+
   @Ignore
   @Test
   public void TestSqlToDistributedPlan() {
@@ -55,7 +76,8 @@ public class QueryPlannerTest {
             IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
             IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
             new FakePartitionFetcherImpl(),
-            new FakeSchemaFetcherImpl());
+            new FakeSchemaFetcherImpl(),
+            internalServiceClientManager);
     queryExecution.doLogicalPlan();
     System.out.printf("SQL: %s%n%n", querySql);
     System.out.println("===== Step 1: Logical Plan =====");