You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/13 12:01:56 UTC

[iotdb] 01/01: tmp save

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 850198002ecf1da7999bc8468632d1fc16eac62f
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 13 20:01:43 2022 +0800

    tmp save
---
 .../apache/iotdb/db/mpp/common/PlanFragmentId.java |  4 +-
 .../db/mpp/sql/planner/DistributionPlanner.java    | 19 +++++--
 .../db/mpp/sql/planner/plan/FragmentInstance.java  | 27 +++-------
 .../plan/SimpleFragmentParallelPlanner.java        |  9 +---
 .../planner/plan/WriteFragmentParallelPlanner.java | 61 ++++++++++++++++++++++
 ...lePlanNodeRewriter.java => IWritePlanNode.java} | 24 +++------
 .../planner/plan/node/SimplePlanNodeRewriter.java  |  4 ++
 .../plan/node/write/InsertMultiTabletNode.java     |  3 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |  9 ++--
 .../sql/planner/plan/node/write/InsertNode.java    | 13 ++---
 .../sql/planner/plan/node/write/InsertRowNode.java |  3 +-
 .../planner/plan/node/write/InsertRowsNode.java    |  3 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  3 +-
 .../planner/plan/node/write/InsertTabletNode.java  |  5 +-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 39 ++++++++++++--
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 22 ++++----
 16 files changed, 168 insertions(+), 80 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index e4bdd7d306..4d8d098e56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -89,9 +89,7 @@ public class PlanFragmentId {
       return false;
     }
     PlanFragmentId that = (PlanFragmentId) o;
