You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/03/24 02:47:48 UTC

[iotdb] branch master updated: [IoTDB-2260]Complete the basic version of DistributionPlanner (#5327)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bc3179b  [IoTDB-2260]Complete the basic version of DistributionPlanner (#5327)
bc3179b is described below

commit bc3179b99f9296bb9c036cf7e2c56632db4b85bc
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Thu Mar 24 10:47:01 2022 +0800

    [IoTDB-2260]Complete the basic version of DistributionPlanner (#5327)
    
    * complete SubPlan builder
    
    * add interface
    
    * format cofde
    
    * add fragment parallel planner
    
    * improve the InsertTabletNode and InsertTabletStatement
    
    * complete basic distributed plan
    
    * spotless
    
    * spotless
    
    * change name
    
    Co-authored-by: qiaojialin <64...@qq.com>
    Co-authored-by: JackieTien97 <ja...@gmail.com>
---
 .../iotdb/commons/partition/DataPartitionInfo.java |  67 +++++++++++
 .../commons/partition/DataPartitionQueryParam.java |  30 +++--
 .../iotdb/commons/partition/DataRegionId.java      |  22 +++-
 .../commons/partition/DataRegionReplicaSet.java    |  43 ++++---
 .../iotdb/commons/partition/DeviceGroupId.java     |  33 +++---
 .../iotdb/commons/partition/PartitionInfo.java     |  31 ++---
 .../commons/partition/SchemaPartitionInfo.java     |  19 ++-
 .../iotdb/commons/partition/SchemaRegionId.java    |  14 ++-
 .../commons/partition/SchemaRegionReplicaSet.java  |  29 +++--
 .../iotdb/commons/partition/TimePartitionId.java   |  17 ++-
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |   4 +
 .../iotdb/db/mpp/common/MPPQueryContext.java       |  16 ++-
 .../{FragmentId.java => PlanFragmentId.java}       |  16 ++-
 .../SessionInfo.java}                              |  11 +-
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  24 +++-
 .../ExecutionResult.java}                          |  15 ++-
 .../iotdb/db/mpp/execution/FragmentInfo.java       |   6 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  41 ++++---
 .../mpp/execution/scheduler/ClusterScheduler.java  |   4 +-
 .../db/mpp/execution/scheduler/IScheduler.java     |   4 +-
 .../execution/scheduler/StandaloneScheduler.java   |   4 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analysis.java  |  63 ++++++----
 .../{StatementAnalyzer.java => Analyzer.java}      |  47 +++++++-
 .../ClusterPartitionFetcher.java}                  |  29 ++---
 .../ClusterSchemaFetcher.java}                     |  19 +--
 .../IPartitionFetcher.java}                        |  39 ++-----
 .../ISchemaFetcher.java}                           |  13 ++-
 .../analyze/QueryType.java}                        |   7 +-
 .../sql/analyze/StandalonePartitionFetcher.java    |  66 +++++++++++
 .../StandaloneSchemaFetcher.java}                  |  25 ++--
 .../db/mpp/sql/planner/DistributionPlanner.java    |  99 +++++++++++++---
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   |  19 ++-
 .../mpp/sql/planner/plan/DistributedQueryPlan.java |  34 +++++-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |  65 +++++++++++
 ...QueryPlan.java => IFragmentParallelPlaner.java} |  25 ++--
 .../db/mpp/sql/planner/plan/PlanFragment.java      |  61 +++++++++-
 .../plan/SimpleFragmentParallelPlanner.java        | 124 ++++++++++++++++++++
 .../iotdb/db/mpp/sql/planner/plan/SubPlan.java     |  68 +++++++++++
 .../db/mpp/sql/planner/plan/node/PlanNodeUtil.java |  26 +++++
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |   4 +
 .../planner/plan/node/process/ExchangeNode.java    |  85 +++++++++++---
 .../planner/plan/node/sink/FragmentSinkNode.java   |  61 +++++++++-
 .../planner/plan/node/source/CsvSourceNode.java    |  11 ++
 .../plan/node/source/SeriesAggregateScanNode.java  |  16 ++-
 .../planner/plan/node/source/SeriesScanNode.java   |  30 ++---
 .../sql/planner/plan/node/source/SourceNode.java   |   5 +
 .../sql/planner/plan/node/write/InsertNode.java    |  16 +--
 .../planner/plan/node/write/InsertTabletNode.java  |  21 +++-
 .../statement/crud/InsertBaseStatement.java}       |  37 +++---
 .../crud/InsertTabletStatement.java}               |  13 ++-
 .../iotdb/db/mpp/sql/tree/StatementVisitor.java    |   5 +
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   1 +
 .../iotdb/db/query/control/SessionManager.java     |   8 ++
 .../iotdb/db/service/basic/ServiceProvider.java    |   6 +-
 .../db/service/thrift/impl/TSServiceImpl.java      |  56 ++++++++-
 ...tatementAnalyzerTest.java => AnalyzerTest.java} |   4 +-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 128 +++++++++++++++------
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  |   4 +-
 58 files changed, 1412 insertions(+), 378 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
