Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/05/05 10:54:39 UTC

[iotdb] branch jira5840_cp created (now 721981167d3)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira5840_cp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 721981167d3 combine rpc

This branch includes the following new commits:

     new 721981167d3 combine rpc

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: combine rpc

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5840_cp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 721981167d3f31493cf303a42265c3c4c0ee42b1
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Fri May 5 18:51:17 2023 +0800

    combine rpc
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../db/mpp/plan/scheduler/AsyncPlanNodeSender.java | 54 ++++++++++++++--------
 .../plan/scheduler/AsyncSendPlanNodeHandler.java   | 27 ++++++-----
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 20 +++++---
 .../impl/DataNodeInternalRPCServiceImpl.java       | 36 +++++++++------
 .../DataNodeInternalRPCServiceImplTest.java        | 31 ++++++++-----
 thrift/src/main/thrift/datanode.thrift             | 16 +++++--
 6 files changed, 119 insertions(+), 65 deletions(-)
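
The commit replaces the per-instance sendPlanNode RPC with a batched
sendBatchPlanNode: AsyncPlanNodeSender now groups fragment instances by the
internal endpoint of their host DataNode and issues one RPC per endpoint
instead of one per instance. A minimal sketch of that grouping step follows,
as a fragment rather than a full class; the types (Pair, TSendBatchPlanNodeReq,
TSendSinglePlanNodeReq, TPlanNode) and imports are the ones shown in the
diff hunks below.

    // Sketch: group instances by their host DataNode's internal endpoint so
    // that all plan nodes bound for the same node travel in a single RPC.
    Map<TEndPoint, Pair<List<Integer>, TSendBatchPlanNodeReq>> batchRequests = new HashMap<>();
    for (int i = 0; i < instances.size(); i++) {
      FragmentInstance instance = instances.get(i);
      Pair<List<Integer>, TSendBatchPlanNodeReq> value =
          batchRequests.computeIfAbsent(
              instance.getHostDataNode().getInternalEndPoint(),
              x -> new Pair<>(new ArrayList<>(), new TSendBatchPlanNodeReq()));
      // Remember which instance each position in the batch maps back to, so the
      // handler can route the k-th response to the right instance id.
      value.getLeft().add(i);
      value.getRight().addToRequests(
          new TSendSinglePlanNodeReq(
              new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
              instance.getRegionReplicaSet().getRegionId()));
    }

Because the server answers with responses in request order, the handler only
needs this index list to fan the batch response back out per instance.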

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
index 29a5f80f628..c18d5fa4ffb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncPlanNodeSender.java
@@ -25,15 +25,18 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,38 +46,53 @@ import java.util.concurrent.atomic.AtomicLong;
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 
 public class AsyncPlanNodeSender {
+
   private static final Logger logger = LoggerFactory.getLogger(AsyncPlanNodeSender.class);
   private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
       asyncInternalServiceClientManager;
   private final List<FragmentInstance> instances;
-  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap;
+
+  private final Map<TEndPoint, Pair<List<Integer>, TSendBatchPlanNodeReq>> batchRequests;
+  private final Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap;
   private final AtomicLong pendingNumber;
+  private final long startSendTime;
 
   public AsyncPlanNodeSender(
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
           asyncInternalServiceClientManager,
       List<FragmentInstance> instances) {
+    this.startSendTime = System.nanoTime();
     this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
     this.instances = instances;
-    this.instanceId2RespMap = new ConcurrentHashMap<>();
-    this.pendingNumber = new AtomicLong(instances.size());
+    this.batchRequests = new HashMap<>();
+    for (int i = 0; i < instances.size(); i++) {
+      Pair<List<Integer>, TSendBatchPlanNodeReq> value =
+          this.batchRequests.computeIfAbsent(
+              instances.get(i).getHostDataNode().getInternalEndPoint(),
+              x -> new Pair<>(new ArrayList<>(), new TSendBatchPlanNodeReq()));
+      value.getLeft().add(i);
+      value
+          .getRight()
+          .addToRequests(
+              new TSendSinglePlanNodeReq(
+                  new TPlanNode(
+                      instances.get(i).getFragment().getPlanNodeTree().serializeToByteBuffer()),
+                  instances.get(i).getRegionReplicaSet().getRegionId()));
+    }
+    this.instanceId2RespMap = new ConcurrentHashMap<>(instances.size() + 1, 1);
+    this.pendingNumber = new AtomicLong(batchRequests.keySet().size());
   }
 
   public void sendAll() {
-    long startSendTime = System.nanoTime();
-    for (int i = 0; i < instances.size(); ++i) {
-      FragmentInstance instance = instances.get(i);
+    for (Map.Entry<TEndPoint, Pair<List<Integer>, TSendBatchPlanNodeReq>> entry :
+        batchRequests.entrySet()) {
       AsyncSendPlanNodeHandler handler =
-          new AsyncSendPlanNodeHandler(i, pendingNumber, instanceId2RespMap, startSendTime);
+          new AsyncSendPlanNodeHandler(
+              entry.getValue().getLeft(), pendingNumber, instanceId2RespMap, startSendTime);
       try {
-        TSendPlanNodeReq sendPlanNodeReq =
-            new TSendPlanNodeReq(
-                new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
-                instance.getRegionReplicaSet().getRegionId());
         AsyncDataNodeInternalServiceClient client =
-            asyncInternalServiceClientManager.borrowClient(
-                instance.getHostDataNode().getInternalEndPoint());
-        client.sendPlanNode(sendPlanNodeReq, handler);
+            asyncInternalServiceClientManager.borrowClient(entry.getKey());
+        client.sendBatchPlanNode(entry.getValue().getRight(), handler);
       } catch (Exception e) {
         handler.onError(e);
       }
@@ -92,7 +110,7 @@ public class AsyncPlanNodeSender {
   public List<TSStatus> getFailureStatusList() {
     List<TSStatus> failureStatusList = new ArrayList<>();
     TSStatus status;
-    for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+    for (Map.Entry<Integer, TSendSinglePlanNodeResp> entry : instanceId2RespMap.entrySet()) {
       status = entry.getValue().getStatus();
       if (!entry.getValue().accepted) {
         if (status == null) {
@@ -122,7 +140,7 @@ public class AsyncPlanNodeSender {
   }
 
   public Future<FragInstanceDispatchResult> getResult() {
-    for (Map.Entry<Integer, TSendPlanNodeResp> entry : instanceId2RespMap.entrySet()) {
+    for (Map.Entry<Integer, TSendSinglePlanNodeResp> entry : instanceId2RespMap.entrySet()) {
       if (!entry.getValue().accepted) {
         logger.warn(
             "dispatch write failed. status: {}, code: {}, message: {}, node {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
index 2bd50a6c7ed..61c517ac73e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AsyncSendPlanNodeHandler.java
@@ -19,37 +19,42 @@
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
 import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNodeResp> {
-  private final int instanceId;
+public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendBatchPlanNodeResp> {
+
+  private final List<Integer> instanceIds;
   private final AtomicLong pendingNumber;
-  private final Map<Integer, TSendPlanNodeResp> instanceId2RespMap;
+  private final Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap;
   private final long sendTime;
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
       PerformanceOverviewMetrics.getInstance();
 
   public AsyncSendPlanNodeHandler(
-      int instanceId,
+      List<Integer> instanceIds,
       AtomicLong pendingNumber,
-      Map<Integer, TSendPlanNodeResp> instanceId2RespMap,
+      Map<Integer, TSendSinglePlanNodeResp> instanceId2RespMap,
       long sendTime) {
-    this.instanceId = instanceId;
+    this.instanceIds = instanceIds;
     this.pendingNumber = pendingNumber;
     this.instanceId2RespMap = instanceId2RespMap;
     this.sendTime = sendTime;
   }
 
   @Override
-  public void onComplete(TSendPlanNodeResp tSendPlanNodeResp) {
-    instanceId2RespMap.put(instanceId, tSendPlanNodeResp);
+  public void onComplete(TSendBatchPlanNodeResp tSendPlanNodeResp) {
+    for (int i = 0; i < tSendPlanNodeResp.getResponses().size(); i++) {
+      instanceId2RespMap.put(instanceIds.get(i), tSendPlanNodeResp.getResponses().get(i));
+    }
     if (pendingNumber.decrementAndGet() == 0) {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime() - sendTime);
       synchronized (pendingNumber) {
@@ -60,13 +65,13 @@ public class AsyncSendPlanNodeHandler implements AsyncMethodCallback<TSendPlanNo
 
   @Override
   public void onError(Exception e) {
-    TSendPlanNodeResp resp = new TSendPlanNodeResp();
+    TSendSinglePlanNodeResp resp = new TSendSinglePlanNodeResp();
     String errorMsg = String.format("Fail to send plan node, exception message: %s", e);
     resp.setAccepted(false);
     resp.setMessage(errorMsg);
     resp.setStatus(
         RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg));
-    instanceId2RespMap.put(instanceId, resp);
+    instanceIds.forEach(instanceId -> instanceId2RespMap.put(instanceId, resp));
     if (pendingNumber.decrementAndGet() == 0) {
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleRemoteCost(System.nanoTime() - sendTime);
       synchronized (pendingNumber) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a471e9256aa..cb64acaf6bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -40,10 +40,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
+import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -52,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -268,11 +270,15 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
           }
           break;
         case WRITE:
-          TSendPlanNodeReq sendPlanNodeReq =
-              new TSendPlanNodeReq(
-                  new TPlanNode(instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
-                  instance.getRegionReplicaSet().getRegionId());
-          TSendPlanNodeResp sendPlanNodeResp = client.sendPlanNode(sendPlanNodeReq);
+          TSendBatchPlanNodeReq sendPlanNodeReq =
+              new TSendBatchPlanNodeReq(
+                  Collections.singletonList(
+                      new TSendSinglePlanNodeReq(
+                          new TPlanNode(
+                              instance.getFragment().getPlanNodeTree().serializeToByteBuffer()),
+                          instance.getRegionReplicaSet().getRegionId())));
+          TSendSinglePlanNodeResp sendPlanNodeResp =
+              client.sendBatchPlanNode(sendPlanNodeReq).getResponses().get(0);
           if (!sendPlanNodeResp.accepted) {
             logger.warn(
                 "dispatch write failed. status: {}, code: {}, message: {}, node {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 76a03e517b1..80fc5c1c640 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -166,10 +166,11 @@ import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
 import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
+import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
@@ -272,18 +273,25 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req) {
-    LOGGER.debug("receive PlanNode to group[{}]", req.getConsensusGroupId());
-    ConsensusGroupId groupId =
-        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-    PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);
-    RegionWriteExecutor executor = new RegionWriteExecutor();
-    TSendPlanNodeResp resp = new TSendPlanNodeResp();
-    RegionExecutionResult executionResult = executor.execute(groupId, planNode);
-    resp.setAccepted(executionResult.isAccepted());
-    resp.setMessage(executionResult.getMessage());
-    resp.setStatus(executionResult.getStatus());
-    return resp;
+  public TSendBatchPlanNodeResp sendBatchPlanNode(TSendBatchPlanNodeReq req) {
+    List<TSendSinglePlanNodeResp> responses =
+        req.getRequests().stream()
+            .map(
+                request -> {
+                  ConsensusGroupId groupId =
+                      ConsensusGroupId.Factory.createFromTConsensusGroupId(
+                          request.getConsensusGroupId());
+                  PlanNode planNode = PlanNodeType.deserialize(request.planNode.body);
+                  RegionWriteExecutor executor = new RegionWriteExecutor();
+                  TSendSinglePlanNodeResp resp = new TSendSinglePlanNodeResp();
+                  RegionExecutionResult executionResult = executor.execute(groupId, planNode);
+                  resp.setAccepted(executionResult.isAccepted());
+                  resp.setMessage(executionResult.getMessage());
+                  resp.setStatus(executionResult.getStatus());
+                  return resp;
+                })
+            .collect(Collectors.toList());
+    return new TSendBatchPlanNodeResp(responses);
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index c06fcf60078..6e183699289 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -43,8 +43,9 @@ import org.apache.iotdb.db.service.thrift.impl.DataNodeInternalRPCServiceImpl;
 import org.apache.iotdb.db.service.thrift.impl.DataNodeRegionManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.mpp.rpc.thrift.TPlanNode;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeResp;
+import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeReq;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -61,11 +62,13 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class DataNodeInternalRPCServiceImplTest {
+
   private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
   DataNodeInternalRPCServiceImpl dataNodeInternalRPCServiceImpl;
   private static final int dataNodeId = 0;
@@ -145,16 +148,18 @@ public class DataNodeInternalRPCServiceImplTest {
     ByteBuffer byteBuffer = createTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendSinglePlanNodeReq request = new TSendSinglePlanNodeReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendBatchPlanNodeResp response =
+        dataNodeInternalRPCServiceImpl.sendBatchPlanNode(
+            new TSendBatchPlanNodeReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   @Test
@@ -221,16 +226,18 @@ public class DataNodeInternalRPCServiceImplTest {
     ByteBuffer byteBuffer = createAlignedTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendSinglePlanNodeReq request = new TSendSinglePlanNodeReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendBatchPlanNodeResp response =
+        dataNodeInternalRPCServiceImpl.sendBatchPlanNode(
+            new TSendBatchPlanNodeReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   @Test
@@ -308,16 +315,18 @@ public class DataNodeInternalRPCServiceImplTest {
     ByteBuffer byteBuffer = createMultiTimeSeriesNode.serializeToByteBuffer();
 
     // put serialized planNode to TSendPlanNodeReq
-    TSendPlanNodeReq request = new TSendPlanNodeReq();
+    TSendSinglePlanNodeReq request = new TSendSinglePlanNodeReq();
     TPlanNode tPlanNode = new TPlanNode();
     tPlanNode.setBody(byteBuffer);
     request.setPlanNode(tPlanNode);
     request.setConsensusGroupId(regionReplicaSet.getRegionId());
 
     // Use consensus layer to execute request
-    TSendPlanNodeResp response = dataNodeInternalRPCServiceImpl.sendPlanNode(request);
+    TSendBatchPlanNodeResp response =
+        dataNodeInternalRPCServiceImpl.sendBatchPlanNode(
+            new TSendBatchPlanNodeReq(Collections.singletonList(request)));
 
-    Assert.assertTrue(response.accepted);
+    Assert.assertTrue(response.getResponses().get(0).accepted);
   }
 
   private TRegionReplicaSet genRegionReplicaSet() {
diff --git a/thrift/src/main/thrift/datanode.thrift b/thrift/src/main/thrift/datanode.thrift
index aa4cdc030fb..769dbb697c5 100644
--- a/thrift/src/main/thrift/datanode.thrift
+++ b/thrift/src/main/thrift/datanode.thrift
@@ -118,17 +118,25 @@ struct TSendFragmentInstanceResp {
   2: optional string message
 }
 
-struct TSendPlanNodeReq {
+struct TSendSinglePlanNodeReq {
   1: required TPlanNode planNode
   2: required common.TConsensusGroupId consensusGroupId
 }
 
-struct TSendPlanNodeResp {
+struct TSendSinglePlanNodeResp {
   1: required bool accepted
   2: optional string message
   3: optional common.TSStatus status
 }
 
+struct TSendBatchPlanNodeReq {
+  1: required list<TSendSinglePlanNodeReq> requests;
+}
+
+struct TSendBatchPlanNodeResp {
+  1: required list<TSendSinglePlanNodeResp> responses;
+}
+
 struct TFetchFragmentInstanceInfoReq {
   1: required TFragmentInstanceId fragmentInstanceId
 }
@@ -437,9 +445,9 @@ service IDataNodeRPCService {
   TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req);
 
   /**
-  * dispatch PlanNode to remote node for write request in order to save resource
+  * dispatch PlanNodes in batches to remote node for write request in order to save resource
   */
-  TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req);
+  TSendBatchPlanNodeResp sendBatchPlanNode(TSendBatchPlanNodeReq req);
 
   TFragmentInstanceInfoResp fetchFragmentInstanceInfo(TFetchFragmentInstanceInfoReq req);
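
For reference, the synchronous WRITE path in FragmentInstanceDispatcherImpl now
calls the batched method with a singleton batch and unwraps the single
response. A caller-side sketch, assuming a connected client and with
planNodeTree and regionReplicaSet standing in for the values taken from the
surrounding dispatcher code (imports as in the diff hunks above):

    // Sketch: dispatch one plan node through the batched RPC and unwrap the
    // lone response, mirroring the WRITE branch in FragmentInstanceDispatcherImpl.
    TSendBatchPlanNodeReq req =
        new TSendBatchPlanNodeReq(
            Collections.singletonList(
                new TSendSinglePlanNodeReq(
                    new TPlanNode(planNodeTree.serializeToByteBuffer()),
                    regionReplicaSet.getRegionId())));
    TSendSinglePlanNodeResp resp = client.sendBatchPlanNode(req).getResponses().get(0);
    if (!resp.accepted) {
      // Surface resp.getStatus() / resp.getMessage() to the caller, as the
      // dispatcher does when logging a failed write dispatch.
    }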