You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/14 05:14:51 UTC

[iotdb] branch master updated: Add DistributionPlanner for WRITE operation (#5515)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 60fc7fc0da Add DistributionPlanner for WRITE operation (#5515)
60fc7fc0da is described below

commit 60fc7fc0da3bbcc1b56c543bfbf9a79d42b5b7f3
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Thu Apr 14 13:14:46 2022 +0800

    Add DistributionPlanner for WRITE operation (#5515)
---
 .../iotdb/commons/partition/DataPartition.java     | 20 ++++-
 .../iotdb/commons/partition/RegionReplicaSet.java  |  4 +-
 .../apache/iotdb/db/mpp/common/PlanFragmentId.java |  4 +-
 .../db/mpp/sql/planner/DistributionPlanner.java    | 20 ++++-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  | 24 +++---
 .../plan/SimpleFragmentParallelPlanner.java        |  9 +--
 .../planner/plan/WriteFragmentParallelPlanner.java | 70 +++++++++++++++++
 .../planner/plan/node/SimplePlanNodeRewriter.java  |  4 +
 ...plePlanNodeRewriter.java => WritePlanNode.java} | 24 ++----
 .../plan/node/source/SeriesAggregateScanNode.java  |  3 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  3 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |  9 ++-
 .../sql/planner/plan/node/write/InsertNode.java    | 12 ++-
 .../sql/planner/plan/node/write/InsertRowNode.java |  3 +-
 .../planner/plan/node/write/InsertRowsNode.java    |  3 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  3 +-
 .../planner/plan/node/write/InsertTabletNode.java  |  5 +-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 87 ++++++++++++++++++++--
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 26 ++++---
 .../iotdb/db/service/InternalServiceImplTest.java  |  9 ++-
 20 files changed, 259 insertions(+), 83 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 3f677ce2c9..054c01b471 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -68,7 +68,14 @@ public class DataPartition {
     // A list of data region replica sets will store data in a same time partition.
     // We will insert data to the last set in the list.
     // TODO return the latest dataRegionReplicaSet for each time partition
-    return Collections.emptyList();
+    String storageGroup = getStorageGroupByDevice(deviceName);
+    SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+    // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are
+    // more than 1 Regions for one timeSlot
+    return dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream()
+        .filter(entry -> timePartitionSlotList.contains(entry.getKey()))
+        .flatMap(entry -> entry.getValue().stream())
+        .collect(Collectors.toList());
   }
 
   public RegionReplicaSet getDataRegionReplicaSetForWriting(
@@ -76,7 +83,16 @@ public class DataPartition {
     // A list of data region replica sets will store data in a same time partition.
     // We will insert data to the last set in the list.
     // TODO return the latest dataRegionReplicaSet for each time partition
-    return null;
+    String storageGroup = getStorageGroupByDevice(deviceName);
+    SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+    List<RegionReplicaSet> regions =
+        dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream()
+            .filter(entry -> entry.getKey().equals(timePartitionSlot))
+            .flatMap(entry -> entry.getValue().stream())
+            .collect(Collectors.toList());
+    // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are
+    // more than 1 Regions for one timeSlot
+    return regions.get(0);
   }
 
   private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index d98695824d..955a021e50 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@ -55,9 +55,7 @@ public class RegionReplicaSet {
 
   @Override
   public String toString() {
-    return String.format(
-        "RegionReplicaSet[%s-%d]: %s",
-        consensusGroupId.getType(), consensusGroupId.getId(), dataNodeList);
+    return String.format("RegionReplicaSet[%s]: %s", consensusGroupId, dataNodeList);
   }
 
   public void serializeImpl(ByteBuffer buffer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index 1d3cc2296d..01cde7ef0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -89,9 +89,7 @@ public class PlanFragmentId {
       return false;
     }
     PlanFragmentId that = (PlanFragmentId) o;
-    return id == that.id
-        && nextFragmentInstanceId == that.nextFragmentInstanceId
-        && Objects.equals(queryId, that.queryId);
+    return id == that.id && Objects.equals(queryId, that.queryId);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 4ef647dc7e..e0d42e1bae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.sql.planner.plan.IFragmentParallelPlaner;
@@ -29,10 +30,12 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.WriteFragmentParallelPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.SimplePlanNodeRewriter;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
@@ -84,7 +87,10 @@ public class DistributionPlanner {
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
     SubPlan subPlan = splitFragment(rootWithExchange);
     List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
-    SetSinkForRootInstance(subPlan, fragmentInstances);
+    // Only execute this step for READ operation
+    if (context.getQueryType() == QueryType.READ) {
+      SetSinkForRootInstance(subPlan, fragmentInstances);
+    }
     return new DistributedQueryPlan(
         logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
   }
@@ -93,7 +99,9 @@ public class DistributionPlanner {
   // And for parallel-able fragment, clone it into several instances with different params.
   public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
     IFragmentParallelPlaner parallelPlaner =
-        new SimpleFragmentParallelPlanner(subPlan, analysis, context);
+        context.getQueryType() == QueryType.READ
+            ? new SimpleFragmentParallelPlanner(subPlan, analysis, context)
+            : new WriteFragmentParallelPlanner(subPlan, analysis, context);
     return parallelPlaner.parallelPlan();
   }
 
@@ -239,6 +247,10 @@ public class DistributionPlanner {
   private class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     @Override
     public PlanNode visitPlan(PlanNode node, NodeGroupContext context) {
+      // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+      if (node instanceof WritePlanNode) {
+        return node;
+      }
       // Visit all the children of current node
       List<PlanNode> children =
           node.getChildren().stream()
@@ -433,6 +445,10 @@ public class DistributionPlanner {
     }
 
     private void splitToSubPlan(PlanNode root, SubPlan subPlan) {
+      // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+      if (root instanceof WritePlanNode) {
+        return;
+      }
       if (root instanceof ExchangeNode) {
         // We add a FragmentSinkNode for newly created PlanFragment
         ExchangeNode exchangeNode = (ExchangeNode) root;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 3e55a5d32c..a3065c8c03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -50,15 +49,23 @@ public class FragmentInstance implements IConsensusRequest {
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
-  public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
+  public FragmentInstance(
+      PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
     this.fragment = fragment;
     this.timeFilter = timeFilter;
-    this.id = generateId(fragment.getId(), index);
+    this.id = id;
     this.type = type;
   }
 
-  public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
-    return new FragmentInstanceId(id, String.valueOf(index));
+  public RegionReplicaSet getDataRegionId() {
+    return regionReplicaSet;
+  }
+
+  public void setDataRegionAndHost(RegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+    // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
+    // instance
+    this.hostEndpoint = regionReplicaSet.getDataNodeList().get(0).getEndPoint();
   }
 
   public RegionReplicaSet getRegionReplicaSet() {
@@ -73,10 +80,6 @@ public class FragmentInstance implements IConsensusRequest {
     return hostEndpoint;
   }
 
-  public void setHostEndpoint(Endpoint hostEndpoint) {
-    this.hostEndpoint = hostEndpoint;
-  }
-
   public PlanFragment getFragment() {
     return fragment;
   }
@@ -133,8 +136,7 @@ public class FragmentInstance implements IConsensusRequest {
     Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
     QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
     FragmentInstance fragmentInstance =
-        new FragmentInstance(
-            planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType);
+        new FragmentInstance(planFragment, id, timeFilter, queryType);
     fragmentInstance.regionReplicaSet = RegionReplicaSet.deserializeImpl(buffer);
     fragmentInstance.hostEndpoint = Endpoint.deserializeImpl(buffer);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index d3357ec275..631b217695 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -79,7 +79,6 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   private void produceFragmentInstance(PlanFragment fragment) {
     // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
     // one by one
-    int instanceIdx = 0;
     PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
     Filter timeFilter =
         analysis.getQueryFilter() == null
@@ -88,7 +87,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     FragmentInstance fragmentInstance =
         new FragmentInstance(
             new PlanFragment(fragment.getId(), rootCopy),
-            instanceIdx,
+            fragment.getId().genFragmentInstanceId(),
             timeFilter,
             queryContext.getQueryType());
 
@@ -100,11 +99,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // We need to store all the replica host in case of the scenario that the instance need to be
     // redirected
     // to another host when scheduling
-    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
-
-    // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
-    // instance
-    fragmentInstance.setHostEndpoint(regionReplicaSet.getDataNodeList().get(0).getEndPoint());
+    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
     fragmentInstanceList.add(fragmentInstance);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
new file mode 100644
index 0000000000..be34ae257e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
+
+  private SubPlan subPlan;
+  private Analysis analysis;
+  private MPPQueryContext queryContext;
+
+  public WriteFragmentParallelPlanner(
+      SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) {
+    this.subPlan = subPlan;
+    this.analysis = analysis;
+    this.queryContext = queryContext;
+  }
+
+  @Override
+  public List<FragmentInstance> parallelPlan() {
+    PlanFragment fragment = subPlan.getPlanFragment();
+    Filter timeFilter =
+        analysis.getQueryFilter() != null
+            ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter()
+            : null;
+    PlanNode node = fragment.getRoot();
+    if (!(node instanceof WritePlanNode)) {
+      throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation");
+    }
+    List<WritePlanNode> splits = ((WritePlanNode) node).splitByPartition(analysis);
+    List<FragmentInstance> ret = new ArrayList<>();
+    for (WritePlanNode split : splits) {
+      FragmentInstance instance =
+          new FragmentInstance(
+              new PlanFragment(fragment.getId(), split),
+              fragment.getId().genFragmentInstanceId(),
+              timeFilter,
+              queryContext.getQueryType());
+      instance.setDataRegionAndHost(split.getRegionReplicaSet());
+      ret.add(instance);
+    }
+    return ret;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
index e0ca8f6cfd..30a42b21d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
@@ -26,6 +26,10 @@ import static com.google.common.collect.ImmutableList.toImmutableList;
 public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> {
   @Override
   public PlanNode visitPlan(PlanNode node, C context) {
+    // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+    if (node instanceof WritePlanNode) {
+      return node;
+    }
     return defaultRewrite(node, context);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/WritePlanNode.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/WritePlanNode.java
index e0ca8f6cfd..c37b8e1d7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/WritePlanNode.java
@@ -19,26 +19,18 @@
 
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+
 import java.util.List;
 
-import static com.google.common.collect.ImmutableList.toImmutableList;
+public abstract class WritePlanNode extends PlanNode {
 
-public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> {
-  @Override
-  public PlanNode visitPlan(PlanNode node, C context) {
-    return defaultRewrite(node, context);
+  protected WritePlanNode(PlanNodeId id) {
+    super(id);
   }
 
-  public PlanNode defaultRewrite(PlanNode node, C context) {
-    List<PlanNode> children =
-        node.getChildren().stream()
-            .map(child -> rewrite(child, context))
-            .collect(toImmutableList());
-
-    return node.cloneWithChildren(children);
-  }
+  public abstract RegionReplicaSet getRegionReplicaSet();
 
-  public PlanNode rewrite(PlanNode node, C userContext) {
-    return node.accept(this, userContext);
-  }
+  public abstract List<WritePlanNode> splitByPartition(Analysis analysis);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index d54501cb51..dcde49fd9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -195,8 +195,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
     Filter timeFilter = FilterFactory.deserialize(byteBuffer);
 
     // TODO serialize groupByTimeParameter
-    RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
-    RegionReplicaSet.deserializeImpl(byteBuffer);
+    RegionReplicaSet regionReplicaSet = RegionReplicaSet.deserializeImpl(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     SeriesAggregateScanNode seriesAggregateScanNode =
         new SeriesAggregateScanNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 11726b62d9..bf0df051bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -209,8 +209,7 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
     if (isNull == 1) valueFilter = FilterFactory.deserialize(byteBuffer);
     int limit = ReadWriteIOUtils.readInt(byteBuffer);
     int offset = ReadWriteIOUtils.readInt(byteBuffer);
-    RegionReplicaSet dataRegionReplicaSet = new RegionReplicaSet();
-    RegionReplicaSet.deserializeImpl(byteBuffer);
+    RegionReplicaSet dataRegionReplicaSet = RegionReplicaSet.deserializeImpl(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     SeriesScanNode seriesScanNode = new SeriesScanNode(planNodeId, partialPath);
     seriesScanNode.allSensors = allSensors;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 47fbed6dff..946fa9f7f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import java.nio.ByteBuffer;
@@ -108,13 +109,13 @@ public class InsertMultiTabletsNode extends InsertNode {
   }
 
   @Override
-  public List<InsertNode> splitByPartition(Analysis analysis) {
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
     Map<RegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
     for (int i = 0; i < insertTabletNodeList.size(); i++) {
       InsertTabletNode insertTabletNode = insertTabletNodeList.get(i);
-      List<InsertNode> tmpResult = insertTabletNode.splitByPartition(analysis);
-      for (InsertNode subNode : tmpResult) {
-        RegionReplicaSet dataRegionReplicaSet = subNode.getDataRegionReplicaSet();
+      List<WritePlanNode> tmpResult = insertTabletNode.splitByPartition(analysis);
+      for (WritePlanNode subNode : tmpResult) {
+        RegionReplicaSet dataRegionReplicaSet = ((InsertNode) subNode).getDataRegionReplicaSet();
         if (splitMap.containsKey(dataRegionReplicaSet)) {
           InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet);
           tmpNode.addInsertTabletNode((InsertTabletNode) subNode, i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 30b091d83b..6856f2c0dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -21,17 +21,15 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 
-public abstract class InsertNode extends PlanNode {
+public abstract class InsertNode extends WritePlanNode {
 
   /**
    * if use id table, this filed is id form of device path <br>
@@ -130,9 +128,9 @@ public abstract class InsertNode extends PlanNode {
     this.deviceID = deviceID;
   }
 
-  // TODO(INSERT) split this insert node into multiple InsertNode according to the data partition
-  // info
-  public abstract List<InsertNode> splitByPartition(Analysis analysis);
+  public RegionReplicaSet getRegionReplicaSet() {
+    return dataRegionReplicaSet;
+  }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 6e937bef25..1d37b01626 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.wal.buffer.WALEntryValue;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -57,7 +58,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   }
 
   @Override
-  public List<InsertNode> splitByPartition(Analysis analysis) {
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
     TimePartitionSlot timePartitionSlot = StorageEngine.getTimePartitionSlot(time);
     this.dataRegionReplicaSet =
         analysis
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index b3012c9986..cd80ae45cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import java.nio.ByteBuffer;
@@ -114,7 +115,7 @@ public class InsertRowsNode extends InsertNode {
   public void serialize(ByteBuffer byteBuffer) {}
 
   @Override
-  public List<InsertNode> splitByPartition(Analysis analysis) {
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
     Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
     for (int i = 0; i < insertRowNodeList.size(); i++) {
       InsertRowNode insertRowNode = insertRowNodeList.get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 3c22c0fd57..34d4e81c63 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 
 import java.nio.ByteBuffer;
@@ -110,7 +111,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
   public void serialize(ByteBuffer byteBuffer) {}
 
   @Override
-  public List<InsertNode> splitByPartition(Analysis analysis) {
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
     Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
     for (int i = 0; i < insertRowNodeList.size(); i++) {
       InsertRowNode insertRowNode = insertRowNodeList.get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index d27d0dafd0..2d516f1e47 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
 import org.apache.iotdb.db.wal.buffer.WALEntryValue;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -149,9 +150,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {}
 
   @Override
-  public List<InsertNode> splitByPartition(Analysis analysis) {
+  public List<WritePlanNode> splitByPartition(Analysis analysis) {
     // only single device in single storage group
-    List<InsertNode> result = new ArrayList<>();
+    List<WritePlanNode> result = new ArrayList<>();
     if (times.length == 0) {
       return Collections.emptyList();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 51a2313f08..41813bd337 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -46,8 +46,11 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSch
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import com.google.common.collect.Sets;
 import org.junit.Test;
@@ -254,6 +257,74 @@ public class DistributionPlannerTest {
     assertEquals(3, plan.getInstances().size());
   }
 
+  @Test
+  public void TestInsertRowNodeParallelPlan() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_write");
+    InsertRowNode insertRowNode =
+        new InsertRowNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1"),
+            false,
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.INT32),
+            },
+            new TSDataType[] {TSDataType.INT32},
+            1L,
+            new Object[] {10});
+
+    Analysis analysis = constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    plan.getInstances().forEach(System.out::println);
+    assertEquals(1, plan.getInstances().size());
+  }
+
+  @Test
+  public void TestInsertRowsNodeParallelPlan() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_write");
+    InsertRowNode insertRowNode1 =
+        new InsertRowNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1"),
+            false,
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.INT32),
+            },
+            new TSDataType[] {TSDataType.INT32},
+            1L,
+            new Object[] {10});
+
+    InsertRowNode insertRowNode2 =
+        new InsertRowNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1"),
+            false,
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.INT32),
+            },
+            new TSDataType[] {TSDataType.INT32},
+            100000L,
+            new Object[] {10});
+
+    InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
+    node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2));
+    node.setInsertRowNodeIndexList(Arrays.asList(0, 1));
+
+    Analysis analysis = constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
+    DistributedQueryPlan plan = planner.planFragments();
+    plan.getInstances().forEach(System.out::println);
+    assertEquals(1, plan.getInstances().size());
+  }
+
   private Analysis constructAnalysis() {
     Analysis analysis = new Analysis();
 
@@ -267,21 +338,25 @@ public class DistributionPlannerTest {
     Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>> sgPartitionMap =
         new HashMap<>();
 
-    List<RegionReplicaSet> d1DataRegions = new ArrayList<>();
-    d1DataRegions.add(
+    List<RegionReplicaSet> d1DataRegion1 = new ArrayList<>();
+    d1DataRegion1.add(
         new RegionReplicaSet(
             new DataRegionId(1),
             Arrays.asList(
                 new DataNodeLocation(11, new Endpoint("192.0.1.1", 9000)),
                 new DataNodeLocation(12, new Endpoint("192.0.1.2", 9000)))));
-    d1DataRegions.add(
+
+    List<RegionReplicaSet> d1DataRegion2 = new ArrayList<>();
+    d1DataRegion2.add(
         new RegionReplicaSet(
             new DataRegionId(2),
             Arrays.asList(
                 new DataNodeLocation(21, new Endpoint("192.0.2.1", 9000)),
                 new DataNodeLocation(22, new Endpoint("192.0.2.2", 9000)))));
+
     Map<TimePartitionSlot, List<RegionReplicaSet>> d1DataRegionMap = new HashMap<>();
-    d1DataRegionMap.put(new TimePartitionSlot(), d1DataRegions);
+    d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegion1);
+    d1DataRegionMap.put(new TimePartitionSlot(1), d1DataRegion2);
 
     List<RegionReplicaSet> d2DataRegions = new ArrayList<>();
     d2DataRegions.add(
@@ -291,7 +366,7 @@ public class DistributionPlannerTest {
                 new DataNodeLocation(31, new Endpoint("192.0.3.1", 9000)),
                 new DataNodeLocation(32, new Endpoint("192.0.3.2", 9000)))));
     Map<TimePartitionSlot, List<RegionReplicaSet>> d2DataRegionMap = new HashMap<>();
-    d2DataRegionMap.put(new TimePartitionSlot(), d2DataRegions);
+    d2DataRegionMap.put(new TimePartitionSlot(0), d2DataRegions);
 
     List<RegionReplicaSet> d3DataRegions = new ArrayList<>();
     d3DataRegions.add(
@@ -307,7 +382,7 @@ public class DistributionPlannerTest {
                 new DataNodeLocation(41, new Endpoint("192.0.4.1", 9000)),
                 new DataNodeLocation(42, new Endpoint("192.0.4.2", 9000)))));
     Map<TimePartitionSlot, List<RegionReplicaSet>> d3DataRegionMap = new HashMap<>();
-    d3DataRegionMap.put(new TimePartitionSlot(), d3DataRegions);
+    d3DataRegionMap.put(new TimePartitionSlot(0), d3DataRegions);
 
     sgPartitionMap.put(new SeriesPartitionSlot(device1.length()), d1DataRegionMap);
     sgPartitionMap.put(new SeriesPartitionSlot(device2.length()), d2DataRegionMap);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index 6871ff6560..4bf6a477e2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.plan;
 
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
@@ -43,6 +44,7 @@ import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 import org.apache.iotdb.tsfile.read.filter.operator.Gt;
 
+import com.google.common.collect.ImmutableList;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -54,16 +56,18 @@ public class FragmentInstanceSerdeTest {
 
   @Test
   public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
+    PlanFragmentId planFragmentId = new PlanFragmentId("test", -1);
     FragmentInstance fragmentInstance =
         new FragmentInstance(
-            new PlanFragment(new PlanFragmentId("test", -1), constructPlanNodeTree()),
-            -1,
+            new PlanFragment(planFragmentId, constructPlanNodeTree()),
+            planFragmentId.genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.READ);
     RegionReplicaSet regionReplicaSet =
-        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
-    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
-    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
+        new RegionReplicaSet(
+            new DataRegionId(1),
+            ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666))));
+    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     fragmentInstance.serializeRequest(byteBuffer);
@@ -74,16 +78,18 @@ public class FragmentInstanceSerdeTest {
 
   @Test
   public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException {
+    PlanFragmentId planFragmentId = new PlanFragmentId("test2", 1);
     FragmentInstance fragmentInstance =
         new FragmentInstance(
-            new PlanFragment(new PlanFragmentId("test2", 1), constructPlanNodeTree()),
-            -1,
+            new PlanFragment(planFragmentId, constructPlanNodeTree()),
+            planFragmentId.genFragmentInstanceId(),
             null,
             QueryType.READ);
     RegionReplicaSet regionReplicaSet =
-        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
-    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
-    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667));
+        new RegionReplicaSet(
+            new DataRegionId(1),
+            ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667))));
+    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     fragmentInstance.serializeRequest(byteBuffer);
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 7ed08d65b6..d5ba4d1f07 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -117,9 +117,12 @@ public class InternalServiceImplTest {
     RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
     PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), createTimeSeriesNode);
     FragmentInstance fragmentInstance =
-        new FragmentInstance(planFragment, 4, new GroupByFilter(1, 2, 3, 4), QueryType.WRITE);
-    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
-    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
+        new FragmentInstance(
+            planFragment,
+            planFragment.getId().genFragmentInstanceId(),
+            new GroupByFilter(1, 2, 3, 4),
+            QueryType.WRITE);
+    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     // serialize fragmentInstance
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);