-    return id == that.id
-        && nextFragmentInstanceId == that.nextFragmentInstanceId
-        && Objects.equals(queryId, that.queryId);
+    return id == that.id && Objects.equals(queryId, that.queryId);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 96341a2ca1..f87979a2a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
@@ -71,7 +72,10 @@ public class DistributionPlanner {
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
     SubPlan subPlan = splitFragment(rootWithExchange);
     List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
-    SetSinkForRootInstance(subPlan, fragmentInstances);
+    // Only execute this step for READ operation
+    if (context.getQueryType() == QueryType.READ) {
+      SetSinkForRootInstance(subPlan, fragmentInstances);
+    }
     return new DistributedQueryPlan(
         logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
   }
@@ -79,8 +83,9 @@ public class DistributionPlanner {
   // Convert fragment to detailed instance
   // And for parallel-able fragment, clone it into several instances with different params.
   public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
-    IFragmentParallelPlaner parallelPlaner =
-        new SimpleFragmentParallelPlanner(subPlan, analysis, context);
+    IFragmentParallelPlaner parallelPlaner = context.getQueryType() == QueryType.READ ?
+        new SimpleFragmentParallelPlanner(subPlan, analysis, context) :
+        new WriteFragmentParallelPlanner(subPlan, analysis, context);
     return parallelPlaner.parallelPlan();
   }
 
@@ -196,6 +201,10 @@ public class DistributionPlanner {
   private class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     @Override
     public PlanNode visitPlan(PlanNode node, NodeGroupContext context) {
+      // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+      if (node instanceof IWritePlanNode) {
+        return node;
+      }
       // Visit all the children of current node
       List<PlanNode> children =
           node.getChildren().stream()
@@ -341,6 +350,10 @@ public class DistributionPlanner {
     }
 
     private void splitToSubPlan(PlanNode root, SubPlan subPlan) {
+      // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+      if (root instanceof IWritePlanNode) {
+        return;
+      }
       if (root instanceof ExchangeNode) {
         // We add a FragmentSinkNode for newly created PlanFragment
         ExchangeNode exchangeNode = (ExchangeNode) root;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 03fd9f3e0e..c263732b00 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
@@ -49,33 +48,27 @@ public class FragmentInstance implements IConsensusRequest {
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
 
-  public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
+  public FragmentInstance(PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
     this.fragment = fragment;
     this.timeFilter = timeFilter;
-    this.id = generateId(fragment.getId(), index);
+    this.id = id;
     this.type = type;
   }
 
-  public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
-    return new FragmentInstanceId(id, String.valueOf(index));
-  }
-
   public RegionReplicaSet getDataRegionId() {
     return dataRegion;
   }
 
-  public void setDataRegionId(RegionReplicaSet dataRegion) {
+  public void setDataRegionAndHost(RegionReplicaSet dataRegion) {
     this.dataRegion = dataRegion;
+    // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current instance
+    this.hostEndpoint = dataRegion.getDataNodeList().get(0).getEndPoint();
   }
 
   public Endpoint getHostEndpoint() {
     return hostEndpoint;
   }
 
-  public void setHostEndpoint(Endpoint hostEndpoint) {
-    this.hostEndpoint = hostEndpoint;
-  }
-
   public PlanFragment getFragment() {
     return fragment;
   }
@@ -127,8 +120,7 @@ public class FragmentInstance implements IConsensusRequest {
     Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
     QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
     FragmentInstance fragmentInstance =
-        new FragmentInstance(
-            planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType);
+        new FragmentInstance(planFragment, id, timeFilter, queryType);
     RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
     try {
       regionReplicaSet.deserializeImpl(buffer);
@@ -161,12 +153,7 @@ public class FragmentInstance implements IConsensusRequest {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
     FragmentInstance instance = (FragmentInstance) o;
-    return Objects.equals(id, instance.id)
-        && type == instance.type
-        && Objects.equals(fragment, instance.fragment)
-        && Objects.equals(dataRegion, instance.dataRegion)
-        && Objects.equals(hostEndpoint, instance.hostEndpoint)
-        && Objects.equals(timeFilter, instance.timeFilter);
+    return Objects.equals(id, instance.id) && type == instance.type && Objects.equals(fragment, instance.fragment) && Objects.equals(dataRegion, instance.dataRegion) && Objects.equals(hostEndpoint, instance.hostEndpoint) && Objects.equals(timeFilter, instance.timeFilter);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index c2014c55d2..280d9891af 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -79,7 +79,6 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
   private void produceFragmentInstance(PlanFragment fragment) {
     // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
     // one by one
-    int instanceIdx = 0;
     PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
     Filter timeFilter =
         analysis.getQueryFilter() == null
@@ -88,7 +87,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     FragmentInstance fragmentInstance =
         new FragmentInstance(
             new PlanFragment(fragment.getId(), rootCopy),
-            instanceIdx,
+            fragment.getId().genFragmentInstanceId(),
             timeFilter,
             queryContext.getQueryType());
 
@@ -100,11 +99,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     // We need to store all the replica host in case of the scenario that the instance need to be
     // redirected
     // to another host when scheduling
-    fragmentInstance.setDataRegionId(dataRegion);
-
-    // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
-    // instance
-    fragmentInstance.setHostEndpoint(dataRegion.getDataNodeList().get(0).getEndPoint());
+    fragmentInstance.setDataRegionAndHost(dataRegion);
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
     fragmentInstanceList.add(fragmentInstance);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
new file mode 100644
index 0000000000..eebf705cd3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner{
+
+  private SubPlan subPlan;
+  private Analysis analysis;
+  private MPPQueryContext queryContext;
+
+  public WriteFragmentParallelPlanner(SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) {
+    this.subPlan = subPlan;
+    this.analysis = analysis;
+    this.queryContext = queryContext;
+  }
+
+  @Override
+  public List<FragmentInstance> parallelPlan() {
+    PlanFragment fragment = subPlan.getPlanFragment();
+    Filter timeFilter = analysis.getQueryFilter() != null ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter() : null;
+    PlanNode node = fragment.getRoot();
+    if (!(node instanceof IWritePlanNode)) {
+      throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation");
+    }
+    List<IWritePlanNode> splits = ((IWritePlanNode) node).splitByPartition(analysis);
+    List<FragmentInstance> ret = new ArrayList<>();
+    for (IWritePlanNode split : splits) {
+      FragmentInstance instance = new FragmentInstance(new PlanFragment(fragment.getId(), split), fragment.getId().genFragmentInstanceId(), timeFilter, queryContext.getQueryType());
+      instance.setDataRegionAndHost(((IWritePlanNode) node).getRegionReplicaSet());
+      ret.add(instance);
+    }
+    return ret;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/IWritePlanNode.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/IWritePlanNode.java
index e0ca8f6cfd..7170952721 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/IWritePlanNode.java
@@ -19,26 +19,18 @@
 
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+
 import java.util.List;
 
-import static com.google.common.collect.ImmutableList.toImmutableList;
+public abstract class IWritePlanNode extends PlanNode {
 
-public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> {
-  @Override
-  public PlanNode visitPlan(PlanNode node, C context) {
-    return defaultRewrite(node, context);
+  protected IWritePlanNode(PlanNodeId id) {
+    super(id);
   }
 
-  public PlanNode defaultRewrite(PlanNode node, C context) {
-    List<PlanNode> children =
-        node.getChildren().stream()
-            .map(child -> rewrite(child, context))
-            .collect(toImmutableList());
-
-    return node.cloneWithChildren(children);
-  }
+  public abstract RegionReplicaSet getRegionReplicaSet();
 
-  public PlanNode rewrite(PlanNode node, C userContext) {
-    return node.accept(this, userContext);
-  }
+  public abstract List<IWritePlanNode> splitByPartition(Analysis analysis);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
index e0ca8f6cfd..b96002acb3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/SimplePlanNodeRewriter.java
@@ -26,6 +26,10 @@ import static com.google.common.collect.ImmutableList.toImmutableList;
 public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> {
   @Override
   public PlanNode visitPlan(PlanNode node, C context) {
+    // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
+    if (node instanceof IWritePlanNode) {
+      return node;
+    }
     return defaultRewrite(node, context);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
index ede58ff6b8..ef1c570457 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletNode.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -33,7 +34,7 @@ public class InsertMultiTabletNode extends InsertNode {
   }
 
   @Override
-  public List<InsertNode> splitByPartition(Analysis analysis) {
+  public List<IWritePlanNode> splitByPartition(Analysis analysis) {
     return null;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 2a11bc6f9a..49b1065a05 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -107,13 +108,13 @@ public class InsertMultiTabletsNode extends InsertNode {
   }
 
   @Override
-  public List<InsertNode> splitByPartition(Analysis analysis) {
+  public List<IWritePlanNode> splitByPartition(Analysis analysis) {
     Map<RegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
     for (int i = 0; i < insertTabletNodeList.size(); i++) {
       InsertTabletNode insertTabletNode = insertTabletNodeList.get(i);
-      List<InsertNode> tmpResult = insertTabletNode.splitByPartition(analysis);
-      for (InsertNode subNode : tmpResult) {
-        RegionReplicaSet dataRegionReplicaSet = subNode.getDataRegionReplicaSet();
+      List<IWritePlanNode> tmpResult = insertTabletNode.splitByPartition(analysis);
+      for (IWritePlanNode subNode : tmpResult) {
+        RegionReplicaSet dataRegionReplicaSet = ((InsertNode)subNode).getDataRegionReplicaSet();
         if (splitMap.containsKey(dataRegionReplicaSet)) {
           InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet);
           tmpNode.addInsertTabletNode((InsertTabletNode) subNode, i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 30b091d83b..d7eef6b50b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -21,17 +21,14 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
 import java.nio.ByteBuffer;
-import java.util.List;
 
-public abstract class InsertNode extends PlanNode {
+public abstract class InsertNode extends IWritePlanNode {
 
   /**
    * if use id table, this filed is id form of device path <br>
@@ -130,9 +127,9 @@ public abstract class InsertNode extends PlanNode {
     this.deviceID = deviceID;
   }
 
-  // TODO(INSERT) split this insert node into multiple InsertNode according to the data partition
-  // info
-  public abstract List<InsertNode> splitByPartition(Analysis analysis);
+  public RegionReplicaSet getRegionReplicaSet() {
+    return dataRegionReplicaSet;
+  }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
index 6e937bef25..e84b461c87 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowNode.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -57,7 +58,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue {
   }
 
   @Override
-  public List<InsertNode> splitByPartition(Analysis analysis) {
+  public List<IWritePlanNode> splitByPartition(Analysis analysis) {
     TimePartitionSlot timePartitionSlot = StorageEngine.getTimePartitionSlot(time);
     this.dataRegionReplicaSet =
         analysis
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index bcd4eedc07..3cba509506 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -109,7 +110,7 @@ public class InsertRowsNode extends InsertNode {
   public void serialize(ByteBuffer byteBuffer) {}
 
   @Override
-  public List<InsertNode> splitByPartition(Analysis analysis) {
+  public List<IWritePlanNode> splitByPartition(Analysis analysis) {
     Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
     for (int i = 0; i < insertRowNodeList.size(); i++) {
       InsertRowNode insertRowNode = insertRowNodeList.get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index da5b729318..8dc7fd7671 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@ -105,7 +106,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
   public void serialize(ByteBuffer byteBuffer) {}
 
   @Override
-  public List<InsertNode> splitByPartition(Analysis analysis) {
+  public List<IWritePlanNode> splitByPartition(Analysis analysis) {
     Map<RegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
     for (int i = 0; i < insertRowNodeList.size(); i++) {
       InsertRowNode insertRowNode = insertRowNodeList.get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index d27d0dafd0..a0a262fb5a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -149,9 +150,9 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
   public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {}
 
   @Override
-  public List<InsertNode> splitByPartition(Analysis analysis) {
+  public List<IWritePlanNode> splitByPartition(Analysis analysis) {
     // only single device in single storage group
-    List<InsertNode> result = new ArrayList<>();
+    List<IWritePlanNode> result = new ArrayList<>();
     if (times.length == 0) {
       return Collections.emptyList();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 03db5bb4ff..ed192c357b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
 import org.apache.iotdb.commons.partition.TimePartitionSlot;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
@@ -40,10 +41,13 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import com.google.common.collect.Sets;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -201,6 +205,35 @@ public class DistributionPlannerTest {
     assertEquals(3, plan.getInstances().size());
   }
 
+  @Test
+  public void TestWriteParallelPlan() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_write");
+    InsertRowNode insertRowNode = new InsertRowNode(
+        queryId.genPlanNodeId(),
+        new PartialPath("root.sg.d1"),
+        false,
+        new MeasurementSchema[]{
+            new MeasurementSchema("s1", TSDataType.INT32),
+        },
+        new TSDataType[]{
+            TSDataType.INT32
+        },
+        1L,
+        new Object[]{
+            10
+        });
+
+    Analysis analysis = constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    plan.getInstances().forEach(System.out::println);
+    assertEquals(1, plan.getInstances().size());
+  }
+
   private Analysis constructAnalysis() {
     Analysis analysis = new Analysis();
 
@@ -228,7 +261,7 @@ public class DistributionPlannerTest {
                 new DataNodeLocation(21, new Endpoint("192.0.2.1", 9000)),
                 new DataNodeLocation(22, new Endpoint("192.0.2.2", 9000)))));
     Map<TimePartitionSlot, List<RegionReplicaSet>> d1DataRegionMap = new HashMap<>();
-    d1DataRegionMap.put(new TimePartitionSlot(), d1DataRegions);
+    d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegions);
 
     List<RegionReplicaSet> d2DataRegions = new ArrayList<>();
     d2DataRegions.add(
@@ -238,7 +271,7 @@ public class DistributionPlannerTest {
                 new DataNodeLocation(31, new Endpoint("192.0.3.1", 9000)),
                 new DataNodeLocation(32, new Endpoint("192.0.3.2", 9000)))));
     Map<TimePartitionSlot, List<RegionReplicaSet>> d2DataRegionMap = new HashMap<>();
-    d2DataRegionMap.put(new TimePartitionSlot(), d2DataRegions);
+    d2DataRegionMap.put(new TimePartitionSlot(0), d2DataRegions);
 
     List<RegionReplicaSet> d3DataRegions = new ArrayList<>();
     d3DataRegions.add(
@@ -254,7 +287,7 @@ public class DistributionPlannerTest {
                 new DataNodeLocation(41, new Endpoint("192.0.4.1", 9000)),
                 new DataNodeLocation(42, new Endpoint("192.0.4.2", 9000)))));
     Map<TimePartitionSlot, List<RegionReplicaSet>> d3DataRegionMap = new HashMap<>();
-    d3DataRegionMap.put(new TimePartitionSlot(), d3DataRegions);
+    d3DataRegionMap.put(new TimePartitionSlot(0), d3DataRegions);
 
     sgPartitionMap.put(new SeriesPartitionSlot(device1.length()), d1DataRegionMap);
     sgPartitionMap.put(new SeriesPartitionSlot(device2.length()), d2DataRegionMap);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index dea1e9b031..f57f60dda6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.sql.plan;
 
+import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
 import org.apache.iotdb.commons.cluster.Endpoint;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
@@ -54,16 +56,16 @@ public class FragmentInstanceSerdeTest {
 
   @Test
   public void TestSerializeAndDeserializeForTree1() throws IllegalPathException {
+    PlanFragmentId planFragmentId = new PlanFragmentId("test", -1);
     FragmentInstance fragmentInstance =
         new FragmentInstance(
-            new PlanFragment(new PlanFragmentId("test", -1), constructPlanNodeTree()),
-            -1,
+            new PlanFragment(planFragmentId, constructPlanNodeTree()),
+            planFragmentId.genFragmentInstanceId(),
             new GroupByFilter(1, 2, 3, 4),
             QueryType.READ);
     RegionReplicaSet regionReplicaSet =
-        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
-    fragmentInstance.setDataRegionId(regionReplicaSet);
-    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
+        new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666))));
+    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     fragmentInstance.serializeRequest(byteBuffer);
@@ -74,16 +76,16 @@ public class FragmentInstanceSerdeTest {
 
   @Test
   public void TestSerializeAndDeserializeWithNullFilter() throws IllegalPathException {
+    PlanFragmentId planFragmentId = new PlanFragmentId("test2", 1);
     FragmentInstance fragmentInstance =
         new FragmentInstance(
-            new PlanFragment(new PlanFragmentId("test2", 1), constructPlanNodeTree()),
-            -1,
+            new PlanFragment(planFragmentId, constructPlanNodeTree()),
+            planFragmentId.genFragmentInstanceId(),
             null,
             QueryType.READ);
     RegionReplicaSet regionReplicaSet =
-        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
-    fragmentInstance.setDataRegionId(regionReplicaSet);
-    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667));
+        new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667))));
+    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     fragmentInstance.serializeRequest(byteBuffer);