new file mode 100644
index 0000000..9643471
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.partition;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class DataPartitionInfo {
+
+  // Map<StorageGroup, Map<DeviceGroupID, Map<TimePartitionId, List<DataRegionPlaceInfo>>>>
+  private Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>>
+      dataPartitionMap;
+
+  public Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>>
+      getDataPartitionMap() {
+    return dataPartitionMap;
+  }
+
+  public void setDataPartitionMap(
+      Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>>
+          dataPartitionMap) {
+    this.dataPartitionMap = dataPartitionMap;
+  }
+
+  public List<DataRegionReplicaSet> getDataRegionReplicaSet(
+      String deviceName, List<TimePartitionId> timePartitionIdList) {
+    String storageGroup = getStorageGroupByDevice(deviceName);
+    DeviceGroupId deviceGroupId = calculateDeviceGroupId(deviceName);
+    // TODO: (xingtanzjr) the timePartitionIdList is ignored
+    return dataPartitionMap.get(storageGroup).get(deviceGroupId).values().stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
+
+  private DeviceGroupId calculateDeviceGroupId(String deviceName) {
+    // TODO: (xingtanzjr) implement the real algorithm for calculation of DeviceGroupId
+    return new DeviceGroupId(deviceName.length());
+  }
+
+  private String getStorageGroupByDevice(String deviceName) {
+    for (String storageGroup : dataPartitionMap.keySet()) {
+      if (deviceName.startsWith(storageGroup)) {
+        return storageGroup;
+      }
+    }
+    // TODO: (xingtanzjr) how to handle this exception in IoTDB
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
similarity index 61%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
index 8cf0341..60cdf7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionQueryParam.java
@@ -16,18 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
-
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+package org.apache.iotdb.commons.partition;
 
 import java.util.List;
 
-public class DistributedQueryPlan {
-  private MPPQueryContext context;
-  private PlanNode rootNode;
-  private PlanFragment rootFragment;
+public class DataPartitionQueryParam {
+
+  private String deviceId;
+  private List<TimePartitionId> timePartitionIdList;
+
+  public String getDeviceId() {
+    return deviceId;
+  }
+
+  public void setDeviceId(String deviceId) {
+    this.deviceId = deviceId;
+  }
+
+  public List<TimePartitionId> getTimePartitionIdList() {
+    return timePartitionIdList;
+  }
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+  public void setTimePartitionIdList(List<TimePartitionId> timePartitionIdList) {
+    this.timePartitionIdList = timePartitionIdList;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
similarity index 65%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
index 38a56f3..dffdd90 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionId.java
@@ -16,12 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.commons.partition;
 
-public class PlanFragmentId {
-  private String id;
+public class DataRegionId {
+  private int dataRegionId;
 
-  public PlanFragmentId(String id) {
-    this.id = id;
+  public DataRegionId(int dataRegionId) {
+    this.dataRegionId = dataRegionId;
+  }
+
+  public int getDataRegionId() {
+    return dataRegionId;
+  }
+
+  public void setDataRegionId(int dataRegionId) {
+    this.dataRegionId = dataRegionId;
+  }
+
+  public String toString() {
+    return String.format("DataRegion-%d", dataRegionId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
index 6151916..6ab4fbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataRegionReplicaSet.java
@@ -16,41 +16,38 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
+package org.apache.iotdb.commons.partition;
 
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 import java.util.List;
 
-public class FragmentSinkNode extends SinkNode {
-  public FragmentSinkNode(PlanNodeId id) {
-    super(id);
-  }
+public class DataRegionReplicaSet {
+  private DataRegionId Id;
+  private List<EndPoint> endPointList;
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return null;
+  public DataRegionReplicaSet(DataRegionId Id, List<EndPoint> endPointList) {
+    this.Id = Id;
+    this.endPointList = endPointList;
   }
 
-  @Override
-  public PlanNode clone() {
-    return null;
+  public List<EndPoint> getEndPointList() {
+    return endPointList;
   }
 
-  @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
+  public void setEndPointList(List<EndPoint> endPointList) {
+    this.endPointList = endPointList;
   }
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
+  public DataRegionId getId() {
+    return Id;
   }
 
-  @Override
-  public void send() {}
+  public void setId(DataRegionId id) {
+    this.Id = id;
+  }
 
-  @Override
-  public void close() throws Exception {}
+  public String toString() {
+    return String.format("%s:%s", Id, endPointList);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
index 1a597b2..0a7123c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DeviceGroupId.java
@@ -16,24 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan.node;
+package org.apache.iotdb.commons.partition;
 
-public class PlanNodeUtil {
-  public static void printPlanNode(PlanNode root) {
-    printPlanNodeWithLevel(root, 0);
+public class DeviceGroupId {
+  private int deviceGroupId;
+
+  public DeviceGroupId(int deviceGroupId) {
+    this.deviceGroupId = deviceGroupId;
+  }
+
+  public int getDeviceGroupId() {
+    return deviceGroupId;
+  }
+
+  public void setDeviceGroupId(int deviceGroupId) {
+    this.deviceGroupId = deviceGroupId;
   }
 
-  private static void printPlanNodeWithLevel(PlanNode root, int level) {
-    printTab(level);
-    System.out.println(root.toString());
-    for (PlanNode child : root.getChildren()) {
-      printPlanNodeWithLevel(child, level + 1);
-    }
+  public int hashCode() {
+    return new Integer(deviceGroupId).hashCode();
   }
 
-  private static void printTab(int count) {
-    for (int i = 0; i < count; i++) {
-      System.out.print("\t");
-    }
+  public boolean equals(Object obj) {
+    return obj instanceof DeviceGroupId
+        && this.deviceGroupId == ((DeviceGroupId) obj).deviceGroupId;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java
index 16145fd..90a22a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/PartitionInfo.java
@@ -16,25 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution.scheduler;
+package org.apache.iotdb.commons.partition;
 
-import org.apache.iotdb.db.mpp.common.FragmentId;
-import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
-import org.apache.iotdb.db.mpp.execution.FragmentInfo;
+public class PartitionInfo {
 
-import io.airlift.units.Duration;
+  private DataPartitionInfo dataPartitionInfo;
+  private SchemaPartitionInfo schemaPartitionInfo;
 
-public interface IScheduler {
+  public DataPartitionInfo getDataPartitionInfo() {
+    return dataPartitionInfo;
+  }
 
-  void start();
+  public void setDataPartitionInfo(DataPartitionInfo dataPartitionInfo) {
+    this.dataPartitionInfo = dataPartitionInfo;
+  }
 
-  void abort();
+  public SchemaPartitionInfo getSchemaPartitionInfo() {
+    return schemaPartitionInfo;
+  }
 
-  Duration getTotalCpuTime();
-
-  FragmentInfo getFragmentInfo();
-
-  void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause);
-
-  void cancelFragment(FragmentId fragmentId);
+  public void setSchemaPartitionInfo(SchemaPartitionInfo schemaPartitionInfo) {
+    this.schemaPartitionInfo = schemaPartitionInfo;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
index 38a56f3..119115c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartitionInfo.java
@@ -16,12 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.commons.partition;
 
-public class PlanFragmentId {
-  private String id;
+import java.util.Map;
 
-  public PlanFragmentId(String id) {
-    this.id = id;
+public class SchemaPartitionInfo {
+
+  // Map<StorageGroup, Map<DeviceGroupID, SchemaRegionPlaceInfo>>
+  private Map<String, Map<DeviceGroupId, SchemaRegionReplicaSet>> schemaPartitionInfo;
+
+  public Map<String, Map<DeviceGroupId, SchemaRegionReplicaSet>> getSchemaPartitionInfo() {
+    return schemaPartitionInfo;
+  }
+
+  public void setSchemaPartitionInfo(
+      Map<String, Map<DeviceGroupId, SchemaRegionReplicaSet>> schemaPartitionInfo) {
+    this.schemaPartitionInfo = schemaPartitionInfo;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionId.java
similarity index 75%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionId.java
index 38a56f3..6a0363c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionId.java
@@ -16,12 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.commons.partition;
 
-public class PlanFragmentId {
-  private String id;
+public class SchemaRegionId {
+  private int schemaRegionId;
 
-  public PlanFragmentId(String id) {
-    this.id = id;
+  public int getSchemaRegionId() {
+    return schemaRegionId;
+  }
+
+  public void setSchemaRegionId(int schemaRegionId) {
+    this.schemaRegionId = schemaRegionId;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionReplicaSet.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionReplicaSet.java
index 8cf0341..7f6e863 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaRegionReplicaSet.java
@@ -16,18 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.commons.partition;
 
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 import java.util.List;
 
-public class DistributedQueryPlan {
-  private MPPQueryContext context;
-  private PlanNode rootNode;
-  private PlanFragment rootFragment;
+public class SchemaRegionReplicaSet {
+  private SchemaRegionId schemaRegionId;
+  private List<EndPoint> endPointList;
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+  public SchemaRegionId getSchemaRegionId() {
+    return schemaRegionId;
+  }
+
+  public void setSchemaRegionId(SchemaRegionId schemaRegionId) {
+    this.schemaRegionId = schemaRegionId;
+  }
+
+  public List<EndPoint> getEndPointList() {
+    return endPointList;
+  }
+
+  public void setEndPointList(List<EndPoint> endPointList) {
+    this.endPointList = endPointList;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegionTimeSlice.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionId.java
similarity index 72%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegionTimeSlice.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionId.java
index 65dcd39..0acbb43 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/DataRegionTimeSlice.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/TimePartitionId.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,9 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.commons.partition;
 
-// TODO: (xingtanzjr) This class should be substituted with the class defined in Consensus level
-public class DataRegionTimeSlice {
-  long startTimestamp;
+public class TimePartitionId {
+  private long startTime;
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index aa85300..8e9de62 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -48,4 +48,8 @@ public class FragmentInstanceId {
   public String getInstanceId() {
     return instanceId;
   }
+
+  public String toString() {
+    return fullId;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 4a29263..2a348d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -23,7 +23,19 @@ package org.apache.iotdb.db.mpp.common;
  * info and so on
  */
 public class MPPQueryContext {
-  private String statement;
+  private String sql;
   private QueryId queryId;
-  private QuerySession session;
+  private SessionInfo session;
+
+  public MPPQueryContext() {}
+
+  public MPPQueryContext(String sql, QueryId queryId, SessionInfo session) {
+    this.sql = sql;
+    this.queryId = queryId;
+    this.session = session;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
similarity index 77%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
index ef0df6b..410f271 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/PlanFragmentId.java
@@ -23,26 +23,26 @@ import java.util.List;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
-public class FragmentId {
+public class PlanFragmentId {
 
   private final QueryId queryId;
   private final int id;
 
-  public static FragmentId valueOf(String stageId) {
+  public static PlanFragmentId valueOf(String stageId) {
     List<String> ids = QueryId.parseDottedId(stageId, 2, "stageId");
     return valueOf(ids);
   }
 
-  public static FragmentId valueOf(List<String> ids) {
+  public static PlanFragmentId valueOf(List<String> ids) {
     checkArgument(ids.size() == 2, "Expected two ids but got: %s", ids);
-    return new FragmentId(new QueryId(ids.get(0)), Integer.parseInt(ids.get(1)));
+    return new PlanFragmentId(new QueryId(ids.get(0)), Integer.parseInt(ids.get(1)));
   }
 
-  public FragmentId(String queryId, int id) {
+  public PlanFragmentId(String queryId, int id) {
     this(new QueryId(queryId), id);
   }
 
-  public FragmentId(QueryId queryId, int id) {
+  public PlanFragmentId(QueryId queryId, int id) {
     this.queryId = requireNonNull(queryId, "queryId is null");
     this.id = id;
   }
@@ -54,4 +54,8 @@ public class FragmentId {
   public int getId() {
     return id;
   }
+
+  public String toString() {
+    return String.format("%s.%d", queryId, id);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
similarity index 83%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
index 38a56f3..ad64267 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/SessionInfo.java
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.common;
 
-public class PlanFragmentId {
-  private String id;
+import java.time.ZoneId;
 
-  public PlanFragmentId(String id) {
-    this.id = id;
-  }
+public class SessionInfo {
+  private String userName;
+  private ZoneId zoneId;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index f6c85e8..65d92b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -18,7 +18,11 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
 
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -31,14 +35,30 @@ public class Coordinator {
 
   private ConcurrentHashMap<QueryId, QueryExecution> queryExecutionMap;
 
-  private QueryExecution createQueryExecution() {
-    return null;
+  public static Coordinator getInstance() {
+    return new Coordinator();
+  }
+
+  private QueryExecution createQueryExecution(Statement statement, MPPQueryContext queryContext) {
+    return new QueryExecution(statement, queryContext);
   }
 
   private QueryExecution getQueryExecutionById() {
     return null;
   }
 
+  public ExecutionResult execute(
+      Statement statement, QueryId queryId, QueryType queryType, SessionInfo session, String sql) {
+
+    QueryExecution execution =
+        createQueryExecution(statement, new MPPQueryContext(sql, queryId, session));
+    queryExecutionMap.put(queryId, execution);
+
+    execution.start();
+
+    return execution.getResult();
+  }
+
   //    private TQueryResponse executeQuery(TQueryRequest request) {
   //
   //    }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
similarity index 70%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
index 38a56f3..4dae202 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecutionResult.java
@@ -16,12 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.execution;
 
-public class PlanFragmentId {
-  private String id;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-  public PlanFragmentId(String id) {
-    this.id = id;
+public class ExecutionResult {
+  public QueryId queryId;
+  public TSStatus status;
+
+  public ExecutionResult(QueryId queryId, TSStatus status) {
+    this.queryId = queryId;
+    this.status = status;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
index 7bd9300..9e3412e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
@@ -18,21 +18,21 @@
  */
 package org.apache.iotdb.db.mpp.execution;
 
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 
 import java.util.List;
 
 public class FragmentInfo {
 
-  private final FragmentId stageId;
+  private final PlanFragmentId stageId;
   private final FragmentState state;
   private final PlanFragment plan;
 
   private final List<FragmentInfo> childrenFragments;
 
   public FragmentInfo(
-      FragmentId stageId,
+      PlanFragmentId stageId,
       FragmentState state,
       PlanFragment plan,
       List<FragmentInfo> childrenFragments) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 7e14bf3..0b15ef1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -22,14 +22,19 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.*;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import static org.apache.iotdb.rpc.RpcUtils.getStatus;
+
 /**
  * QueryExecution stores all the status of a query which is being prepared or running inside the MPP
  * frame. It takes three main responsibilities: 1. Prepare a query. Transform a query from statement
@@ -43,50 +48,45 @@ public class QueryExecution {
 
   private List<PlanOptimizer> planOptimizers;
 
-  private Analysis analysis;
+  private final Analysis analysis;
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
-  private List<PlanFragment> fragments;
   private List<FragmentInstance> fragmentInstances;
 
-  public QueryExecution(MPPQueryContext context) {
+  public QueryExecution(Statement statement, MPPQueryContext context) {
     this.context = context;
+    this.analysis = analyze(statement, context);
   }
 
-  public void plan() {
-    analyze();
+  public void start() {
     doLogicalPlan();
     doDistributedPlan();
-    planFragmentInstances();
-  }
-
-  public void schedule() {
-    this.scheduler = new ClusterScheduler(this.stateMachine, this.fragmentInstances);
-    this.scheduler.start();
+    schedule();
   }
 
   // Analyze the statement in QueryContext. Generate the analysis this query need
-  public void analyze() {
+  private static Analysis analyze(Statement statement, MPPQueryContext context) {
     // initialize the variable `analysis`
+    return new Analyzer(context).analyze(statement);
+  }
 
+  private void schedule() {
+    this.scheduler = new ClusterScheduler(this.stateMachine, this.fragmentInstances);
+    this.scheduler.start();
   }
 
   // Use LogicalPlanner to do the logical query plan and logical optimization
-  public void doLogicalPlan() {
+  private void doLogicalPlan() {
     LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers);
     this.logicalPlan = planner.plan(this.analysis);
   }
 
   // Generate the distributed plan and split it into fragments
-  public void doDistributedPlan() {
+  private void doDistributedPlan() {
     DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
     this.distributedPlan = planner.planFragments();
   }
 
-  // Convert fragment to detailed instance
-  // And for parallel-able fragment, clone it into several instances with different params.
-  public void planFragmentInstances() {}
-
   /**
    * This method will be called by the request thread from client connection. This method will block
    * until one of these conditions occurs: 1. There is a batch of result 2. There is no more result
@@ -97,4 +97,9 @@ public class QueryExecution {
   public ByteBuffer getBatchResult() {
     return null;
   }
+
+  public ExecutionResult getResult() {
+
+    return new ExecutionResult(context.getQueryId(), getStatus(TSStatusCode.SUCCESS_STATUS));
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
index 4af8b31..0a18d07 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/ClusterScheduler.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.db.mpp.common.FragmentId;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
@@ -67,7 +67,7 @@ public class ClusterScheduler implements IScheduler {
   public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
 
   @Override
-  public void cancelFragment(FragmentId fragmentId) {}
+  public void cancelFragment(PlanFragmentId planFragmentId) {}
 
   // Send the instances to other nodes
   private void sendFragmentInstances() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
index 16145fd..1ecd4e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/IScheduler.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.db.mpp.common.FragmentId;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
 
 import io.airlift.units.Duration;
@@ -36,5 +36,5 @@ public interface IScheduler {
 
   void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause);
 
-  void cancelFragment(FragmentId fragmentId);
+  void cancelFragment(PlanFragmentId planFragmentId);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index 10de6b5..d876561 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -20,8 +20,8 @@ package org.apache.iotdb.db.mpp.execution.scheduler;
 
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.metadata.SchemaEngine;
-import org.apache.iotdb.db.mpp.common.FragmentId;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
 
 import io.airlift.units.Duration;
@@ -52,5 +52,5 @@ public class StandaloneScheduler implements IScheduler {
   public void failFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {}
 
   @Override
-  public void cancelFragment(FragmentId fragmentId) {}
+  public void cancelFragment(PlanFragmentId planFragmentId) {}
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
index dda47dc..a5e1d17 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analysis.java
@@ -19,12 +19,13 @@
 
 package org.apache.iotdb.db.mpp.sql.analyze;
 
-import org.apache.iotdb.db.metadata.SchemaRegion;
+import org.apache.iotdb.commons.partition.DataPartitionInfo;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.partition.SchemaPartitionInfo;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.DataRegion;
-import org.apache.iotdb.db.mpp.common.DataRegionTimeSlice;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.*;
 
@@ -39,28 +40,18 @@ public class Analysis {
   // Statement
   private Statement statement;
 
-  // DataPartitionInfo
-  private Map<String, Map<DataRegionTimeSlice, List<DataRegion>>> dataPartitionInfo;
-
-  // SchemaPartitionInfo
-  private Map<String, List<SchemaRegion>> schemaPartitionInfo;
-
-  public Set<DataRegion> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
-    if (timefilter == null) {
-      // TODO: (xingtanzjr) we need to have a method to get the deviceGroup by device
-      String deviceGroup = seriesPath.getDevice();
-      Set<DataRegion> result = new HashSet<>();
-      this.dataPartitionInfo.get(deviceGroup).values().forEach(result::addAll);
-      return result;
-    } else {
-      // TODO: (xingtanzjr) complete this branch
-      return null;
-    }
-  }
+  // indicate whether this statement is write or read
+  private QueryType queryType;
 
-  public void setDataPartitionInfo(
-      Map<String, Map<DataRegionTimeSlice, List<DataRegion>>> dataPartitionInfo) {
-    this.dataPartitionInfo = dataPartitionInfo;
+  private DataPartitionInfo dataPartitionInfo;
+
+  private SchemaPartitionInfo schemaPartitionInfo;
+
+  private Map<String, MeasurementSchema> schemaMap;
+
+  public List<DataRegionReplicaSet> getPartitionInfo(PartialPath seriesPath, Filter timefilter) {
+    // TODO: (xingtanzjr) implement the calculation of timePartitionIdList
+    return dataPartitionInfo.getDataRegionReplicaSet(seriesPath.getDevice(), null);
   }
 
   public Statement getStatement() {
@@ -70,4 +61,28 @@ public class Analysis {
   public void setStatement(Statement statement) {
     this.statement = statement;
   }
+
+  public DataPartitionInfo getDataPartitionInfo() {
+    return dataPartitionInfo;
+  }
+
+  public void setDataPartitionInfo(DataPartitionInfo dataPartitionInfo) {
+    this.dataPartitionInfo = dataPartitionInfo;
+  }
+
+  public SchemaPartitionInfo getSchemaPartitionInfo() {
+    return schemaPartitionInfo;
+  }
+
+  public void setSchemaPartitionInfo(SchemaPartitionInfo schemaPartitionInfo) {
+    this.schemaPartitionInfo = schemaPartitionInfo;
+  }
+
+  public Map<String, MeasurementSchema> getSchemaMap() {
+    return schemaMap;
+  }
+
+  public void setSchemaMap(Map<String, MeasurementSchema> schemaMap) {
+    this.schemaMap = schemaMap;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
similarity index 68%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzer.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
index 6bd7113..c1f1f28 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/Analyzer.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.PartitionInfo;
 import org.apache.iotdb.db.exception.query.PathNumOverLimitException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -31,18 +33,26 @@ import org.apache.iotdb.db.mpp.sql.rewriter.RemoveNotOptimizer;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.tree.StatementVisitor;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.Arrays;
+import java.util.Map;
 
 /** Analyze the statement and generate Analysis. */
-public class StatementAnalyzer {
+public class Analyzer {
 
-  private final Analysis analysis;
   private final MPPQueryContext context;
 
-  public StatementAnalyzer(Analysis analysis, MPPQueryContext context) {
-    this.analysis = analysis;
+  // TODO need to use factory to decide standalone or cluster
+  private final IPartitionFetcher partitionFetcher = StandalonePartitionFetcher.getInstance();
+  // TODO need to use factory to decide standalone or cluster
+  private final ISchemaFetcher schemaFetcher = StandaloneSchemaFetcher.getInstance();
+
+  public Analyzer(MPPQueryContext context) {
     this.context = context;
   }
 
@@ -55,12 +65,14 @@ public class StatementAnalyzer {
 
     @Override
     public Analysis visitStatement(Statement statement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
       analysis.setStatement(statement);
       return analysis;
     }
 
     @Override
     public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext context) {
+      Analysis analysis = new Analysis();
       try {
         // check for semantic errors
         queryStatement.selfCheck();
@@ -90,6 +102,7 @@ public class StatementAnalyzer {
     @Override
     public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) {
       // TODO: do analyze for insert statement
+      Analysis analysis = new Analysis();
       analysis.setStatement(insertStatement);
       return analysis;
     }
@@ -109,8 +122,34 @@ public class StatementAnalyzer {
           }
         }
       }
+      Analysis analysis = new Analysis();
       analysis.setStatement(createTimeSeriesStatement);
       return analysis;
     }
+
+    @Override
+    public Analysis visitInsertTablet(
+        InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
+      // TODO(INSERT) device + time range -> PartitionInfo
+      DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam();
+      dataPartitionQueryParam.setDeviceId(insertTabletStatement.getDevicePath().getFullPath());
+      // TODO(INSERT) calculate the time partition id list
+      //      dataPartitionQueryParam.setTimePartitionIdList();
+      PartitionInfo partitionInfo = partitionFetcher.fetchPartitionInfo(dataPartitionQueryParam);
+
+      // TODO(INSERT) get each time series schema according to SchemaPartitionInfo in PartitionInfo
+      Map<String, MeasurementSchema> schemaMap =
+          schemaFetcher.fetchSchema(
+              insertTabletStatement.getDevicePath(),
+              Arrays.asList(insertTabletStatement.getMeasurements()));
+
+      Analysis analysis = new Analysis();
+      analysis.setSchemaMap(schemaMap);
+      // TODO(INSERT) do type check here
+      analysis.setStatement(insertTabletStatement);
+      analysis.setDataPartitionInfo(partitionInfo.getDataPartitionInfo());
+      analysis.setSchemaPartitionInfo(partitionInfo.getSchemaPartitionInfo());
+      return analysis;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
index 6151916..cdecf45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
@@ -16,41 +16,44 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.analyze;
 
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.partition.DataPartitionInfo;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.PartitionInfo;
+import org.apache.iotdb.commons.partition.SchemaPartitionInfo;
 
 import java.util.List;
 
-public class FragmentSinkNode extends SinkNode {
-  public FragmentSinkNode(PlanNodeId id) {
-    super(id);
-  }
+public class ClusterPartitionFetcher implements IPartitionFetcher {
 
   @Override
-  public List<PlanNode> getChildren() {
+  public DataPartitionInfo fetchDataPartitionInfo(DataPartitionQueryParam parameter) {
     return null;
   }
 
   @Override
-  public PlanNode clone() {
+  public DataPartitionInfo fetchDataPartitionInfos(List<DataPartitionQueryParam> parameterList) {
     return null;
   }
 
   @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
+  public SchemaPartitionInfo fetchSchemaPartitionInfo(String deviceId) {
     return null;
   }
 
   @Override
-  public List<String> getOutputColumnNames() {
+  public SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> deviceId) {
     return null;
   }
 
   @Override
-  public void send() {}
+  public PartitionInfo fetchPartitionInfo(DataPartitionQueryParam parameter) {
+    return null;
+  }
 
   @Override
-  public void close() throws Exception {}
+  public PartitionInfo fetchPartitionInfos(List<DataPartitionQueryParam> parameterList) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
similarity index 66%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
index 8cf0341..c7092d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterSchemaFetcher.java
@@ -16,18 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.analyze;
 
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.List;
+import java.util.Map;
 
-public class DistributedQueryPlan {
-  private MPPQueryContext context;
-  private PlanNode rootNode;
-  private PlanFragment rootFragment;
+public class ClusterSchemaFetcher implements ISchemaFetcher {
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+  @Override
+  public Map<String, MeasurementSchema> fetchSchema(
+      PartialPath deviceId, List<String> measurementIdList) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
index 6151916..d29c601 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/IPartitionFetcher.java
@@ -16,41 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
+package org.apache.iotdb.db.mpp.sql.analyze;
 
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.commons.partition.DataPartitionInfo;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.PartitionInfo;
+import org.apache.iotdb.commons.partition.SchemaPartitionInfo;
 
 import java.util.List;
 
-public class FragmentSinkNode extends SinkNode {
-  public FragmentSinkNode(PlanNodeId id) {
-    super(id);
-  }
+public interface IPartitionFetcher {
 
-  @Override
-  public List<PlanNode> getChildren() {
-    return null;
-  }
+  DataPartitionInfo fetchDataPartitionInfo(DataPartitionQueryParam parameter);
 
-  @Override
-  public PlanNode clone() {
-    return null;
-  }
+  DataPartitionInfo fetchDataPartitionInfos(List<DataPartitionQueryParam> parameterList);
 
-  @Override
-  public PlanNode cloneWithChildren(List<PlanNode> children) {
-    return null;
-  }
+  SchemaPartitionInfo fetchSchemaPartitionInfo(String deviceId);
 
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
+  SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> deviceId);
 
-  @Override
-  public void send() {}
+  PartitionInfo fetchPartitionInfo(DataPartitionQueryParam parameter);
 
-  @Override
-  public void close() throws Exception {}
+  PartitionInfo fetchPartitionInfos(List<DataPartitionQueryParam> parameterList);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/metadata/IMetadataFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
similarity index 72%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/metadata/IMetadataFetcher.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
index bf326c5..c2ac230 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/metadata/IMetadataFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ISchemaFetcher.java
@@ -17,9 +17,18 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.sql.metadata;
+package org.apache.iotdb.db.mpp.sql.analyze;
+
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.List;
+import java.util.Map;
 
 /**
  * This interface is used to fetch the metadata information required in execution plan generating.
  */
-public interface IMetadataFetcher {}
+public interface ISchemaFetcher {
+
+  Map<String, MeasurementSchema> fetchSchema(PartialPath deviceId, List<String> measurementIdList);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/QuerySession.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/QueryType.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/QuerySession.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/QueryType.java
index 3f1d165..8682628 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/QuerySession.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/QueryType.java
@@ -16,6 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.sql.analyze;
 
-public class QuerySession {}
+public enum QueryType {
+  WRITE,
+  READ;
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
new file mode 100644
index 0000000..ca274a5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.analyze;
+
+import org.apache.iotdb.commons.partition.DataPartitionInfo;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.PartitionInfo;
+import org.apache.iotdb.commons.partition.SchemaPartitionInfo;
+
+import java.util.List;
+
+public class StandalonePartitionFetcher implements IPartitionFetcher {
+
+  private StandalonePartitionFetcher() {}
+
+  // TODO need to use safe singleton pattern
+  public static StandalonePartitionFetcher getInstance() {
+    return new StandalonePartitionFetcher();
+  }
+
+  @Override
+  public DataPartitionInfo fetchDataPartitionInfo(DataPartitionQueryParam parameter) {
+    return null;
+  }
+
+  @Override
+  public DataPartitionInfo fetchDataPartitionInfos(List<DataPartitionQueryParam> parameterList) {
+    return null;
+  }
+
+  @Override
+  public SchemaPartitionInfo fetchSchemaPartitionInfo(String deviceId) {
+    return null;
+  }
+
+  @Override
+  public SchemaPartitionInfo fetchSchemaPartitionInfos(List<String> deviceId) {
+    return null;
+  }
+
+  @Override
+  public PartitionInfo fetchPartitionInfo(DataPartitionQueryParam parameter) {
+    return null;
+  }
+
+  @Override
+  public PartitionInfo fetchPartitionInfos(List<DataPartitionQueryParam> parameterList) {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
index 8cf0341..21d3e39 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
@@ -16,18 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.analyze;
 
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.List;
+import java.util.Map;
 
-public class DistributedQueryPlan {
-  private MPPQueryContext context;
-  private PlanNode rootNode;
-  private PlanFragment rootFragment;
+public class StandaloneSchemaFetcher implements ISchemaFetcher {
 
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+  private StandaloneSchemaFetcher() {}
+
+  public static StandaloneSchemaFetcher getInstance() {
+    return new StandaloneSchemaFetcher();
+  }
+
+  @Override
+  public Map<String, MeasurementSchema> fetchSchema(
+      PartialPath deviceId, List<String> measurementIdList) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index 5a4123e..5b2a969 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@ -18,13 +18,14 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner;
 
-import org.apache.iotdb.db.mpp.common.DataRegion;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
-import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 
@@ -37,6 +38,8 @@ public class DistributionPlanner {
   private Analysis analysis;
   private LogicalQueryPlan logicalPlan;
 
+  private int planFragmentIndex = 0;
+
   public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
     this.analysis = analysis;
     this.logicalPlan = logicalPlan;
@@ -52,8 +55,29 @@ public class DistributionPlanner {
     return adder.visit(root, new NodeGroupContext());
   }
 
+  public SubPlan splitFragment(PlanNode root) {
+    FragmentBuilder fragmentBuilder = new FragmentBuilder();
+    return fragmentBuilder.splitToSubPlan(root);
+  }
+
   public DistributedQueryPlan planFragments() {
-    return null;
+    PlanNode rootAfterRewrite = rewriteSource();
+    PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+    SubPlan subPlan = splitFragment(rootWithExchange);
+    List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
+    return new DistributedQueryPlan(
+        logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
+  }
+
+  // Convert fragment to detailed instance
+  // And for parallel-able fragment, clone it into several instances with different params.
+  public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
+    IFragmentParallelPlaner parallelPlaner = new SimpleFragmentParallelPlanner(subPlan);
+    return parallelPlaner.parallelPlan();
+  }
+
+  private PlanFragmentId getNextFragmentId() {
+    return new PlanFragmentId(this.logicalPlan.getContext().getQueryId(), this.planFragmentIndex++);
   }
 
   private class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
@@ -74,13 +98,13 @@ public class DistributionPlanner {
           // If the child is SeriesScanNode, we need to check whether this node should be seperated
           // into several splits.
           SeriesScanNode handle = (SeriesScanNode) child;
-          Set<DataRegion> dataDistribution =
+          List<DataRegionReplicaSet> dataDistribution =
               analysis.getPartitionInfo(handle.getSeriesPath(), handle.getTimeFilter());
           // If the size of dataDistribution is m, this SeriesScanNode should be seperated into m
           // SeriesScanNode.
-          for (DataRegion dataRegion : dataDistribution) {
+          for (DataRegionReplicaSet dataRegion : dataDistribution) {
             SeriesScanNode split = (SeriesScanNode) handle.clone();
-            split.setDataRegion(dataRegion);
+            split.setDataRegionReplicaSet(dataRegion);
             sources.add(split);
           }
         } else if (child instanceof SeriesAggregateScanNode) {
@@ -97,8 +121,8 @@ public class DistributionPlanner {
       }
 
       // Step 2: For the source nodes, group them by the DataRegion.
-      Map<DataRegion, List<SeriesScanNode>> sourceGroup =
-          sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getDataRegion));
+      Map<DataRegionReplicaSet, List<SeriesScanNode>> sourceGroup =
+          sources.stream().collect(Collectors.groupingBy(SeriesScanNode::getDataRegionReplicaSet));
       // Step 3: For the source nodes which belong to same data region, add a TimeJoinNode for them
       // and make the
       // new TimeJoinNode as the child of current TimeJoinNode
@@ -148,13 +172,15 @@ public class DistributionPlanner {
 
     public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
       context.putNodeDistribution(
-          node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegion()));
+          node.getId(),
+          new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
       return node.clone();
     }
 
     public PlanNode visitSeriesAggregate(SeriesAggregateScanNode node, NodeGroupContext context) {
       context.putNodeDistribution(
-          node.getId(), new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegion()));
+          node.getId(),
+          new NodeDistribution(NodeDistributionType.NO_CHILD, node.getDataRegionReplicaSet()));
       return node.clone();
     }
 
@@ -167,7 +193,7 @@ public class DistributionPlanner {
                 visitedChildren.add(visit(child, context));
               });
 
-      DataRegion dataRegion = calculateDataRegionByChildren(visitedChildren, context);
+      DataRegionReplicaSet dataRegion = calculateDataRegionByChildren(visitedChildren, context);
       NodeDistributionType distributionType =
           nodeDistributionIsSame(visitedChildren, context)
               ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
@@ -187,7 +213,7 @@ public class DistributionPlanner {
           child -> {
             if (!dataRegion.equals(context.getNodeDistribution(child.getId()).dataRegion)) {
               ExchangeNode exchangeNode = new ExchangeNode(PlanNodeIdAllocator.generateId());
-              exchangeNode.setSourceNode(child);
+              exchangeNode.setChild(child);
               newNode.addChild(exchangeNode);
             } else {
               newNode.addChild(child);
@@ -196,12 +222,11 @@ public class DistributionPlanner {
       return newNode;
     }
 
-    private DataRegion calculateDataRegionByChildren(
+    private DataRegionReplicaSet calculateDataRegionByChildren(
         List<PlanNode> children, NodeGroupContext context) {
       // We always make the dataRegion of TimeJoinNode to be the same as its first child.
       // TODO: (xingtanzjr) We need to implement more suitable policies here
-      DataRegion childDataRegion = context.getNodeDistribution(children.get(0).getId()).dataRegion;
-      return new DataRegion(childDataRegion.getDataRegionId(), childDataRegion.getEndpoint());
+      return context.getNodeDistribution(children.get(0).getId()).dataRegion;
     }
 
     private boolean nodeDistributionIsSame(List<PlanNode> children, NodeGroupContext context) {
@@ -246,11 +271,49 @@ public class DistributionPlanner {
 
   private class NodeDistribution {
     private NodeDistributionType type;
-    private DataRegion dataRegion;
+    private DataRegionReplicaSet dataRegion;
 
-    private NodeDistribution(NodeDistributionType type, DataRegion dataRegion) {
+    private NodeDistribution(NodeDistributionType type, DataRegionReplicaSet dataRegion) {
       this.type = type;
       this.dataRegion = dataRegion;
     }
   }
+
+  private class FragmentBuilder {
+    public SubPlan splitToSubPlan(PlanNode root) {
+      SubPlan rootSubPlan = createSubPlan(root);
+      splitToSubPlan(root, rootSubPlan);
+      return rootSubPlan;
+    }
+
+    private void splitToSubPlan(PlanNode root, SubPlan subPlan) {
+      if (root instanceof ExchangeNode) {
+        // We add a FragmentSinkNode for newly created PlanFragment
+        ExchangeNode exchangeNode = (ExchangeNode) root;
+        FragmentSinkNode sinkNode = new FragmentSinkNode(PlanNodeIdAllocator.generateId());
+        sinkNode.setChild(exchangeNode.getChild());
+        sinkNode.setDownStreamNode(exchangeNode);
+        // Record the source node info in the ExchangeNode so that we can keep the connection of
+        // these nodes/fragments
+        exchangeNode.setRemoteSourceNode(sinkNode);
+        // We cut off the subtree to make the ExchangeNode as the leaf node of current PlanFragment
+        exchangeNode.cleanChildren();
+
+        // Build the child SubPlan Tree
+        SubPlan childSubPlan = createSubPlan(sinkNode);
+        splitToSubPlan(sinkNode, childSubPlan);
+
+        subPlan.addChild(childSubPlan);
+        return;
+      }
+      for (PlanNode child : root.getChildren()) {
+        splitToSubPlan(child, subPlan);
+      }
+    }
+
+    private SubPlan createSubPlan(PlanNode root) {
+      PlanFragment fragment = new PlanFragment(getNextFragmentId(), root);
+      return new SubPlan(fragment);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index a997705..dcacb70 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.sql.tree.StatementVisitor;
@@ -43,7 +45,7 @@ public class LogicalPlanner {
   }
 
   public LogicalQueryPlan plan(Analysis analysis) {
-    PlanNode rootNode = new LogicalPlanVisitor().process(analysis.getStatement());
+    PlanNode rootNode = new LogicalPlanVisitor(analysis).process(analysis.getStatement());
 
     // optimize the query logical plan
     if (analysis.getStatement() instanceof QueryStatement) {
@@ -62,6 +64,12 @@ public class LogicalPlanner {
   private static final class LogicalPlanVisitor
       extends StatementVisitor<PlanNode, MPPQueryContext> {
 
+    private final Analysis analysis;
+
+    public LogicalPlanVisitor(Analysis analysis) {
+      this.analysis = analysis;
+    }
+
     @Override
     public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext context) {
       // TODO: Generate logical planNode tree for query statement
@@ -82,5 +90,14 @@ public class LogicalPlanner {
           createTimeSeriesStatement.getAttributes(),
           createTimeSeriesStatement.getAlias());
     }
+
+    @Override
+    public PlanNode visitInsertTablet(
+        InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
+      // TODO(INSERT) change the InsertTabletStatement to InsertTabletNode
+      InsertTabletNode node = new InsertTabletNode(PlanNodeIdAllocator.generateId());
+
+      return node;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
index 8cf0341..2df5323 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
@@ -19,15 +19,39 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 
 import java.util.List;
 
 public class DistributedQueryPlan {
   private MPPQueryContext context;
-  private PlanNode rootNode;
-  private PlanFragment rootFragment;
-
-  // TODO: consider whether this field is necessary when do the implementation
+  private SubPlan rootSubPlan;
   private List<PlanFragment> fragments;
+  private List<FragmentInstance> instances;
+
+  public DistributedQueryPlan(
+      MPPQueryContext context,
+      SubPlan rootSubPlan,
+      List<PlanFragment> fragments,
+      List<FragmentInstance> instances) {
+    this.context = context;
+    this.rootSubPlan = rootSubPlan;
+    this.fragments = fragments;
+    this.instances = instances;
+  }
+
+  public List<PlanFragment> getFragments() {
+    return fragments;
+  }
+
+  public SubPlan getRootSubPlan() {
+    return rootSubPlan;
+  }
+
+  public MPPQueryContext getContext() {
+    return context;
+  }
+
+  public List<FragmentInstance> getInstances() {
+    return instances;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 7674f4f..5749c36 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -18,14 +18,79 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 public class FragmentInstance {
   private FragmentInstanceId id;
 
   // The reference of PlanFragment which this instance is generated from
   private PlanFragment fragment;
+  // The DataRegion where the FragmentInstance should run
+  private DataRegionReplicaSet dataRegion;
+  private EndPoint hostEndpoint;
 
   // We can add some more params for a specific FragmentInstance
   // So that we can make different FragmentInstance owns different data range.
+
+  public FragmentInstance(PlanFragment fragment, int index) {
+    this.fragment = fragment;
+    this.id = generateId(fragment.getId(), index);
+  }
+
+  public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
+    return new FragmentInstanceId(String.format("%s.%d", id, index));
+  }
+
+  public DataRegionReplicaSet getDataRegionId() {
+    return dataRegion;
+  }
+
+  public void setDataRegionId(DataRegionReplicaSet dataRegion) {
+    this.dataRegion = dataRegion;
+  }
+
+  public EndPoint getHostEndpoint() {
+    return hostEndpoint;
+  }
+
+  public void setHostEndpoint(EndPoint hostEndpoint) {
+    this.hostEndpoint = hostEndpoint;
+  }
+
+  public PlanFragment getFragment() {
+    return fragment;
+  }
+
+  public FragmentInstanceId getId() {
+    return id;
+  }
+
+  public String getDownstreamInfo() {
+    PlanNode root = getFragment().getRoot();
+    if (root instanceof FragmentSinkNode) {
+      FragmentSinkNode sink = (FragmentSinkNode) root;
+      return String.format(
+          "(%s, %s, %s)",
+          sink.getDownStreamEndpoint(), sink.getDownStreamInstanceId(), sink.getDownStreamNode());
+    }
+    return "<No downstream>";
+  }
+
+  public String toString() {
+    StringBuilder ret = new StringBuilder();
+    ret.append(
+        String.format(
+            "FragmentInstance-%s:[Host: %s/%s]\n",
+            getId(), getHostEndpoint().getIp(), getDataRegionId().getId()));
+    ret.append("---- Plan Node Tree ----\n");
+    ret.append(PlanNodeUtil.nodeToString(getFragment().getRoot()));
+    ret.append("\n");
+    return ret.toString();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
similarity index 61%
copy from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
index 8cf0341..36be61f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/DistributedQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/IFragmentParallelPlaner.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,18 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
 
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+package org.apache.iotdb.db.mpp.sql.planner.plan;
 
 import java.util.List;
 
-public class DistributedQueryPlan {
-  private MPPQueryContext context;
-  private PlanNode rootNode;
-  private PlanFragment rootFragment;
-
-  // TODO: consider whether this field is necessary when do the implementation
-  private List<PlanFragment> fragments;
+/**
+ * The interface is used to transform one PlanFragment into one or more FragmentInstances which
+ * could run in parallel
+ */
+public interface IFragmentParallelPlaner {
+  /**
+   * The relation between each PlanFragment is necessary because sometimes we need to change the
+   * source/sink for each FragmentInstance according to its upstream/downstream
+   *
+   * @return All the FragmentInstances which can run in parallel
+   */
+  List<FragmentInstance> parallelPlan();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
index 0aa3ac5..173a8de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragment.java
@@ -18,11 +18,70 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan;
 
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 
-// TODO: consider whether it is necessary to make PlanFragment as a TreeNode
 /** PlanFragment contains a sub-query of distributed query. */
 public class PlanFragment {
   private PlanFragmentId id;
   private PlanNode root;
+
+  public PlanFragment(PlanFragmentId id, PlanNode root) {
+    this.id = id;
+    this.root = root;
+  }
+
+  public PlanFragmentId getId() {
+    return id;
+  }
+
+  public PlanNode getRoot() {
+    return root;
+  }
+
+  public String toString() {
+    return String.format("PlanFragment-%s", getId());
+  }
+
+  // Every Fragment should only run in DataRegion.
+  // But it can select any one of the Endpoint of the target DataRegion
+  // In current version, one PlanFragment should contain at least one SourceNode,
+  // and the DataRegions of all SourceNodes should be same in one PlanFragment.
+  // So we can use the DataRegion of one SourceNode as the PlanFragment's DataRegion.
+  public DataRegionReplicaSet getTargetDataRegion() {
+    return getNodeDataRegion(root);
+  }
+
+  private DataRegionReplicaSet getNodeDataRegion(PlanNode root) {
+    if (root instanceof SourceNode) {
+      return ((SourceNode) root).getDataRegionReplicaSet();
+    }
+    for (PlanNode child : root.getChildren()) {
+      DataRegionReplicaSet result = getNodeDataRegion(child);
+      if (result != null) {
+        return result;
+      }
+    }
+    return null;
+  }
+
+  public PlanNode getPlanNodeById(PlanNodeId nodeId) {
+    return getPlanNodeById(root, nodeId);
+  }
+
+  private PlanNode getPlanNodeById(PlanNode root, PlanNodeId nodeId) {
+    if (root.getId().equals(nodeId)) {
+      return root;
+    }
+    for (PlanNode child : root.getChildren()) {
+      PlanNode node = getPlanNodeById(child, nodeId);
+      if (node != null) {
+        return node;
+      }
+    }
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
new file mode 100644
index 0000000..3603310
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A simple implementation of IFragmentParallelPlaner. This planner will transform one PlanFragment
+ * into only one FragmentInstance.
+ */
+public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
+
+  private SubPlan subPlan;
+
+  // Record all the FragmentInstances belonged to same PlanFragment
+  Map<PlanFragmentId, FragmentInstance> instanceMap;
+  // Record which PlanFragment the PlanNode belongs
+  Map<PlanNodeId, PlanFragmentId> planNodeMap;
+  List<FragmentInstance> fragmentInstanceList;
+
+  public SimpleFragmentParallelPlanner(SubPlan subPlan) {
+    this.subPlan = subPlan;
+    this.instanceMap = new HashMap<>();
+    this.planNodeMap = new HashMap<>();
+    this.fragmentInstanceList = new ArrayList<>();
+  }
+
+  @Override
+  public List<FragmentInstance> parallelPlan() {
+    prepare();
+    calculateNodeTopologyBetweenInstance();
+    return fragmentInstanceList;
+  }
+
+  private void prepare() {
+    List<PlanFragment> fragments = subPlan.getPlanFragmentList();
+    for (PlanFragment fragment : fragments) {
+      recordPlanNodeRelation(fragment.getRoot(), fragment.getId());
+      produceFragmentInstance(fragment);
+    }
+  }
+
+  private void produceFragmentInstance(PlanFragment fragment) {
+    // If one PlanFragment will produce several FragmentInstance, the instanceIdx will be increased
+    // one by one
+    int instanceIdx = 0;
+    PlanNode rootCopy = PlanNodeUtil.deepCopy(fragment.getRoot());
+    FragmentInstance fragmentInstance = new FragmentInstance(fragment, instanceIdx);
+
+    // Get the target DataRegion for origin PlanFragment, then its instance will be distributed one
+    // of them.
+    DataRegionReplicaSet dataRegion = fragment.getTargetDataRegion();
+
+    // Set DataRegion and target host for the instance
+    // We need to store all the replica host in case of the scenario that the instance need to be
+    // redirected
+    // to another host when scheduling
+    fragmentInstance.setDataRegionId(dataRegion);
+
+    // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
+    // instance
+    fragmentInstance.setHostEndpoint(dataRegion.getEndPointList().get(0));
+    instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
+    fragmentInstanceList.add(fragmentInstance);
+  }
+
+  private void calculateNodeTopologyBetweenInstance() {
+    for (FragmentInstance instance : fragmentInstanceList) {
+      PlanNode rootNode = instance.getFragment().getRoot();
+      if (rootNode instanceof FragmentSinkNode) {
+        // Set target Endpoint for FragmentSinkNode
+        FragmentSinkNode sinkNode = (FragmentSinkNode) rootNode;
+        PlanNodeId downStreamNodeId = sinkNode.getDownStreamNode().getId();
+        FragmentInstance downStreamInstance = findDownStreamInstance(downStreamNodeId);
+        sinkNode.setDownStream(
+            downStreamInstance.getHostEndpoint(), downStreamInstance.getId(), downStreamNodeId);
+
+        // Set upstream info for corresponding ExchangeNode in downstream FragmentInstance
+        PlanNode downStreamExchangeNode =
+            downStreamInstance.getFragment().getPlanNodeById(downStreamNodeId);
+        ((ExchangeNode) downStreamExchangeNode)
+            .setUpstream(instance.getHostEndpoint(), instance.getId(), sinkNode.getId());
+      }
+    }
+  }
+
+  private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
+    return instanceMap.get(planNodeMap.get(exchangeNodeId));
+  }
+
+  private void recordPlanNodeRelation(PlanNode root, PlanFragmentId planFragmentId) {
+    planNodeMap.put(root.getId(), planFragmentId);
+    for (PlanNode child : root.getChildren()) {
+      recordPlanNodeRelation(child, planFragmentId);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
new file mode 100644
index 0000000..1ce2f1c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SubPlan.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SubPlan {
+  private PlanFragment planFragment;
+  private List<SubPlan> children;
+
+  public SubPlan(PlanFragment planFragment) {
+    this.planFragment = planFragment;
+    this.children = new ArrayList<>();
+  }
+
+  public void setChildren(List<SubPlan> children) {
+    this.children = children;
+  }
+
+  public void addChild(SubPlan subPlan) {
+    this.children.add(subPlan);
+  }
+
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    result.append(
+        String.format(
+            "SubPlan-%s. RootNodeId: %s\n", planFragment.getId(), planFragment.getRoot().getId()));
+    children.forEach(result::append);
+    return result.toString();
+  }
+
+  public PlanFragment getPlanFragment() {
+    return this.planFragment;
+  }
+
+  public List<SubPlan> getChildren() {
+    return this.children;
+  }
+
+  public List<PlanFragment> getPlanFragmentList() {
+    List<PlanFragment> result = new ArrayList<>();
+    result.add(this.planFragment);
+    this.children.forEach(
+        child -> {
+          result.add(child.getPlanFragment());
+        });
+    return result;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
index 1a597b2..f05a1be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeUtil.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 public class PlanNodeUtil {
   public static void printPlanNode(PlanNode root) {
     printPlanNodeWithLevel(root, 0);
@@ -36,4 +39,27 @@ public class PlanNodeUtil {
       System.out.print("\t");
     }
   }
+
+  public static String nodeToString(PlanNode root) {
+    StringBuilder result = new StringBuilder();
+    nodeToString(root, 0, result);
+    return result.toString();
+  }
+
+  private static void nodeToString(PlanNode root, int level, StringBuilder result) {
+    for (int i = 0; i < level; i++) {
+      result.append("\t");
+    }
+    result.append(root.toString());
+    result.append(System.lineSeparator());
+    for (PlanNode child : root.getChildren()) {
+      nodeToString(child, level + 1, result);
+    }
+  }
+
+  public static PlanNode deepCopy(PlanNode root) {
+    List<PlanNode> children =
+        root.getChildren().stream().map(PlanNodeUtil::deepCopy).collect(Collectors.toList());
+    return root.cloneWithChildren(children);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index 0be251d..b0ff572 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -73,4 +73,8 @@ public abstract class PlanVisitor<R, C> {
   public R visitTimeJoin(TimeJoinNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitExchange(ExchangeNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
index 4a36a75..7eab8c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/ExchangeNode.java
@@ -19,17 +19,26 @@
 
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.common.FragmentId;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 import com.google.common.collect.ImmutableList;
 
 import java.util.List;
 
 public class ExchangeNode extends PlanNode {
-  private PlanNode sourceNode;
-  private FragmentId sourceFragmentId;
+  private PlanNode child;
+  private FragmentSinkNode remoteSourceNode;
+
+  // In current version, one ExchangeNode will only have one source.
+  // And the fragment which the sourceNode belongs to will only have one instance.
+  // Thus, by nodeId and endpoint, the ExchangeNode can know where its source from.
+  private EndPoint upstreamEndpoint;
+  private FragmentInstanceId upstreamInstanceId;
+  private PlanNodeId upstreamPlanNodeId;
 
   public ExchangeNode(PlanNodeId id) {
     super(id);
@@ -37,43 +46,85 @@ public class ExchangeNode extends PlanNode {
 
   @Override
   public List<PlanNode> getChildren() {
-    return ImmutableList.of(sourceNode);
+    if (this.child == null) {
+      return ImmutableList.of();
+    }
+    return ImmutableList.of(child);
   }
 
   @Override
   public PlanNode clone() {
-    return new ExchangeNode(getId());
+    ExchangeNode node = new ExchangeNode(getId());
+    if (remoteSourceNode != null) {
+      node.setRemoteSourceNode((FragmentSinkNode) remoteSourceNode.clone());
+    }
+    return node;
   }
 
   @Override
   public PlanNode cloneWithChildren(List<PlanNode> children) {
-    ExchangeNode node = new ExchangeNode(getId());
-    node.setSourceNode(children.get(0));
+    ExchangeNode node = (ExchangeNode) clone();
+    if (children != null && children.size() > 0) {
+      node.setChild(children.get(0));
+    }
     return node;
   }
 
+  public void setUpstream(EndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
+    this.upstreamEndpoint = endPoint;
+    this.upstreamInstanceId = instanceId;
+    this.upstreamPlanNodeId = nodeId;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return null;
   }
 
-  public void setSourceFragmentId(FragmentId sourceFragmentId) {
-    this.sourceFragmentId = sourceFragmentId;
+  public PlanNode getChild() {
+    return child;
   }
 
-  public FragmentId getSourceFragmentId() {
-    return sourceFragmentId;
+  public void setChild(PlanNode child) {
+    this.child = child;
   }
 
-  public PlanNode getSourceNode() {
-    return sourceNode;
+  public String toString() {
+    return String.format(
+        "ExchangeNode-%s: [SourceNodeId: %s, SourceAddress:%s]",
+        getId(), remoteSourceNode.getId(), getSourceAddress());
   }
 
-  public void setSourceNode(PlanNode sourceNode) {
-    this.sourceNode = sourceNode;
+  public String getSourceAddress() {
+    if (getUpstreamEndpoint() == null) {
+      return "Not assigned";
+    }
+    return String.format(
+        "%s/%s/%s",
+        getUpstreamEndpoint().getIp(), getUpstreamInstanceId(), getUpstreamPlanNodeId());
   }
 
-  public String toString() {
-    return String.format("ExchangeNode-%s", getId());
+  public FragmentSinkNode getRemoteSourceNode() {
+    return remoteSourceNode;
+  }
+
+  public void setRemoteSourceNode(FragmentSinkNode remoteSourceNode) {
+    this.remoteSourceNode = remoteSourceNode;
+  }
+
+  public void cleanChildren() {
+    this.child = null;
+  }
+
+  public EndPoint getUpstreamEndpoint() {
+    return upstreamEndpoint;
+  }
+
+  public FragmentInstanceId getUpstreamInstanceId() {
+    return upstreamInstanceId;
+  }
+
+  public PlanNodeId getUpstreamPlanNodeId() {
+    return upstreamPlanNodeId;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
index 6151916..3144a4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/sink/FragmentSinkNode.java
@@ -18,19 +18,31 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.sink;
 
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+
+import com.google.common.collect.ImmutableList;
 
 import java.util.List;
 
 public class FragmentSinkNode extends SinkNode {
+  private PlanNode child;
+  private ExchangeNode downStreamNode;
+
+  private EndPoint downStreamEndpoint;
+  private FragmentInstanceId downStreamInstanceId;
+  private PlanNodeId downStreamPlanNodeId;
+
   public FragmentSinkNode(PlanNodeId id) {
     super(id);
   }
 
   @Override
   public List<PlanNode> getChildren() {
-    return null;
+    return ImmutableList.of(child);
   }
 
   @Override
@@ -53,4 +65,51 @@ public class FragmentSinkNode extends SinkNode {
 
   @Override
   public void close() throws Exception {}
+
+  public PlanNode getChild() {
+    return child;
+  }
+
+  public void setChild(PlanNode child) {
+    this.child = child;
+  }
+
+  public String toString() {
+    return String.format("FragmentSinkNode-%s:[SendTo: (%s)]", getId(), getDownStreamAddress());
+  }
+
+  public String getDownStreamAddress() {
+    if (getDownStreamEndpoint() == null) {
+      return "Not assigned";
+    }
+    return String.format(
+        "%s/%s/%s",
+        getDownStreamEndpoint().getIp(), getDownStreamInstanceId(), getDownStreamPlanNodeId());
+  }
+
+  public ExchangeNode getDownStreamNode() {
+    return downStreamNode;
+  }
+
+  public void setDownStreamNode(ExchangeNode downStreamNode) {
+    this.downStreamNode = downStreamNode;
+  }
+
+  public void setDownStream(EndPoint endPoint, FragmentInstanceId instanceId, PlanNodeId nodeId) {
+    this.downStreamEndpoint = endPoint;
+    this.downStreamInstanceId = instanceId;
+    this.downStreamPlanNodeId = nodeId;
+  }
+
+  public EndPoint getDownStreamEndpoint() {
+    return downStreamEndpoint;
+  }
+
+  public FragmentInstanceId getDownStreamInstanceId() {
+    return downStreamInstanceId;
+  }
+
+  public PlanNodeId getDownStreamPlanNodeId() {
+    return downStreamPlanNodeId;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
index 797611e..fabe25a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
@@ -55,4 +56,14 @@ public class CsvSourceNode extends SourceNode {
 
   @Override
   public void open() throws Exception {}
+
+  @Override
+  public DataRegionReplicaSet getDataRegionReplicaSet() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 674b6b4..e5fd770 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
-import org.apache.iotdb.db.mpp.common.DataRegion;
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -61,7 +61,7 @@ public class SeriesAggregateScanNode extends SourceNode {
   private String columnName;
 
   // The id of DataRegion where the node will run
-  private DataRegion dataRegion;
+  private DataRegionReplicaSet dataRegionReplicaSet;
 
   public SeriesAggregateScanNode(PlanNodeId id) {
     super(id);
@@ -102,13 +102,19 @@ public class SeriesAggregateScanNode extends SourceNode {
   public void open() throws Exception {}
 
   @Override
-  public void close() throws Exception {}
+  public DataRegionReplicaSet getDataRegionReplicaSet() {
+    return this.dataRegionReplicaSet;
+  }
 
-  public DataRegion getDataRegion() {
-    return dataRegion;
+  @Override
+  public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet) {
+    this.dataRegionReplicaSet = dataRegionReplicaSet;
   }
 
   @Override
+  public void close() throws Exception {}
+
+  @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitSeriesAggregate(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index bfa2faf..d7406ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -18,8 +18,8 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.DataRegion;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
@@ -62,16 +62,17 @@ public class SeriesScanNode extends SourceNode {
   private String columnName;
 
   // The id of DataRegion where the node will run
-  private DataRegion dataRegion;
+  private DataRegionReplicaSet dataRegionReplicaSet;
 
   public SeriesScanNode(PlanNodeId id, PartialPath seriesPath) {
     super(id);
     this.seriesPath = seriesPath;
   }
 
-  public SeriesScanNode(PlanNodeId id, PartialPath seriesPath, DataRegion dataRegion) {
+  public SeriesScanNode(
+      PlanNodeId id, PartialPath seriesPath, DataRegionReplicaSet dataRegionReplicaSet) {
     this(id, seriesPath);
-    this.dataRegion = dataRegion;
+    this.dataRegionReplicaSet = dataRegionReplicaSet;
   }
 
   public void setTimeFilter(Filter timeFilter) {
@@ -88,6 +89,15 @@ public class SeriesScanNode extends SourceNode {
   @Override
   public void open() throws Exception {}
 
+  @Override
+  public DataRegionReplicaSet getDataRegionReplicaSet() {
+    return dataRegionReplicaSet;
+  }
+
+  public void setDataRegionReplicaSet(DataRegionReplicaSet dataRegion) {
+    this.dataRegionReplicaSet = dataRegion;
+  }
+
   public void setScanOrder(OrderBy scanOrder) {
     this.scanOrder = scanOrder;
   }
@@ -107,7 +117,7 @@ public class SeriesScanNode extends SourceNode {
 
   @Override
   public PlanNode clone() {
-    return new SeriesScanNode(getId(), getSeriesPath(), this.dataRegion);
+    return new SeriesScanNode(getId(), getSeriesPath(), this.dataRegionReplicaSet);
   }
 
   @Override
@@ -133,17 +143,9 @@ public class SeriesScanNode extends SourceNode {
     return timeFilter;
   }
 
-  public void setDataRegion(DataRegion dataRegion) {
-    this.dataRegion = dataRegion;
-  }
-
-  public DataRegion getDataRegion() {
-    return dataRegion;
-  }
-
   public String toString() {
     return String.format(
         "SeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
-        this.getId(), this.getSeriesPath(), this.getDataRegion());
+        this.getId(), this.getSeriesPath(), this.getDataRegionReplicaSet());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index 551e9d2..67c9e2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
+import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 
@@ -28,4 +29,8 @@ public abstract class SourceNode extends PlanNode implements AutoCloseable {
   }
 
   public abstract void open() throws Exception;
+
+  public abstract DataRegionReplicaSet getDataRegionReplicaSet();
+
+  public abstract void setDataRegionReplicaSet(DataRegionReplicaSet dataRegionReplicaSet);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index 2b54026..9edaf5c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@ -19,12 +19,12 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
-import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import java.util.List;
 
@@ -37,11 +37,10 @@ public abstract class InsertNode extends PlanNode {
   protected PartialPath devicePath;
 
   protected boolean isAligned;
-  protected String[] measurements;
-  // get from client
+  protected MeasurementSchema[] measurements;
   protected TSDataType[] dataTypes;
-  // get from SchemaEngine
-  protected IMeasurementMNode[] measurementMNodes;
+  // TODO(INSERT) need to change it to a function handle to update last time value
+  //  protected IMeasurementMNode[] measurementMNodes;
 
   /**
    * device id reference, for reuse device id in both id table and memtable <br>
@@ -49,15 +48,12 @@ public abstract class InsertNode extends PlanNode {
    */
   protected IDeviceID deviceID;
 
-  // record the failed measurements, their reasons, and positions in "measurements"
-  List<String> failedMeasurements;
-  private List<Exception> failedExceptions;
-  List<Integer> failedIndices;
-
   protected InsertNode(PlanNodeId id) {
     super(id);
   }
 
+  // TODO(INSERT) split this insert node into multiple InsertNode according to the data partition
+  // info
   public abstract List<InsertNode> splitByPartition(Analysis analysis);
 
   public boolean needSplit() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
index 4140719..5ce7bbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertTabletNode.java
@@ -21,12 +21,31 @@ package org.apache.iotdb.db.mpp.sql.planner.plan.node.write;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 import java.util.List;
 
 public class InsertTabletNode extends InsertNode {
 
-  protected InsertTabletNode(PlanNodeId id) {
+  private long[] times; // times should be sorted. It is done in the session API.
+
+  private BitMap[] bitMaps;
+  private Object[] columns;
+
+  private int start;
+  private int end;
+  // when this plan is sub-plan split from another InsertTabletPlan, this indicates the original
+  // positions of values in
+  // this plan. For example, if the plan contains 5 timestamps, and range = [1,4,10,12], then it
+  // means that the first 3
+  // timestamps in this plan are from range[1,4) of the parent plan, and the last 2 timestamps are
+  // from range[10,12)
+  // of the parent plan.
+  // this is usually used to back-propagate exceptions to the parent plan without losing their
+  // proper positions.
+  private List<Integer> range;
+
+  public InsertTabletNode(PlanNodeId id) {
     super(id);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
index 7bd9300..8b70ccc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertBaseStatement.java
@@ -16,29 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.execution;
+package org.apache.iotdb.db.mpp.sql.statement.crud;
 
-import org.apache.iotdb.db.mpp.common.FragmentId;
-import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-import java.util.List;
+public abstract class InsertBaseStatement extends Statement {
 
-public class FragmentInfo {
+  /**
+   * if use id table, this filed is id form of device path <br>
+   * if not, this filed is device path<br>
+   */
+  protected PartialPath devicePath;
 
-  private final FragmentId stageId;
-  private final FragmentState state;
-  private final PlanFragment plan;
+  protected boolean isAligned;
+  protected String[] measurements;
+  // get from client
+  protected TSDataType[] dataTypes;
 
-  private final List<FragmentInfo> childrenFragments;
+  public PartialPath getDevicePath() {
+    return devicePath;
+  }
 
-  public FragmentInfo(
-      FragmentId stageId,
-      FragmentState state,
-      PlanFragment plan,
-      List<FragmentInfo> childrenFragments) {
-    this.stageId = stageId;
-    this.state = state;
-    this.plan = plan;
-    this.childrenFragments = childrenFragments;
+  public String[] getMeasurements() {
+    return measurements;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
similarity index 73%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
index 38a56f3..9a97d66 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/PlanFragmentId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/InsertTabletStatement.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.sql.planner.plan;
+package org.apache.iotdb.db.mpp.sql.statement.crud;
 
-public class PlanFragmentId {
-  private String id;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
-  public PlanFragmentId(String id) {
-    this.id = id;
-  }
+public class InsertTabletStatement extends InsertBaseStatement {
+
+  private long[] times; // times should be sorted. It is done in the session API.
+  private BitMap[] bitMaps;
+  private Object[] columns;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/StatementVisitor.java
index 6ff3aeb..4892b18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/tree/StatementVisitor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.sql.tree;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
 
@@ -70,4 +71,8 @@ public abstract class StatementVisitor<R, C> {
   public R visitInsert(InsertStatement insertStatement, C context) {
     return visitStatement(insertStatement, context);
   }
+
+  public R visitInsertTablet(InsertTabletStatement insertTabletStatement, C context) {
+    return visitStatement(insertTabletStatement, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 2183348..7e8430b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -577,6 +577,7 @@ public abstract class PhysicalPlan implements IConsensusRequest {
    *
    * @throws QueryProcessException when the check fails
    */
+  // TODO(INSERT) move this check into analyze
   public void checkIntegrity() throws QueryProcessException {}
 
   public boolean isPrefixMatch() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
index 17e0e4d..810ed6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.mpp.common.SessionInfo;
 import org.apache.iotdb.db.query.dataset.UDTFDataSet;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
@@ -59,6 +60,9 @@ public class SessionManager {
   private final Map<Long, IoTDBConstant.ClientVersion> sessionIdToClientVersion =
       new ConcurrentHashMap<>();
 
+  // TODO sessionIdToUsername and sessionIdToZoneId should be replaced with this
+  private final Map<Long, SessionInfo> sessionIdToSessionInfo = new ConcurrentHashMap<>();
+
   protected SessionManager() {
     // singleton
   }
@@ -219,6 +223,10 @@ public class SessionManager {
     return SessionManagerHelper.INSTANCE;
   }
 
+  public SessionInfo getSessionInfo(long sessionId) {
+    return sessionIdToSessionInfo.get(sessionId);
+  }
+
   private static class SessionManagerHelper {
 
     private static final SessionManager INSTANCE = new SessionManager();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
index 45a42e0..924babf 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java
@@ -70,13 +70,13 @@ public abstract class ServiceProvider {
 
   public static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
+  public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
+  // MPP: The following fields will be moved to Coordinator
   public static final QueryTimeManager QUERY_TIME_MANAGER = QueryTimeManager.getInstance();
   public static final TracingManager TRACING_MANAGER = TracingManager.getInstance();
   public static final QueryFrequencyRecorder QUERY_FREQUENCY_RECORDER =
       new QueryFrequencyRecorder(CONFIG);
-
-  public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
-
   private final Planner planner;
   protected final IPlanExecutor executor;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index be520f4..d949e66 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -35,6 +35,11 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.metadata.template.TemplateQueryType;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.Coordinator;
+import org.apache.iotdb.db.mpp.execution.ExecutionResult;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -60,6 +65,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan;
 import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.SessionManager;
 import org.apache.iotdb.db.query.control.tracing.TracingConstant;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet;
@@ -151,7 +157,6 @@ import static org.apache.iotdb.db.service.basic.ServiceProvider.CONFIG;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.CURRENT_RPC_VERSION;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_FREQUENCY_RECORDER;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.QUERY_TIME_MANAGER;
-import static org.apache.iotdb.db.service.basic.ServiceProvider.SESSION_MANAGER;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.SLOW_SQL_LOGGER;
 import static org.apache.iotdb.db.service.basic.ServiceProvider.TRACING_MANAGER;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
@@ -162,6 +167,10 @@ import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
 /** Thrift RPC implementation at server side. */
 public class TSServiceImpl implements TSIService.Iface {
 
+  private static final Coordinator coordinator = Coordinator.getInstance();
+
+  public static SessionManager SESSION_MANAGER = SessionManager.getInstance();
+
   protected class QueryTask implements Callable<TSExecuteStatementResp> {
 
     private PhysicalPlan plan;
@@ -1582,6 +1591,51 @@ public class TSServiceImpl implements TSIService.Iface {
     }
   }
 
+  public TSStatus insertTabletV2(TSInsertTabletReq req) {
+    long t1 = System.currentTimeMillis();
+    try {
+      // TODO (xingtanzjr) move this method to SESSION_MANAGER
+      if (!serviceProvider.checkLogin(req.getSessionId())) {
+        return getNotLoggedInStatus();
+      }
+
+      // Step 1: TODO(INSERT) transfer from TSInsertTabletReq to Statement
+      InsertTabletStatement statement = new InsertTabletStatement();
+      //      InsertTabletPlan insertTabletPlan =
+      //          new InsertTabletPlan(new PartialPath(req.getPrefixPath()), req.measurements);
+      //      insertTabletPlan.setTimes(QueryDataSetUtils.readTimesFromBuffer(req.timestamps,
+      // req.size));
+      //      insertTabletPlan.setColumns(
+      //          QueryDataSetUtils.readValuesFromBuffer(
+      //              req.values, req.types, req.types.size(), req.size));
+      //      insertTabletPlan.setBitMaps(
+      //          QueryDataSetUtils.readBitMapsFromBuffer(req.values, req.types.size(), req.size));
+      //      insertTabletPlan.setRowCount(req.size);
+      //      insertTabletPlan.setDataTypes(req.types);
+      //      insertTabletPlan.setAligned(req.isAligned);
+
+      // Step 2: call the coordinator
+      long queryId = SESSION_MANAGER.requestQueryId(false);
+      ExecutionResult result =
+          coordinator.execute(
+              statement,
+              new QueryId(String.valueOf(queryId)),
+              QueryType.WRITE,
+              SESSION_MANAGER.getSessionInfo(req.sessionId),
+              "");
+
+      // TODO(INSERT) do this check in analyze
+      //      TSStatus status = serviceProvider.checkAuthority(insertTabletPlan,
+      // req.getSessionId());
+      return result.status;
+    } catch (Exception e) {
+      return onNPEOrUnexpectedException(
+          e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } finally {
+      addOperationLatency(Operation.EXECUTE_RPC_BATCH_INSERT, t1);
+    }
+  }
+
   @Override
   public TSStatus insertTablets(TSInsertTabletsReq req) {
     long t1 = System.currentTimeMillis();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
similarity index 92%
rename from server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzerTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
index 39ea9f7..6c034fd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/StatementAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/analyze/AnalyzerTest.java
@@ -30,7 +30,7 @@ import java.time.ZonedDateTime;
 
 import static org.junit.Assert.fail;
 
-public class StatementAnalyzerTest {
+public class AnalyzerTest {
 
   @Test
   public void samePropertyKeyTest() {
@@ -41,7 +41,7 @@ public class StatementAnalyzerTest {
 
   private void assertAnalyzeSemanticException(String sql, String message) {
     try {
-      StatementAnalyzer analyzer = new StatementAnalyzer(new Analysis(), new MPPQueryContext());
+      Analyzer analyzer = new Analyzer(new MPPQueryContext());
       analyzer.analyze(StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()));
       fail();
     } catch (SemanticException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 6bc38c9..4c224b8 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -19,29 +19,28 @@
 
 package org.apache.iotdb.db.mpp.sql.plan;
 
+import org.apache.iotdb.commons.partition.*;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.*;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.planner.DistributionPlanner;
+import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
-import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class DistributionPlannerTest {
 
@@ -56,7 +55,7 @@ public class DistributionPlannerTest {
     timeJoinNode.addChild(
         new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
 
     LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
 
@@ -66,10 +65,8 @@ public class DistributionPlannerTest {
         new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(), root));
     PlanNode newRoot = planner.rewriteSource();
 
-    //    PlanNodeUtil.printPlanNode(newRoot);
+    //        PlanNodeUtil.printPlanNode(newRoot);
     assertEquals(newRoot.getChildren().get(0).getChildren().size(), 3);
-    assertEquals(newRoot.getChildren().get(0).getChildren().get(0).getChildren().size(), 2);
-    assertEquals(newRoot.getChildren().get(0).getChildren().get(1).getChildren().size(), 2);
   }
 
   @Test
@@ -83,7 +80,7 @@ public class DistributionPlannerTest {
     timeJoinNode.addChild(
         new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
     timeJoinNode.addChild(
-        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d2.s1")));
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
 
     LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
 
@@ -95,32 +92,97 @@ public class DistributionPlannerTest {
     PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
     //    PlanNodeUtil.printPlanNode(rootWithExchange);
     assertEquals(rootWithExchange.getChildren().get(0).getChildren().size(), 3);
-    assertEquals(
-        rootWithExchange.getChildren().get(0).getChildren().get(0).getChildren().size(), 2);
-    assertTrue(rootWithExchange.getChildren().get(0).getChildren().get(1) instanceof ExchangeNode);
-    assertEquals(
-        rootWithExchange.getChildren().get(0).getChildren().get(1).getChildren().size(), 1);
-    assertTrue(rootWithExchange.getChildren().get(0).getChildren().get(2) instanceof ExchangeNode);
-    assertEquals(
-        rootWithExchange.getChildren().get(0).getChildren().get(2).getChildren().size(), 1);
+  }
+
+  @Test
+  public void TestSplitFragment() throws IllegalPathException {
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(
+            PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+
+    timeJoinNode.addChild(
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+    timeJoinNode.addChild(
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+    timeJoinNode.addChild(
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+
+    LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+
+    Analysis analysis = constructAnalysis();
+
+    MPPQueryContext context = new MPPQueryContext("", new QueryId("query1"), null);
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    PlanNode rootAfterRewrite = planner.rewriteSource();
+    PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
+    SubPlan subPlan = planner.splitFragment(rootWithExchange);
+    assertEquals(subPlan.getChildren().size(), 2);
+  }
+
+  @Test
+  public void TestParallelPlan() throws IllegalPathException {
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(
+            PlanNodeIdAllocator.generateId(), OrderBy.TIMESTAMP_ASC, FilterNullPolicy.NO_FILTER);
+
+    timeJoinNode.addChild(
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s1")));
+    timeJoinNode.addChild(
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d1.s2")));
+    timeJoinNode.addChild(
+        new SeriesScanNode(PlanNodeIdAllocator.generateId(), new PartialPath("root.sg.d22.s1")));
+
+    LimitNode root = new LimitNode(PlanNodeIdAllocator.generateId(), 10, timeJoinNode);
+
+    Analysis analysis = constructAnalysis();
+
+    MPPQueryContext context = new MPPQueryContext("", new QueryId("query1"), null);
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    DistributedQueryPlan plan = planner.planFragments();
+    plan.getInstances().forEach(System.out::println);
+    assertEquals(plan.getInstances().size(), 3);
   }
 
   private Analysis constructAnalysis() {
     Analysis analysis = new Analysis();
-    Map<String, Map<DataRegionTimeSlice, List<DataRegion>>> dataPartitionInfo = new HashMap<>();
-    List<DataRegion> d1DataRegions = new ArrayList<>();
-    d1DataRegions.add(new DataRegion(1, "192.0.0.1"));
-    d1DataRegions.add(new DataRegion(2, "192.0.0.1"));
-    Map<DataRegionTimeSlice, List<DataRegion>> d1DataRegionMap = new HashMap<>();
-    d1DataRegionMap.put(new DataRegionTimeSlice(), d1DataRegions);
-
-    List<DataRegion> d2DataRegions = new ArrayList<>();
-    d2DataRegions.add(new DataRegion(3, "192.0.0.1"));
-    Map<DataRegionTimeSlice, List<DataRegion>> d2DataRegionMap = new HashMap<>();
-    d2DataRegionMap.put(new DataRegionTimeSlice(), d2DataRegions);
-
-    dataPartitionInfo.put("root.sg.d1", d1DataRegionMap);
-    dataPartitionInfo.put("root.sg.d2", d2DataRegionMap);
+
+    String device1 = "root.sg.d1";
+    String device2 = "root.sg.d22";
+    String device3 = "root.sg.d333";
+
+    DataPartitionInfo dataPartitionInfo = new DataPartitionInfo();
+    Map<String, Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>>>
+        dataPartitionMap = new HashMap<>();
+    Map<DeviceGroupId, Map<TimePartitionId, List<DataRegionReplicaSet>>> sgPartitionMap =
+        new HashMap<>();
+    List<DataRegionReplicaSet> d1DataRegions = new ArrayList<>();
+    d1DataRegions.add(
+        new DataRegionReplicaSet(
+            new DataRegionId(1),
+            Arrays.asList(new EndPoint("192.0.0.1", 9000), new EndPoint("192.0.0.2", 9000))));
+    d1DataRegions.add(
+        new DataRegionReplicaSet(
+            new DataRegionId(2),
+            Arrays.asList(new EndPoint("192.0.0.3", 9000), new EndPoint("192.0.0.4", 9000))));
+    Map<TimePartitionId, List<DataRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
+    d1DataRegionMap.put(new TimePartitionId(), d1DataRegions);
+
+    List<DataRegionReplicaSet> d2DataRegions = new ArrayList<>();
+    d2DataRegions.add(
+        new DataRegionReplicaSet(
+            new DataRegionId(3),
+            Arrays.asList(new EndPoint("192.0.0.5", 9000), new EndPoint("192.0.0.6", 9000))));
+    Map<TimePartitionId, List<DataRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
+    d2DataRegionMap.put(new TimePartitionId(), d2DataRegions);
+
+    sgPartitionMap.put(new DeviceGroupId(device1.length()), d1DataRegionMap);
+    sgPartitionMap.put(new DeviceGroupId(device2.length()), d2DataRegionMap);
+
+    dataPartitionMap.put("root.sg", sgPartitionMap);
+
+    dataPartitionInfo.setDataPartitionMap(dataPartitionMap);
 
     analysis.setDataPartitionInfo(dataPartitionInfo);
     return analysis;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index fc02d5b..ec63ca3 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.mpp.sql.plan;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.analyze.StatementAnalyzer;
+import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -81,7 +81,7 @@ public class LogicalPlannerTest {
     PlanNode planNode = null;
     try {
       MPPQueryContext context = new MPPQueryContext();
-      StatementAnalyzer analyzer = new StatementAnalyzer(new Analysis(), context);
+      Analyzer analyzer = new Analyzer(context);
       Analysis analysis =
           analyzer.analyze(
               StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()));