You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/02/13 06:51:35 UTC

[iotdb] branch master updated: [IOTDB-5337] Parallelization of write operation in FragmentInstanceDispatcher (#8920)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a9168656d6 [IOTDB-5337] Parallelization of write operation in FragmentInstanceDispatcher (#8920)
a9168656d6 is described below

commit a9168656d680a4e2f37243adf9a226f92060a532
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Mon Feb 13 14:51:29 2023 +0800

    [IOTDB-5337] Parallelization of write operation in FragmentInstanceDispatcher (#8920)
---
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  14 ++-
 .../db/mpp/plan/execution/QueryExecution.java      |  18 +++-
 .../db/mpp/plan/scheduler/AsyncPlanNodeSender.java | 105 +++++++++++++++++++++
 .../plan/scheduler/AsyncSendPlanNodeHandler.java   |  61 ++++++++++++
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  12 ++-
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  60 +++++++++++-
 .../iotdb/db/mpp/plan/plan/QueryPlannerTest.java   |  17 +++-
 7 files changed, 266 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 731b650dad..2a2ae6f0d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -69,11 +70,17 @@ public class Coordinator {
       LoggerFactory.getLogger(IoTDBConstant.SLOW_SQL_LOGGER_NAME);
 
   private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
-      INTERNAL_SERVICE_CLIENT_MANAGER =
+      SYNC_INTERNAL_SERVICE_CLIENT_MANAGER =
           new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
               .createClientManager(
                   new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
 
+  private static final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+      ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER =
+          new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
+              .createClientManager(
+                  new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
+
   private final ExecutorService executor;
   private final ExecutorService writeOperationExecutor;
   private final ScheduledExecutorService scheduledExecutor;
@@ -112,7 +119,8 @@ public class Coordinator {
         scheduledExecutor,
         partitionFetcher,
         schemaFetcher,
-        INTERNAL_SERVICE_CLIENT_MANAGER);
+        SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
+        ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER);
   }
 
   public ExecutionResult execute(
@@ -222,7 +230,7 @@ public class Coordinator {
 
   public IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       getInternalServiceClientManager() {
-    return INTERNAL_SERVICE_CLIENT_MANAGER;
+    return SYNC_INTERNAL_SERVICE_CLIENT_MANAGER;
   }
 
   public static Coordinator getInstance() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index b0326403b0..284a634444 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.execution;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -121,7 +122,10 @@ public class QueryExecution implements IQueryExecution {
   private ISourceHandle resultHandle;
 
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
-      internalServiceClientManager;
+      syncInternalServiceClientManager;
+
+  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+      asyncInternalServiceClientManager;
 
   private AtomicBoolean stopped;
 
@@ -137,7 +141,9 @@ public class QueryExecution implements IQueryExecution {
       ScheduledExecutorService scheduledExecutor,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager,
+      IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+          asyncInternalServiceClientManager) {
     this.rawStatement = statement;
     this.executor = executor;
     this.writeOperationExecutor = writeOperationExecutor;
@@ -148,7 +154,8 @@ public class QueryExecution implements IQueryExecution {
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.partitionFetcher = partitionFetcher;
     this.schemaFetcher = schemaFetcher;
-    this.internalServiceClientManager = internalServiceClientManager;
+    this.syncInternalServiceClientManager = syncInternalServiceClientManager;
+    this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
 
     // We add the abort logic inside the QueryExecution.
     // So that the other components can only focus on the state change.
@@ -267,7 +274,7 @@ public class QueryExecution implements IQueryExecution {
     if (rawStatement instanceof LoadTsFileStatement) {
       this.scheduler =
           new LoadTsFileScheduler(
-              distributedPlan, context, stateMachine, internalServiceClientManager);
+              distributedPlan, context, stateMachine, syncInternalServiceClientManager);
       this.scheduler.start();
       return;
     }
@@ -282,7 +289,8 @@ public class QueryExecution implements IQueryExecution {
             executor,
             writeOperationExecutor,
             scheduledExecutor,
-            internalServiceClientManager);
+            syncInternalServiceClientManager,
+            asyncInternalServiceClientManager);
     this.scheduler.start();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
new file mode 100644
index 0000000000..957f638ed1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.scheduler;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
+
+/** Sends each instance's plan node to its host DataNode asynchronously and collects responses. */
+public class AsyncPlanNodeSender {
+  private static final Logger logger = LoggerFactory.getLogger(AsyncPlanNodeSender.class);
+  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+      asyncInternalServiceClientManager;
+  private final List<FragmentInstance> instances;
+  private final CountDownLatch countDownLatch; // one count per instance
+  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap; // key: index into instances
+
+  public AsyncPlanNodeSender(
+      IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+          asyncInternalServiceClientManager,
+      List<FragmentInstance> instances) {
+    this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
+    this.instances = instances;
+    this.countDownLatch = new CountDownLatch(instances.size());
+    this.instanceId2RespMap = new ConcurrentHashMap<>();
+  }
+
+  public void sendAll() {
+    for (int i = 0; i < instances.size(); ++i) { // i doubles as the response-map key
+      FragmentInstance instance = instances.get(i);
+      AsyncSendPlanNodeHandler handler =
+          new AsyncSendPlanNodeHandler(i, countDownLatch, instanceId2RespMap);
+      try {
+        TSendPlanNodeReq sendPlanNodeReq =
+            new TSendPlanNodeReq(
+                new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
+                instance.getRegionReplicaSet().getRegionId());
+        AsyncDataNodeInternalServiceClient client =
+            asyncInternalServiceClientManager.borrowClient(
+                instance.getHostDataNode().getInternalEndPoint());
+        client.sendPlanNode(sendPlanNodeReq, handler);
+      } catch (Exception e) {
+        handler.onError(e); // stores a rejected response and counts down, so waiters wake up
+      }
+    }
+  }
+
+  public void waitUntilCompleted() throws InterruptedException {
+    countDownLatch.await(); // returns once every handler has reported success or failure
+  }
+
+  public Future<FragInstanceDispatchResult> getResult() {
+    for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+      TSendPlanNodeResp resp = entry.getValue();
+      if (!resp.accepted) {
+        // status may be unset on a rejected response; guard before reading its code
+        logger.warn(
+            "dispatch write failed. status: {}, code: {}, message: {}, node {}",
+            resp.status,
+            resp.status == null ? null : TSStatusCode.representOf(resp.status.code),
+            resp.message,
+            instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
+        return immediateFuture(
+            new FragInstanceDispatchResult(
+                resp.getStatus() != null
+                    ? resp.getStatus()
+                    : RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, resp.getMessage())));
+      }
+    }
+    return immediateFuture(new FragInstanceDispatchResult(true));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
new file mode 100644
index 0000000000..9b8f88b9ae
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.scheduler;
+
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/** Thrift async callback storing one instance's TSendPlanNodeResp and counting down the latch. */
+public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNodeResp> {
+  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap; // shared response sink
+  private final CountDownLatch countDownLatch; // counted down once per callback invocation
+  private final int instanceId; // key used when storing into instanceId2RespMap
+
+  public AsyncSendPlanNodeHandler(
+      int instanceId, CountDownLatch countDownLatch,
+      Map<Integer, TSendPlanNodeResp> instanceId2RespMap) {
+    this.instanceId2RespMap = instanceId2RespMap;
+    this.countDownLatch = countDownLatch;
+    this.instanceId = instanceId;
+  }
+
+  @Override
+  public void onError(Exception e) {
+    final String failure = String.format("Fail to send plan node, exception message: %s", e);
+    final TSendPlanNodeResp rejection = new TSendPlanNodeResp(); // stands in for the reply
+    rejection.setAccepted(false);
+    rejection.setMessage(failure);
+    rejection.setStatus(
+        RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), failure));
+    this.instanceId2RespMap.put(this.instanceId, rejection);
+    this.countDownLatch.countDown();
+  }
+
+  @Override
+  public void onComplete(TSendPlanNodeResp tSendPlanNodeResp) {
+    this.instanceId2RespMap.put(this.instanceId, tSendPlanNodeResp); // keep the reply as-is
+    this.countDownLatch.countDown();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 773f4b3165..d704726266 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -74,7 +75,9 @@ public class ClusterScheduler implements IScheduler {
       ExecutorService executor,
       ExecutorService writeOperationExecutor,
       ScheduledExecutorService scheduledExecutor,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager,
+      IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+          asyncInternalServiceClientManager) {
     this.stateMachine = stateMachine;
     this.instances = instances;
     this.queryType = queryType;
@@ -84,17 +87,18 @@ public class ClusterScheduler implements IScheduler {
             queryContext,
             executor,
             writeOperationExecutor,
-            internalServiceClientManager);
+            syncInternalServiceClientManager,
+            asyncInternalServiceClientManager);
     if (queryType == QueryType.READ) {
       this.stateTracker =
           new FixedRateFragInsStateTracker(
-              stateMachine, scheduledExecutor, instances, internalServiceClientManager);
+              stateMachine, scheduledExecutor, instances, syncInternalServiceClientManager);
       this.queryTerminator =
           new SimpleQueryTerminator(
               scheduledExecutor,
               queryContext,
               instances,
-              internalServiceClientManager,
+              syncInternalServiceClientManager,
               stateTracker);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 0007d17f18..0425b4fc72 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -68,7 +69,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   private final String localhostIpAddr;
   private final int localhostInternalPort;
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
-      internalServiceClientManager;
+      syncInternalServiceClientManager;
+  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+      asyncInternalServiceClientManager;
 
   private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
 
@@ -77,12 +80,15 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
       MPPQueryContext queryContext,
       ExecutorService executor,
       ExecutorService writeOperationExecutor,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager,
+      IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+          asyncInternalServiceClientManager) {
     this.type = type;
     this.queryContext = queryContext;
     this.executor = executor;
     this.writeOperationExecutor = writeOperationExecutor;
-    this.internalServiceClientManager = internalServiceClientManager;
+    this.syncInternalServiceClientManager = syncInternalServiceClientManager;
+    this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
     this.localhostIpAddr = IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
     this.localhostInternalPort = IoTDBDescriptor.getInstance().getConfig().getInternalPort();
   }
@@ -92,7 +98,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     if (type == QueryType.READ) {
       return dispatchRead(instances);
     } else {
-      return dispatchWriteSync(instances);
+      return dispatchWriteAsync(instances);
     }
   }
 
@@ -154,6 +160,50 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     }
   }
 
+  private Future<FragInstanceDispatchResult> dispatchWriteAsync(List<FragmentInstance> instances) {
+    // split instances into local and remote according to isDispatchedToLocal(endPoint)
+    List<FragmentInstance> localInstances = new ArrayList<>();
+    List<FragmentInstance> remoteInstances = new ArrayList<>();
+    for (FragmentInstance instance : instances) {
+      TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
+      if (isDispatchedToLocal(endPoint)) {
+        localInstances.add(instance);
+      } else {
+        remoteInstances.add(instance);
+      }
+    }
+    // start the remote sends first (callback-based) so they overlap with the local loop below
+    AsyncPlanNodeSender asyncPlanNodeSender =
+        new AsyncPlanNodeSender(asyncInternalServiceClientManager, remoteInstances);
+    asyncPlanNodeSender.sendAll();
+    // dispatch local instances in-thread; on failure return at once without awaiting remote ones
+    for (FragmentInstance localInstance : localInstances) {
+      try (SetThreadName threadName = new SetThreadName(localInstance.getId().getFullId())) {
+        dispatchOneInstance(localInstance);
+      } catch (FragmentInstanceDispatchException e) {
+        return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
+      } catch (Throwable t) {
+        logger.warn("[DispatchFailed]", t);
+        return immediateFuture(
+            new FragInstanceDispatchResult(
+                RpcUtils.getStatus(
+                    TSStatusCode.INTERNAL_SERVER_ERROR, "Unexpected errors: " + t.getMessage())));
+      }
+    }
+    // block until every remote instance has been answered or has failed
+    try {
+      asyncPlanNodeSender.waitUntilCompleted();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt flag before returning
+      logger.error("Interrupted when dispatching write async", e);
+      return immediateFuture(
+          new FragInstanceDispatchResult(
+              RpcUtils.getStatus(
+                  TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + e.getMessage())));
+    }
+    return asyncPlanNodeSender.getResult();
+  }
+
   private void dispatchOneInstance(FragmentInstance instance)
       throws FragmentInstanceDispatchException {
     TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
@@ -171,7 +221,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
   private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
       throws FragmentInstanceDispatchException {
     try (SyncDataNodeInternalServiceClient client =
-        internalServiceClientManager.borrowClient(endPoint)) {
+        syncInternalServiceClientManager.borrowClient(endPoint)) {
       switch (instance.getType()) {
         case READ:
           TSendFragmentInstanceReq sendFragmentInstanceReq =
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
index 5d0c47d011..20881e3a3a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.plan;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -45,19 +46,26 @@ import java.time.ZoneId;
 public class QueryPlannerTest {
 
   private static IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
-      internalServiceClientManager;
+      syncInternalServiceClientManager;
+
+  private static IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+      asyncInternalServiceClientManager;
 
   @BeforeClass
   public static void setUp() {
-    internalServiceClientManager =
+    syncInternalServiceClientManager =
         new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
             .createClientManager(
                 new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+    asyncInternalServiceClientManager =
+        new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
   }
 
   @AfterClass
   public static void destroy() {
-    internalServiceClientManager.close();
+    syncInternalServiceClientManager.close();
   }
 
   @Ignore
@@ -82,7 +90,8 @@ public class QueryPlannerTest {
             IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("test_query_scheduled"),
             new FakePartitionFetcherImpl(),
             new FakeSchemaFetcherImpl(),
-            internalServiceClientManager);
+            syncInternalServiceClientManager,
+            asyncInternalServiceClientManager);
     queryExecution.doLogicalPlan();
     System.out.printf("SQL: %s%n%n", querySql);
     System.out.println("===== Step 1: Logical Plan =====");