You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/26 08:33:35 UTC

[iotdb] 01/01: [IOTDB-3013] Using Client Pool to replace previous DataBlockServiceClientFactory and InternalServiceClientFactory

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 115c8323758a14ec054c1e60252f3977f25402c4
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue Apr 26 16:31:58 2022 +0800

    [IOTDB-3013] Using Client Pool to replace previous DataBlockServiceClientFactory and InternalServiceClientFactory
---
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |  17 +--
 .../iotdb/db/mpp/buffer/DataBlockService.java      |   8 +-
 .../mpp/buffer/DataBlockServiceClientFactory.java  | 117 ---------------------
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  31 ++++--
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |  30 ++++--
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  19 +++-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  15 ++-
 .../scheduler/AbstractFragInsStateTracker.java     |  39 ++++---
 .../mpp/execution/scheduler/ClusterScheduler.java  |  14 ++-
 .../scheduler/FixedRateFragInsStateTracker.java    |  11 +-
 .../scheduler/InternalServiceClientFactory.java    | 112 --------------------
 .../scheduler/SimpleFragInstanceDispatcher.java    |  50 +++++----
 .../execution/scheduler/SimpleQueryTerminator.java |  31 ++++--
 .../execution/scheduler/StandaloneScheduler.java   |  12 ++-
 .../apache/iotdb/db/mpp/buffer/SinkHandleTest.java |  51 ++++++---
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      |  65 ++++++++----
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  24 ++++-
 17 files changed, 300 insertions(+), 346 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
index d55de6f780..e4d38df2f1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
@@ -259,7 +261,8 @@ public class DataBlockManager implements IDataBlockManager {
   private final LocalMemoryManager localMemoryManager;
   private final Supplier<TsBlockSerde> tsBlockSerdeFactory;
   private final ExecutorService executorService;
-  private final DataBlockServiceClientFactory clientFactory;
+  private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
+      dataBlockServiceClientManager;
   private final Map<TFragmentInstanceId, Map<String, SourceHandle>> sourceHandles;
   private final Map<TFragmentInstanceId, SinkHandle> sinkHandles;
 
@@ -269,11 +272,11 @@ public class DataBlockManager implements IDataBlockManager {
       LocalMemoryManager localMemoryManager,
       Supplier<TsBlockSerde> tsBlockSerdeFactory,
       ExecutorService executorService,
-      DataBlockServiceClientFactory clientFactory) {
+      IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> dataBlockServiceClientManager) {
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory);
     this.executorService = Validate.notNull(executorService);
-    this.clientFactory = Validate.notNull(clientFactory);
+    this.dataBlockServiceClientManager = Validate.notNull(dataBlockServiceClientManager);
     sourceHandles = new ConcurrentHashMap<>();
     sinkHandles = new ConcurrentHashMap<>();
   }
@@ -311,9 +314,9 @@ public class DataBlockManager implements IDataBlockManager {
             localFragmentInstanceId,
             localMemoryManager,
             executorService,
-            clientFactory.getDataBlockServiceClient(remoteEndpoint),
             tsBlockSerdeFactory.get(),
-            new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
+            new SinkHandleListenerImpl(instanceContext, instanceContext::failed),
+            dataBlockServiceClientManager);
     sinkHandles.put(localFragmentInstanceId, sinkHandle);
     return sinkHandle;
   }
@@ -349,9 +352,9 @@ public class DataBlockManager implements IDataBlockManager {
             localPlanNodeId,
             localMemoryManager,
             executorService,
-            clientFactory.getDataBlockServiceClient(remoteEndpoint),
             tsBlockSerdeFactory.get(),
-            new SourceHandleListenerImpl(onFailureCallback));
+            new SourceHandleListenerImpl(onFailureCallback),
+            dataBlockServiceClientManager);
     sourceHandles
         .computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap<>())
         .put(localPlanNodeId, sourceHandle);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
index cabb725b95..7f8c6a6f5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockService.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -26,6 +29,7 @@ import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.ThriftService;
 import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
@@ -67,7 +71,9 @@ public class DataBlockService extends ThriftService implements DataBlockServiceM
             new LocalMemoryManager(),
             new TsBlockSerdeFactory(),
             executorService,
