You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/04/24 07:48:53 UTC

[iotdb] 03/03: reduce RPC count for dispatch and thread count for internal service

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch ca
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5431cbfd6324afeaba81c722ccb45237a9e708b2
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Mon Apr 24 00:59:29 2023 +0800

    reduce RPC count for dispatch and thread count for internal service
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../db/mpp/plan/scheduler/AsyncPlanNodeSender.java | 54 ++++++++++++++--------
 .../plan/scheduler/AsyncSendPlanNodeHandler.java   | 27 ++++++-----
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 20 +++++---
 .../impl/DataNodeInternalRPCServiceImpl.java       | 35 ++++++++------
 .../DataNodeInternalRPCServiceImplTest.java        | 31 ++++++++-----
 thrift/src/main/thrift/datanode.thrift             | 14 ++++--
 6 files changed, 117 insertions(+), 64 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
index 29a5f80f62..0d6406c3f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
@@ -25,15 +25,18 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,38 +46,53 @@ import java.util.concurrent.atomic.AtomicLong;
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 
 public class AsyncPlanNodeSender {
+
   private static final Logger logger = LoggerFactory.getLogger(AsyncPlanNodeSender.class);
   private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
       asyncInternalServiceClientManager;
   private final List<FragmentInstance> instances;
-  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap;
+
+  private final Map<TEndPoint, Pair<List<Integer>, TSendPlanNodeBatchReq>> batchRequests;
+  private final Map<Integer, TSendPlanNodeSingleResp> instanceId2RespMap;
   private final AtomicLong pendingNumber;
+  private final long startSendTime;
 
   public AsyncPlanNodeSender(
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
           asyncInternalServiceClientManager,
       List<FragmentInstance> instances) {
+    this.startSendTime = System.nanoTime();
     this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
     this.instances = instances;
-    this.instanceId2RespMap = new ConcurrentHashMap<>();
-    this.pendingNumber = new AtomicLong(instances.size());
+    this.batchRequests = new HashMap<>(instances.size());
+    for (int i = 0; i < instances.size(); i++) {
+      Pair<List<Integer>, TSendPlanNodeBatchReq> value =
+          this.batchRequests.computeIfAbsent(
+              instances.get(i).getHostDataNode().getInternalEndPoint(),
+              x -> new Pair<>(new ArrayList<>(), new TSendPlanNodeBatchReq()));
+      value.getLeft().add(i);
+      value
+          .getRight()
+          .addToRequests(
+              new TSendPlanNodeSingleReq(
+                  new TPlanNode(
+                      instances.get(i).getFragment().getPlanNodeTree().serializeToByteBuffer()),
+                  instances.get(i).getRegionReplicaSet().getRegionId()));
+    }
+    this.instanceId2RespMap = new ConcurrentHashMap<>(instances.size());
+    this.pendingNumber = new AtomicLong(batchRequests.keySet().size());
   }
 
   public void sendAll() {
-    long startSendTime = System.nanoTime();
-    for (int i = 0; i < instances.size(); ++i) {
-      FragmentInstance instance = instances.get(i);
+    for (Map.Entry<TEndPoint, Pair<List<Integer>, TSendPlanNodeBatchReq>> entry :
+        batchRequests.entrySet()) {
       AsyncSendPlanNodeHandler handler =
-          new AsyncSendPlanNodeHandler(i, pendingNumber, instanceId2RespMap, startSendTime);
+          new AsyncSendPlanNodeHandler(
+              entry.getValue().getLeft(), pendingNumber, instanceId2RespMap, startSendTime);
       try {
-        TSendPlanNodeReq sendPlanNodeReq =
-            new TSendPlanNodeReq(
-                new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
-                instance.getRegionReplicaSet().getRegionId());
         AsyncDataNodeInternalServiceClient client =
-            asyncInternalServiceClientManager.borrowClient(
-                instance.getHostDataNode().getInternalEndPoint());
-        client.sendPlanNode(sendPlanNodeReq, handler);
+            asyncInternalServiceClientManager.borrowClient(entry.getKey());
+        client.sendPlanNode(entry.getValue().getRight(), handler);
       } catch (Exception e) {
         handler.onError(e);
       }
@@ -92,7 +110,7 @@ public class AsyncPlanNodeSender {
   public List<TSStatus> getFailureStatusList() {
     List<TSStatus> failureStatusList = new ArrayList<>();
     TSStatus status;
-    for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+    for (Map.Entry<Integer, TSendPlanNodeSingleResp> entry : instanceId2RespMap.entrySet()) {
       status = entry.getValue().getStatus();
       if (!entry.getValue().accepted) {
         if (status == null) {
@@ -122,7 +140,7 @@ public class AsyncPlanNodeSender {
   }
 
   public Future<FragInstanceDispatchResult> getResult() {
-    for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+    for (Map.Entry<Integer, TSendPlanNodeSingleResp> entry : instanceId2RespMap.entrySet()) {
       if (!entry.getValue().accepted) {
         logger.warn(
             "dispatch write failed. status: {}, code: {}, message: {}, node {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
index 2bd50a6c7e..eab4bb5ea6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
@@ -19,37 +19,42 @@
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
 import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNodeResp> {
-  private final int instanceId;
+public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNodeBatchResp> {
+
+  private final List<Integer> instanceIds;
   private final AtomicLong pendingNumber;
-  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap;
+  private final Map<Integer, TSendPlanNodeSingleResp> instanceId2RespMap;
   private final long sendTime;
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
       PerformanceOverviewMetrics.getInstance();
 
   public AsyncSendPlanNodeHandler(
-      int instanceId,
+      List<Integer> instanceIds,
       AtomicLong pendingNumber,
-      Map<Integer, TSendPlanNodeResp> instanceId2RespMap,
+      Map<Integer, TSendPlanNodeSingleResp> instanceId2RespMap,
       long sendTime) {
-    this.instanceId = instanceId;
+    this.instanceIds = instanceIds;
     this.pendingNumber = pendingNumber;
     this.instanceId2RespMap = instanceId2RespMap;
     this.sendTime = sendTime;
   }
 
   @Override
-  public void onComplete(TSendPlanNodeResp tSendPlanNodeResp) {
-    instanceId2RespMap.put(instanceId, tSendPlanNodeResp);
+  public void onComplete(TSendPlanNodeBatchResp tSendPlanNodeResp) {
+    for (int i = 0; i < tSendPlanNodeResp.getResponses().size(); i++) {
+      instanceId2RespMap.put(instanceIds.get(i), tSendPlanNodeResp.getResponses().get(i));
+    }
     if (pendingNumber.decrementAndGet() == 0) {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime() - sendTime);
       synchronized (pendingNumber) {
@@ -60,13 +65,13 @@ public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNo
 
   @Override
   public void onError(Exception e) {
-    TSendPlanNodeResp resp = new TSendPlanNodeResp();
+    TSendPlanNodeSingleResp resp = new TSendPlanNodeSingleResp();
     String errorMsg = String.format("Fail to send plan node, exception message: %s", e);
     resp.setAccepted(false);
     resp.setMessage(errorMsg);
     resp.setStatus(
         RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg));
-    instanceId2RespMap.put(instanceId, resp);
+    instanceIds.forEach(instanceId -> instanceId2RespMap.put(instanceId, resp));
     if (pendingNumber.decrementAndGet() == 0) {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime() - sendTime);
       synchronized (pendingNumber) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a471e9256a..9ba7e1d421 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -42,8 +42,9 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -52,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -268,11 +270,15 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
           }
           break;
         case WRITE:
-          TSendPlanNodeReq sendPlanNodeReq =
-              new TSendPlanNodeReq(
-                  new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
-                  instance.getRegionReplicaSet().getRegionId());
-          TSendPlanNodeResp sendPlanNodeResp = client.sendPlanNode(sendPlanNodeReq);
+          TSendPlanNodeBatchReq sendPlanNodeReq =
+              new TSendPlanNodeBatchReq(
+                  Collections.singletonList(
+                      new TSendPlanNodeSingleReq(
+                          new TPlanNode(
+                              instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
+                          instance.getRegionReplicaSet().getRegionId())));
+          TSendPlanNodeSingleResp sendPlanNodeResp =
+              client.sendPlanNode(sendPlanNodeReq).getResponses().get(0);
           if (!sendPlanNodeResp.accepted) {
             logger.warn(
                 "dispatch write failed. status: {}, code: {}, message: {}, node {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index f29dddcac5..54b2a7d27a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -176,8 +176,9 @@ import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
@@ -290,18 +291,24 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req) {
-    LOGGER.debug("receive PlanNode to group[{}]", req.getConsensusGroupId());
-    ConsensusGroupId groupId =
-        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-    PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);
-    RegionWriteExecutor executor = new RegionWriteExecutor();
-    TSendPlanNodeResp resp = new TSendPlanNodeResp();
-    RegionExecutionResult executionResult = executor.execute(groupId, planNode);
-    resp.setAccepted(executionResult.isAccepted());
-    resp.setMessage(executionResult.getMessage());
-    resp.setStatus(executionResult.getStatus());
-    return resp;
+  public TSendPlanNodeBatchResp sendPlanNode(TSendPlanNodeBatchReq req) {
+    TSendPlanNodeBatchResp responses = new TSendPlanNodeBatchResp();
+    req.getRequests()
+        .forEach(
+            request -> {
+              ConsensusGroupId groupId =
+                  ConsensusGroupId.Factory.createFromTConsensusGroupId(
+                      request.getConsensusGroupId());
+              PlanNode planNode = PlanNodeType.deserialize(request.planNode.body);
+              RegionWriteExecutor executor = new RegionWriteExecutor();
+              TSendPlanNodeSingleResp resp = new TSendPlanNodeSingleResp();
+              RegionExecutionResult executionResult = executor.execute(groupId, planNode);
+              resp.setAccepted(executionResult.isAccepted());
+              resp.setMessage(executionResult.getMessage());
+              resp.setStatus(executionResult.getStatus());
+              responses.addToResponses(resp);
+            });
+    return responses;
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index c06fcf6007..43e9cc680b 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -43,8 +43,9 @@ import org.apache.iotdb.db.service.thrift.impl.DataNodeInternalRPCServiceImpl;
 import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeBatchResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeSingleReq;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -61,11 +62,13 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class DataNodeInternalRPCServiceImplTest {
+
   private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
   DataNodeInternalRPCServiceImpl dataNodeInternalRPCServiceImpl;
   private static final int dataNodeId = 0;
@@ -145,16 +148,18 @@ public class DataNodeInternalRPCServiceImplTest {
     ByteBuffer byteBuffer = createTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendPlanNodeSingleReq request = new TSendPlanNodeSingleReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendPlanNodeBatchResp response =
+        dataNodeInternalRPCServiceImpl.sendPlanNode(
+            new TSendPlanNodeBatchReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   @Test
@@ -221,16 +226,18 @@ public class DataNodeInternalRPCServiceImplTest {
     ByteBuffer byteBuffer = createAlignedTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendPlanNodeSingleReq request = new TSendPlanNodeSingleReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendPlanNodeBatchResp response =
+        dataNodeInternalRPCServiceImpl.sendPlanNode(
+            new TSendPlanNodeBatchReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   @Test
@@ -308,16 +315,18 @@ public class DataNodeInternalRPCServiceImplTest {
     ByteBuffer byteBuffer = createMultiTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendPlanNodeSingleReq request = new TSendPlanNodeSingleReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendPlanNodeBatchResp response =
+        dataNodeInternalRPCServiceImpl.sendPlanNode(
+            new TSendPlanNodeBatchReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   private TRegionReplicaSet genRegionReplicaSet() {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index 73b631487b..c4c40eeed3 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -118,12 +118,20 @@ struct TSendFragmentInstanceResp {
   2: optional string message
 }
 
-struct TSendPlanNodeReq {
+struct TSendPlanNodeBatchReq {
+  1: required list<TSendPlanNodeSingleReq> requests;
+}
+
+struct TSendPlanNodeSingleReq {
   1: required TPlanNode planNode
   2: required common.TConsensusGroupId consensusGroupId
 }
 
-struct TSendPlanNodeResp {
+struct TSendPlanNodeBatchResp {
+  1: required list<TSendPlanNodeSingleResp> responses;
+}
+
+struct TSendPlanNodeSingleResp {
   1: required bool accepted
   2: optional string message
   3: optional common.TSStatus status
@@ -472,7 +480,7 @@ service IDataNodeRPCService {
   /**
   * dispatch PlanNode to remote node for write request in order to save resource
   */
-  TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req);
+  TSendPlanNodeBatchResp sendPlanNode(TSendPlanNodeBatchReq req);
 
   TFragmentInstanceInfoResp fetchFragmentInstanceInfo(TFetchFragmentInstanceInfoReq req);