-            new DataBlockServiceClientFactory());
+            new IClientManager.Factory<TEndPoint, SyncDataNodeDataBlockServiceClient>()
+                .createClientManager(
+                    new DataNodeClientPoolFactory.SyncDataNodeDataBlockServiceClientPoolFactory()));
     processor = new Processor<>(dataBlockManager.getOrCreateDataBlockServiceImpl());
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
deleted file mode 100644
index a363fed2bb..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.buffer;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class DataBlockServiceClientFactory {
-
-  private static final int TIMEOUT_MS = 30_000;
-
-  private static final Logger logger = LoggerFactory.getLogger(DataBlockServiceClientFactory.class);
-
-  // TODO need to be replaced by mature client pool in the future
-  private static final Map<TEndPoint, DataBlockService.Iface> dataBlockServiceClientMap =
-      new ConcurrentHashMap<>();
-
-  public DataBlockService.Iface getDataBlockServiceClient(TEndPoint endpoint) {
-
-    return dataBlockServiceClientMap.computeIfAbsent(
-        endpoint,
-        address -> {
-          TTransport transport;
-          try {
-            transport =
-                RpcTransportFactory.INSTANCE.getTransport(
-                    // as there is a try-catch already, we do not need to use TSocket.wrap
-                    address.getIp(), address.getPort(), TIMEOUT_MS);
-            transport.open();
-            TProtocol protocol =
-                IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
-                    ? new TCompactProtocol(transport)
-                    : new TBinaryProtocol(transport);
-            return newSynchronizedClient(new DataBlockService.Client(protocol));
-          } catch (TTransportException e) {
-            logger.error(
-                "error happened while creating mpp service client for {}:{}",
-                endpoint.getIp(),
-                endpoint.getPort(),
-                e);
-            throw new RuntimeException(e);
-          }
-        });
-  }
-
-  public static DataBlockService.Iface newSynchronizedClient(DataBlockService.Iface client) {
-    return (DataBlockService.Iface)
-        Proxy.newProxyInstance(
-            DataBlockServiceClientFactory.class.getClassLoader(),
-            new Class[] {DataBlockService.Iface.class},
-            new SynchronizedHandler(client));
-  }
-
-  private static class SynchronizedHandler implements InvocationHandler {
-
-    private final DataBlockService.Iface client;
-
-    public SynchronizedHandler(DataBlockService.Iface client) {
-      this.client = client;
-    }
-
-    @Override
-    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-      try {
-        synchronized (client) {
-          return method.invoke(client, args);
-        }
-      } catch (InvocationTargetException e) {
-        // all IFace APIs throw TException
-        if (e.getTargetException() instanceof TException) {
-          throw e.getTargetException();
-        } else {
-          // should not happen
-          throw new TException(
-              "Error in calling method " + method.getName(), e.getTargetException());
-        }
-      } catch (Exception e) {
-        throw new TException("Error in calling method " + method.getName(), e);
-      }
-    }
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
index 5f734747b5..18736727ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
 import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
@@ -60,7 +61,6 @@ public class SinkHandle implements ISinkHandle {
   private final TFragmentInstanceId localFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
-  private final DataBlockService.Iface client;
   private final TsBlockSerde serde;
   private final SinkHandleListener sinkHandleListener;
 
@@ -69,6 +69,9 @@ public class SinkHandle implements ISinkHandle {
   //   2. Fast lookup.
   private final LinkedHashMap<Integer, TsBlock> sequenceIdToTsBlock = new LinkedHashMap<>();
 
+  private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
+      dataBlockServiceClientManager;
+
   private volatile ListenableFuture<Void> blocked = immediateFuture(null);
   private int nextSequenceId = 0;
   private long bufferRetainedSizeInBytes = 0;
@@ -82,18 +85,18 @@ public class SinkHandle implements ISinkHandle {
       TFragmentInstanceId localFragmentInstanceId,
       LocalMemoryManager localMemoryManager,
       ExecutorService executorService,
-      DataBlockService.Iface client,
       TsBlockSerde serde,
-      SinkHandleListener sinkHandleListener) {
+      SinkHandleListener sinkHandleListener,
+      IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> dataBlockServiceClientManager) {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
     this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
-    this.client = Validate.notNull(client);
     this.serde = Validate.notNull(serde);
     this.sinkHandleListener = Validate.notNull(sinkHandleListener);
+    this.dataBlockServiceClientManager = dataBlockServiceClientManager;
   }
 
   @Override
@@ -152,7 +155,7 @@ public class SinkHandle implements ISinkHandle {
     throw new UnsupportedOperationException();
   }
 
-  private void sendEndOfDataBlockEvent() throws TException {
+  private void sendEndOfDataBlockEvent() throws Exception {
     logger.debug(
         "Send end of data block event to plan node {} of {}. {}",
         remotePlanNodeId,
@@ -167,10 +170,12 @@ public class SinkHandle implements ISinkHandle {
             nextSequenceId - 1);
     while (attempt < MAX_ATTEMPT_TIMES) {
       attempt += 1;
+      SyncDataNodeDataBlockServiceClient client = null;
       try {
+        client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
         client.onEndOfDataBlockEvent(endOfDataBlockEvent);
         break;
-      } catch (TException e) {
+      } catch (TException | IOException e) {
         logger.error(
             "Failed to send end of data block event to plan node {} of {} due to {}, attempt times: {}",
             remotePlanNodeId,
@@ -181,6 +186,10 @@ public class SinkHandle implements ISinkHandle {
         if (attempt == MAX_ATTEMPT_TIMES) {
           throw e;
         }
+      } finally {
+        if (client != null) {
+          client.returnSelf();
+        }
       }
     }
   }
@@ -193,7 +202,7 @@ public class SinkHandle implements ISinkHandle {
     }
     try {
       sendEndOfDataBlockEvent();
-    } catch (TException e) {
+    } catch (Exception e) {
       throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
     }
     synchronized (this) {
@@ -356,7 +365,9 @@ public class SinkHandle implements ISinkHandle {
               blockSizes);
       while (attempt < MAX_ATTEMPT_TIMES) {
         attempt += 1;
+        SyncDataNodeDataBlockServiceClient client = null;
         try {
+          client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
           client.onNewDataBlockEvent(newDataBlockEvent);
           break;
         } catch (Throwable e) {
@@ -370,6 +381,10 @@ public class SinkHandle implements ISinkHandle {
           if (attempt == MAX_ATTEMPT_TIMES) {
             sinkHandleListener.onFailure(SinkHandle.this, e);
           }
+        } finally {
+          if (client != null) {
+            client.returnSelf();
+          }
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
index cbe5e9d540..10aaf4ee33 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java
@@ -20,9 +20,10 @@
 package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
 import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
@@ -33,7 +34,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang3.Validate;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,13 +59,15 @@ public class SourceHandle implements ISourceHandle {
   private final String localPlanNodeId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
-  private final DataBlockService.Iface client;
   private final TsBlockSerde serde;
   private final SourceHandleListener sourceHandleListener;
 
   private final Map<Integer, TsBlock> sequenceIdToTsBlock = new HashMap<>();
   private final Map<Integer, Long> sequenceIdToDataBlockSize = new HashMap<>();
 
+  private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient>
+      dataBlockServiceClientManager;
+
   private volatile SettableFuture<Void> blocked = SettableFuture.create();
   private long bufferRetainedSizeInBytes = 0L;
   private int currSequenceId = 0;
@@ -80,19 +82,19 @@ public class SourceHandle implements ISourceHandle {
       String localPlanNodeId,
       LocalMemoryManager localMemoryManager,
       ExecutorService executorService,
-      DataBlockService.Iface client,
       TsBlockSerde serde,
-      SourceHandleListener sourceHandleListener) {
+      SourceHandleListener sourceHandleListener,
+      IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> dataBlockServiceClientManager) {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
-    this.client = Validate.notNull(client);
     this.serde = Validate.notNull(serde);
     this.sourceHandleListener = Validate.notNull(sourceHandleListener);
     bufferRetainedSizeInBytes = 0L;
+    this.dataBlockServiceClientManager = dataBlockServiceClientManager;
   }
 
   @Override
@@ -315,7 +317,9 @@ public class SourceHandle implements ISourceHandle {
       int attempt = 0;
       while (attempt < MAX_ATTEMPT_TIMES) {
         attempt += 1;
+        SyncDataNodeDataBlockServiceClient client = null;
         try {
+          client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
           TGetDataBlockResponse resp = client.getDataBlock(req);
           List<TsBlock> tsBlocks = new ArrayList<>(resp.getTsBlocks().size());
           for (ByteBuffer byteBuffer : resp.getTsBlocks()) {
@@ -336,7 +340,7 @@ public class SourceHandle implements ISourceHandle {
           executorService.submit(
               new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
           break;
-        } catch (TException e) {
+        } catch (Throwable e) {
           logger.error(
               "Failed to get data block from {} due to {}, attempt times: {}",
               remoteFragmentInstanceId,
@@ -351,6 +355,10 @@ public class SourceHandle implements ISourceHandle {
               sourceHandleListener.onFailure(SourceHandle.this, e);
             }
           }
+        } finally {
+          if (client != null) {
+            client.returnSelf();
+          }
         }
       }
       // TODO: try to issue another GetDataBlocksTask to make the query run faster.
@@ -379,10 +387,12 @@ public class SourceHandle implements ISourceHandle {
           new TAcknowledgeDataBlockEvent(remoteFragmentInstanceId, startSequenceId, endSequenceId);
       while (attempt < MAX_ATTEMPT_TIMES) {
         attempt += 1;
+        SyncDataNodeDataBlockServiceClient client = null;
         try {
+          client = dataBlockServiceClientManager.borrowClient(remoteEndpoint);
           client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent);
           break;
-        } catch (TException e) {
+        } catch (Throwable e) {
           logger.error(
               "Failed to send ack data block event [{}, {}) to {} due to {}, attempt times: {}",
               startSequenceId,
@@ -395,6 +405,10 @@ public class SourceHandle implements ISourceHandle {
               sourceHandleListener.onFailure(SourceHandle.this, e);
             }
           }
+        } finally {
+          if (client != null) {
+            client.returnSelf();
+          }
         }
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index a11103df49..3a7c367317 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -19,7 +19,10 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
@@ -43,7 +46,7 @@ import java.util.concurrent.ScheduledExecutorService;
  * QueryExecution.
  */
 public class Coordinator {
-  private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class);
 
   private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
   private static final int COORDINATOR_EXECUTOR_SIZE = 1;
@@ -55,6 +58,12 @@ public class Coordinator {
           IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
           IoTDBDescriptor.getInstance().getConfig().getInternalPort());
 
+  private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      INTERNAL_SERVICE_CLIENT_MANAGER =
+          new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+              .createClientManager(
+                  new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+
   private final ExecutorService executor;
   private final ScheduledExecutorService scheduledExecutor;
 
@@ -78,7 +87,13 @@ public class Coordinator {
       return new ConfigExecution(queryContext, statement, executor);
     }
     return new QueryExecution(
-        statement, queryContext, executor, scheduledExecutor, partitionFetcher, schemaFetcher);
+        statement,
+        queryContext,
+        executor,
+        scheduledExecutor,
+        partitionFetcher,
+        schemaFetcher,
+        INTERNAL_SERVICE_CLIENT_MANAGER);
   }
 
   public ExecutionResult execute(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 67b4be045c..f0960163dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockService;
@@ -90,13 +92,17 @@ public class QueryExecution implements IQueryExecution {
   // We use this SourceHandle to fetch the TsBlock from it.
   private ISourceHandle resultHandle;
 
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
   public QueryExecution(
       Statement statement,
       MPPQueryContext context,
       ExecutorService executor,
       ScheduledExecutorService scheduledExecutor,
       IPartitionFetcher partitionFetcher,
-      ISchemaFetcher schemaFetcher) {
+      ISchemaFetcher schemaFetcher,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
     this.context = context;
@@ -105,6 +111,7 @@ public class QueryExecution implements IQueryExecution {
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.partitionFetcher = partitionFetcher;
     this.schemaFetcher = schemaFetcher;
+    this.internalServiceClientManager = internalServiceClientManager;
 
     // We add the abort logic inside the QueryExecution.
     // So that the other components can only focus on the state change.
@@ -153,14 +160,16 @@ public class QueryExecution implements IQueryExecution {
                 distributedPlan.getInstances(),
                 context.getQueryType(),
                 executor,
-                scheduledExecutor)
+                scheduledExecutor,
+                internalServiceClientManager)
             : new StandaloneScheduler(
                 context,
                 stateMachine,
                 distributedPlan.getInstances(),
                 context.getQueryType(),
                 executor,
-                scheduledExecutor);
+                scheduledExecutor,
+                internalServiceClientManager);
     this.scheduler.start();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
index 71706b4857..89e48860fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/AbstractFragInsStateTracker.java
@@ -20,17 +20,19 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
 
 import org.apache.thrift.TException;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -42,31 +44,44 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
   protected ScheduledExecutorService scheduledExecutor;
   protected List<FragmentInstance> instances;
 
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
   public AbstractFragInsStateTracker(
       QueryStateMachine stateMachine,
       ExecutorService executor,
       ScheduledExecutorService scheduledExecutor,
-      List<FragmentInstance> instances) {
+      List<FragmentInstance> instances,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.stateMachine = stateMachine;
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
     this.instances = instances;
+    this.internalServiceClientManager = internalServiceClientManager;
   }
 
   public abstract void start();
 
   public abstract void abort();
 
-  protected FragmentInstanceState fetchState(FragmentInstance instance) throws TException {
-    // TODO (jackie tien) change the port
-    InternalService.Iface client =
-        InternalServiceClientFactory.getInternalServiceClient(
-            new TEndPoint(
-                instance.getHostEndpoint().getIp(),
-                IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
-    TFragmentInstanceStateResp resp =
-        client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
-    return FragmentInstanceState.valueOf(resp.state);
+  protected FragmentInstanceState fetchState(FragmentInstance instance)
+      throws TException, IOException {
+    SyncDataNodeInternalServiceClient client = null;
+    try {
+      // TODO: (jackie tien) change the port
+      TEndPoint endPoint =
+          new TEndPoint(
+              instance.getHostEndpoint().getIp(),
+              IoTDBDescriptor.getInstance().getConfig().getInternalPort());
+      client = internalServiceClientManager.borrowClient(endPoint);
+      TFragmentInstanceStateResp resp =
+          client.fetchFragmentInstanceState(new TFetchFragmentInstanceStateReq(getTId(instance)));
+      return FragmentInstanceState.valueOf(resp.state);
+    } finally {
+      if (client != null) {
+        client.returnSelf();
+      }
+    }
   }
 
   private TFragmentInstanceId getTId(FragmentInstance instance) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 18c9b47367..1ec7398d23 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -67,18 +70,21 @@ public class ClusterScheduler implements IScheduler {
       List<FragmentInstance> instances,
       QueryType queryType,
       ExecutorService executor,
-      ScheduledExecutorService scheduledExecutor) {
+      ScheduledExecutorService scheduledExecutor,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.queryContext = queryContext;
     this.stateMachine = stateMachine;
     this.instances = instances;
     this.queryType = queryType;
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
-    this.dispatcher = new SimpleFragInstanceDispatcher(executor);
+    this.dispatcher = new SimpleFragInstanceDispatcher(executor, internalServiceClientManager);
     this.stateTracker =
-        new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+        new FixedRateFragInsStateTracker(
+            stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
     this.queryTerminator =
-        new SimpleQueryTerminator(executor, queryContext.getQueryId(), instances);
+        new SimpleQueryTerminator(
+            executor, queryContext.getQueryId(), instances, internalServiceClientManager);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
index b9336f350e..4608b45e4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/FixedRateFragInsStateTracker.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -27,6 +30,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -45,8 +49,9 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
       QueryStateMachine stateMachine,
       ExecutorService executor,
       ScheduledExecutorService scheduledExecutor,
-      List<FragmentInstance> instances) {
-    super(stateMachine, executor, scheduledExecutor, instances);
+      List<FragmentInstance> instances,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+    super(stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
   }
 
   @Override
@@ -72,7 +77,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
         if (state != null) {
           stateMachine.updateFragInstanceState(instance.getId(), state);
         }
-      } catch (TException e) {
+      } catch (TException | IOException e) {
         // TODO: do nothing ?
         logger.error("error happened while fetching query state", e);
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
deleted file mode 100644
index aa35c3854b..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/InternalServiceClientFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.execution.scheduler;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class InternalServiceClientFactory {
-  private static final int TIMEOUT_MS = 10_000;
-
-  private static final Logger logger = LoggerFactory.getLogger(InternalServiceClientFactory.class);
-
-  // TODO need to be replaced by mature client pool in the future
-  private static final Map<TEndPoint, InternalService.Iface> internalServiceClientMap =
-      new ConcurrentHashMap<>();
-
-  public static InternalService.Iface getInternalServiceClient(TEndPoint endpoint)
-      throws TTransportException {
-    return internalServiceClientMap.computeIfAbsent(
-        endpoint,
-        address -> {
-          TTransport transport;
-          try {
-            transport =
-                RpcTransportFactory.INSTANCE.getTransport(
-                    // as there is a try-catch already, we do not need to use TSocket.wrap
-                    address.getIp(), address.getPort(), TIMEOUT_MS);
-            transport.open();
-
-          } catch (TTransportException e) {
-            logger.error(
-                "error happened while creating mpp service client for {}:{}",
-                endpoint.getIp(),
-                endpoint.getPort(),
-                e);
-            throw new RuntimeException(e);
-          }
-          TProtocol protocol = new TBinaryProtocol(transport);
-          return newSynchronizedClient(new InternalService.Client(protocol));
-        });
-  }
-
-  public static InternalService.Iface newSynchronizedClient(InternalService.Iface client) {
-    return (InternalService.Iface)
-        Proxy.newProxyInstance(
-            InternalServiceClientFactory.class.getClassLoader(),
-            new Class[] {InternalService.Iface.class},
-            new SynchronizedHandler(client));
-  }
-
-  private static class SynchronizedHandler implements InvocationHandler {
-
-    private final InternalService.Iface client;
-
-    public SynchronizedHandler(InternalService.Iface client) {
-      this.client = client;
-    }
-
-    @Override
-    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-      try {
-        synchronized (client) {
-          return method.invoke(client, args);
-        }
-      } catch (InvocationTargetException e) {
-        // all IFace APIs throw TException
-        if (e.getTargetException() instanceof TException) {
-          throw e.getTargetException();
-        } else {
-          // should not happen
-          throw new TException(
-              "Error in calling method " + method.getName(), e.getTargetException());
-        }
-      } catch (Exception e) {
-        throw new TException("Error in calling method " + method.getName(), e);
-      }
-    }
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
index b2e9252f93..e70addd9f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleFragInstanceDispatcher.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
@@ -40,8 +41,14 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
   private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFragInstanceDispatcher.class);
   private final ExecutorService executor;
 
-  public SimpleFragInstanceDispatcher(ExecutorService exeutor) {
-    this.executor = exeutor;
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
+  public SimpleFragInstanceDispatcher(
+      ExecutorService executor,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+    this.executor = executor;
+    this.internalServiceClientManager = internalServiceClientManager; // one borrow per instance
   }
 
   @Override
@@ -50,21 +57,28 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
         () -> {
           TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
           for (FragmentInstance instance : instances) {
-            // TODO: (jackie tien) change the port
-            InternalService.Iface client =
-                InternalServiceClientFactory.getInternalServiceClient(
-                    new TEndPoint(
-                        instance.getHostEndpoint().getIp(),
-                        IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
-            // TODO: (xingtanzjr) consider how to handle the buffer here
-            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
-            instance.serializeRequest(buffer);
-            buffer.flip();
-            TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
-            TSendFragmentInstanceReq req =
-                new TSendFragmentInstanceReq(
-                    new TFragmentInstance(buffer), groupId, instance.getType().toString());
-            resp = client.sendFragmentInstance(req);
+            SyncDataNodeInternalServiceClient client = null;
+            try {
+              // TODO: (jackie tien) change the port
+              client =
+                  internalServiceClientManager.borrowClient(
+                      new TEndPoint(
+                          instance.getHostEndpoint().getIp(),
+                          IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+              // TODO: (xingtanzjr) consider how to handle the buffer here
+              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+              instance.serializeRequest(buffer);
+              buffer.flip();
+              TConsensusGroupId groupId = instance.getRegionReplicaSet().getRegionId();
+              TSendFragmentInstanceReq req =
+                  new TSendFragmentInstanceReq(
+                      new TFragmentInstance(buffer), groupId, instance.getType().toString());
+              resp = client.sendFragmentInstance(req);
+            } finally {
+              if (client != null) {
+                client.returnSelf();
+              }
+            }
             if (!resp.accepted) {
               break;
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
index 856d8d6e6d..abcaf1bfe9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/SimpleQueryTerminator.java
@@ -20,10 +20,11 @@
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.InternalService;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 
 import org.apache.thrift.TException;
@@ -41,11 +42,18 @@ public class SimpleQueryTerminator implements IQueryTerminator {
   private final QueryId queryId;
   private final List<FragmentInstance> fragmentInstances;
 
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
   public SimpleQueryTerminator(
-      ExecutorService executor, QueryId queryId, List<FragmentInstance> fragmentInstances) {
+      ExecutorService executor,
+      QueryId queryId,
+      List<FragmentInstance> fragmentInstances,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.executor = executor;
     this.queryId = queryId;
     this.fragmentInstances = fragmentInstances;
+    this.internalServiceClientManager = internalServiceClientManager; // one borrow per host
   }
 
   @Override
@@ -57,12 +65,19 @@ public class SimpleQueryTerminator implements IQueryTerminator {
           try {
             for (TEndPoint endpoint : relatedHost) {
               // TODO (jackie tien) change the port
-              InternalService.Iface client =
-                  InternalServiceClientFactory.getInternalServiceClient(
-                      new TEndPoint(
-                          endpoint.getIp(),
-                          IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
-              client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+              SyncDataNodeInternalServiceClient client = null;
+              try {
+                client =
+                    internalServiceClientManager.borrowClient(
+                        new TEndPoint(
+                            endpoint.getIp(),
+                            IoTDBDescriptor.getInstance().getConfig().getInternalPort()));
+                client.cancelQuery(new TCancelQueryReq(queryId.getId()));
+              } finally {
+                if (client != null) {
+                  client.returnSelf();
+                }
+              }
             }
           } catch (TException e) {
             return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index a1c31c99ab..8589f535d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -64,16 +67,19 @@ public class StandaloneScheduler implements IScheduler {
       List<FragmentInstance> instances,
       QueryType queryType,
       ExecutorService executor,
-      ScheduledExecutorService scheduledExecutor) {
+      ScheduledExecutorService scheduledExecutor,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
     this.queryContext = queryContext;
     this.instances = instances;
     this.queryType = queryType;
     this.executor = executor;
     this.scheduledExecutor = scheduledExecutor;
     this.stateTracker =
-        new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+        new FixedRateFragInsStateTracker(
+            stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
     this.queryTerminator =
-        new SimpleQueryTerminator(executor, queryContext.getQueryId(), instances);
+        new SimpleQueryTerminator(
+            executor, queryContext.getQueryId(), instances, internalServiceClientManager);
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
index d45eae5db9..8ac22daac7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SinkHandleTest.java
@@ -20,11 +20,12 @@
 package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.memory.MemoryPool;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Client;
 import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
@@ -58,16 +59,20 @@ public class SinkHandleTest {
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doNothing()
           .when(mockClient)
           .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
       Mockito.doNothing()
           .when(mockClient)
           .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -85,9 +90,9 @@ public class SinkHandleTest {
             localFragmentInstanceId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             Utils.createMockTsBlockSerde(mockTsBlockSize),
-            mockSinkHandleListener);
+            mockSinkHandleListener,
+            mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
@@ -185,16 +190,20 @@ public class SinkHandleTest {
     SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
     // Construct several mock TsBlock(s).
     List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doNothing()
           .when(mockClient)
           .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
       Mockito.doNothing()
           .when(mockClient)
           .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -208,9 +217,9 @@ public class SinkHandleTest {
             localFragmentInstanceId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             Utils.createMockTsBlockSerde(mockTsBlockSize),
-            mockSinkHandleListener);
+            mockSinkHandleListener,
+            mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
@@ -353,17 +362,21 @@ public class SinkHandleTest {
     SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
     // Construct several mock TsBlock(s).
     List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     TException mockException = new TException("Mock exception");
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doThrow(mockException)
           .when(mockClient)
           .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
       Mockito.doThrow(mockException)
           .when(mockClient)
           .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -377,9 +390,9 @@ public class SinkHandleTest {
             localFragmentInstanceId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             Utils.createMockTsBlockSerde(mockTsBlockSize),
-            mockSinkHandleListener);
+            mockSinkHandleListener,
+            mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
@@ -454,16 +467,20 @@ public class SinkHandleTest {
     SinkHandleListener mockSinkHandleListener = Mockito.mock(SinkHandleListener.class);
     // Construct several mock TsBlock(s).
     List<TsBlock> mockTsBlocks = Utils.createMockTsBlocks(numOfMockTsBlock, mockTsBlockSize);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doNothing()
           .when(mockClient)
           .onEndOfDataBlockEvent(Mockito.any(TEndOfDataBlockEvent.class));
       Mockito.doNothing()
           .when(mockClient)
           .onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -477,9 +494,9 @@ public class SinkHandleTest {
             localFragmentInstanceId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             Utils.createMockTsBlockSerde(mockTsBlockSize),
-            mockSinkHandleListener);
+            mockSinkHandleListener,
+            mockClientManager);
     Assert.assertTrue(sinkHandle.isFull().isDone());
     Assert.assertFalse(sinkHandle.isFinished());
     Assert.assertFalse(sinkHandle.isClosed());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
index 7f53a2f4ab..87d76463db 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/buffer/SourceHandleTest.java
@@ -20,11 +20,12 @@
 package org.apache.iotdb.db.mpp.buffer;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.memory.MemoryPool;
-import org.apache.iotdb.mpp.rpc.thrift.DataBlockService.Client;
 import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
@@ -36,6 +37,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -45,7 +47,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class SourceHandleTest {
-
   @Test
   public void testNonBlockedOneTimeReceive() {
     final String queryId = "q0";
@@ -62,9 +63,13 @@ public class SourceHandleTest {
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
               invocation -> {
                 TGetDataBlockRequest req = invocation.getArgument(0);
@@ -77,7 +82,7 @@ public class SourceHandleTest {
               })
           .when(mockClient)
           .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -94,9 +99,9 @@ public class SourceHandleTest {
             localPlanNodeId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             mockTsBlockSerde,
-            mockSourceHandleListener);
+            mockSourceHandleListener,
+            mockClientManager);
     Assert.assertFalse(sourceHandle.isBlocked().isDone());
     Assert.assertFalse(sourceHandle.isClosed());
     Assert.assertFalse(sourceHandle.isFinished());
@@ -181,9 +186,13 @@ public class SourceHandleTest {
     MemoryPool spyMemoryPool =
         Mockito.spy(new MemoryPool("test", 10 * mockTsBlockSize, 5 * mockTsBlockSize));
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(spyMemoryPool);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
               invocation -> {
                 TGetDataBlockRequest req = invocation.getArgument(0);
@@ -196,7 +205,7 @@ public class SourceHandleTest {
               })
           .when(mockClient)
           .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -213,9 +222,9 @@ public class SourceHandleTest {
             localPlanNodeId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             mockTsBlockSerde,
-            mockSourceHandleListener);
+            mockSourceHandleListener,
+            mockClientManager);
     Assert.assertFalse(sourceHandle.isBlocked().isDone());
     Assert.assertFalse(sourceHandle.isClosed());
     Assert.assertFalse(sourceHandle.isFinished());
@@ -326,9 +335,13 @@ public class SourceHandleTest {
     SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
     // Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
     TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
               invocation -> {
                 TGetDataBlockRequest req = invocation.getArgument(0);
@@ -341,7 +354,7 @@ public class SourceHandleTest {
               })
           .when(mockClient)
           .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -354,9 +367,9 @@ public class SourceHandleTest {
             localPlanNodeId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             mockTsBlockSerde,
-            mockSourceHandleListener);
+            mockSourceHandleListener,
+            mockClientManager);
     Assert.assertFalse(sourceHandle.isBlocked().isDone());
     Assert.assertFalse(sourceHandle.isClosed());
     Assert.assertFalse(sourceHandle.isFinished());
@@ -509,14 +522,18 @@ public class SourceHandleTest {
     SourceHandleListener mockSourceHandleListener = Mockito.mock(SourceHandleListener.class);
     // Construct a mock TsBlockSerde that deserializes any bytebuffer into a mock TsBlock.
     TsBlockSerde mockTsBlockSerde = Utils.createMockTsBlockSerde(mockTsBlockSize);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     TException mockException = new TException("Mock exception");
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doThrow(mockException)
           .when(mockClient)
           .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -529,9 +546,9 @@ public class SourceHandleTest {
             localPlanNodeId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             mockTsBlockSerde,
-            mockSourceHandleListener);
+            mockSourceHandleListener,
+            mockClientManager);
     Future<?> blocked = sourceHandle.isBlocked();
     Assert.assertFalse(blocked.isDone());
     Assert.assertFalse(sourceHandle.isClosed());
@@ -579,9 +596,13 @@ public class SourceHandleTest {
     LocalMemoryManager mockLocalMemoryManager = Mockito.mock(LocalMemoryManager.class);
     MemoryPool mockMemoryPool = Utils.createMockNonBlockedMemoryPool();
     Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+    IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> mockClientManager =
+        Mockito.mock(IClientManager.class);
     // Construct a mock client.
-    Client mockClient = Mockito.mock(Client.class);
+    SyncDataNodeDataBlockServiceClient mockClient =
+        Mockito.mock(SyncDataNodeDataBlockServiceClient.class);
     try {
+      Mockito.when(mockClientManager.borrowClient(remoteEndpoint)).thenReturn(mockClient);
       Mockito.doAnswer(
               invocation -> {
                 TGetDataBlockRequest req = invocation.getArgument(0);
@@ -594,7 +615,7 @@ public class SourceHandleTest {
               })
           .when(mockClient)
           .getDataBlock(Mockito.any(TGetDataBlockRequest.class));
-    } catch (TException e) {
+    } catch (TException | IOException e) {
       e.printStackTrace();
       Assert.fail();
     }
@@ -611,9 +632,9 @@ public class SourceHandleTest {
             localPlanNodeId,
             mockLocalMemoryManager,
             Executors.newSingleThreadExecutor(),
-            mockClient,
             mockTsBlockSerde,
-            mockSourceHandleListener);
+            mockSourceHandleListener,
+            mockClientManager);
     Future<?> blocked = sourceHandle.isBlocked();
     Assert.assertFalse(blocked.isDone());
     Assert.assertFalse(blocked.isCancelled());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
index f10596f775..44e75fbbd7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryPlannerTest.java
@@ -20,7 +20,10 @@
 package org.apache.iotdb.db.mpp.sql.plan;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -32,6 +35,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -39,6 +44,22 @@ import java.time.ZoneId;
 
 public class QueryPlannerTest {
 
+  private static IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      internalServiceClientManager;
+
+  @BeforeClass
+  public static void setUp() {
+    internalServiceClientManager =
+        new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+  }
+
+  @AfterClass
+  public static void destroy() {
+    internalServiceClientManager.close();
+  }
+
   @Ignore
   @Test
   public void TestSqlToDistributedPlan() {
@@ -55,7 +76,8 @@ public class QueryPlannerTest {
             IoTDBThreadPoolFactory.newSingleThreadExecutor("test_query"),
             IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
             new FakePartitionFetcherImpl(),
-            new FakeSchemaFetcherImpl());
+            new FakeSchemaFetcherImpl(),
+            internalServiceClientManager);
     queryExecution.doLogicalPlan();
     System.out.printf("SQL: %s%n%n", querySql);
     System.out.println("===== Step 1: Logical Plan =====");