Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/23 16:08:15 UTC

[iotdb] branch xingtanzjr/delete_timeseries_plan created (now 97f5d8dce8)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/delete_timeseries_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 97f5d8dce8 refactor node for plan

This branch includes the following new commits:

     new d9cf1a9732 complete logical plan for delete timeseries
     new 7ecb6bdec4 move package for distribution plan
     new 97f5d8dce8 refactor node for plan

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/03: complete logical plan for delete timeseries

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/delete_timeseries_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d9cf1a9732a13672bc3d5ad7c7742a9c9c06f5e0
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon May 23 23:20:43 2022 +0800

    complete logical plan for delete timeseries
---
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 24 ++++++++++++++++++++++
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  | 14 +++++++++++++
 2 files changed, 38 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 23d7ee3286..f7a35f005d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
@@ -45,6 +46,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchS
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InvalidateSchemaCacheNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FillNode;
@@ -60,6 +63,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggreg
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
@@ -72,6 +76,7 @@ import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import org.apache.commons.lang.Validate;
+import org.apache.iotdb.tsfile.read.filter.operator.In;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -599,4 +604,23 @@ public class LogicalPlanBuilder {
     this.root = memorySourceNode;
     return this;
   }
+
+  public LogicalPlanBuilder planInvalidateSchemaCache(List<PartialPath> paths, List<String> storageGroups) {
+    this.root = new InvalidateSchemaCacheNode(context.getQueryId().genPlanNodeId(), context.getQueryId(), paths, storageGroups);
+    return this;
+  }
+
+  public LogicalPlanBuilder planDeleteData(List<PartialPath> paths) {
+    DeleteDataNode node = new DeleteDataNode(context.getQueryId().genPlanNodeId(), paths);
+    node.addChild(this.root);
+    this.root = node;
+    return this;
+  }
+
+  public LogicalPlanBuilder planDeleteTimeseries(List<PartialPath> paths) {
+    DeleteTimeSeriesNode node = new DeleteTimeSeriesNode(context.getQueryId().genPlanNodeId(), paths);
+    node.addChild(this.root);
+    this.root = node;
+    return this;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index 4de3c11ab5..bf08942b92 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.plan.planner;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.confignode.rpc.thrift.NodeManagementType;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
@@ -50,6 +51,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SchemaFetchStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
@@ -535,5 +537,17 @@ public class LogicalPlanner {
           .planNodeManagementMemoryMerge(analysis.getMatchedNodes(), NodeManagementType.CHILD_NODES)
           .getRoot();
     }
+
+    @Override
+    public PlanNode visitDeleteTimeseries(DeleteTimeSeriesStatement deleteTimeSeriesStatement, MPPQueryContext context) {
+      LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+      List<PartialPath> paths = deleteTimeSeriesStatement.getPaths();
+      List<String> storageGroups = new ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet());
+      return planBuilder
+          .planInvalidateSchemaCache(paths, storageGroups)
+          .planDeleteData(paths)
+          .planDeleteTimeseries(paths)
+          .getRoot();
+    }
   }
 }
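
Taken together, the three builder methods above chain the new nodes bottom-up, so
visitDeleteTimeseries yields DeleteTimeSeriesNode -> DeleteDataNode ->
InvalidateSchemaCacheNode. A minimal stand-alone sketch of that chaining pattern
(the Node and Builder types here are simplified stand-ins, not the real IoTDB
classes):

    import java.util.ArrayList;
    import java.util.List;

    public class DeleteTimeseriesPlanSketch {
      // Stand-in for PlanNode: just a name and children.
      static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        Node(String name) { this.name = name; }
        void addChild(Node child) { children.add(child); }
      }

      // Stand-in for LogicalPlanBuilder: each plan* call wraps the current root.
      static class Builder {
        private Node root;
        Builder planInvalidateSchemaCache() {
          root = new Node("InvalidateSchemaCacheNode");
          return this;
        }
        Builder planDeleteData() {
          Node n = new Node("DeleteDataNode");
          n.addChild(root);
          root = n;
          return this;
        }
        Builder planDeleteTimeseries() {
          Node n = new Node("DeleteTimeSeriesNode");
          n.addChild(root);
          root = n;
          return this;
        }
        Node getRoot() { return root; }
      }

      public static void main(String[] args) {
        Node root = new Builder()
            .planInvalidateSchemaCache()
            .planDeleteData()
            .planDeleteTimeseries()
            .getRoot();
        // Walks the single-child chain top-down and prints:
        // DeleteTimeSeriesNode, DeleteDataNode, InvalidateSchemaCacheNode
        for (Node n = root; n != null; n = n.children.isEmpty() ? null : n.children.get(0)) {
          System.out.println(n.name);
        }
      }
    }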


[iotdb] 03/03: refactor node for plan

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/delete_timeseries_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 97f5d8dce8a635fb4e18f162eebc973fe4d048d1
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue May 24 00:08:00 2022 +0800

    refactor node for plan
---
 .../planner/distribution/ExchangeNodeAdder.java    | 36 ++++++++--------------
 .../plan/planner/distribution/SourceRewriter.java  |  5 +++
 .../db/mpp/plan/planner/plan/PlanFragment.java     |  6 ++--
 ...itePlanNode.java => IPartitionRelatedNode.java} | 14 ++-------
 .../mpp/plan/planner/plan/node/WritePlanNode.java  |  5 +--
 .../node/metedata/write/DeleteTimeSeriesNode.java  | 10 ++----
 .../metedata/write/InvalidateSchemaCacheNode.java  | 15 +--------
 .../plan/planner/plan/node/source/SourceNode.java  |  5 ++-
 .../planner/plan/node/write/DeleteDataNode.java    | 10 ++----
 9 files changed, 31 insertions(+), 75 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 23fcc4c96b..781c84b28e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggreg
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -113,52 +114,41 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
   }
 
   @Override
-  public PlanNode visitSchemaQueryScan(SchemaQueryScanNode node, NodeGroupContext context) {
-    NodeDistribution nodeDistribution = new NodeDistribution(NodeDistributionType.NO_CHILD);
-    nodeDistribution.region = node.getRegionReplicaSet();
-    context.putNodeDistribution(node.getPlanNodeId(), nodeDistribution);
-    return node;
+  public PlanNode visitSchemaFetchMerge(SchemaFetchMergeNode node, NodeGroupContext context) {
+    return internalVisitSchemaMerge(node, context);
   }
 
   @Override
-  public PlanNode visitSchemaFetchMerge(SchemaFetchMergeNode node, NodeGroupContext context) {
-    return internalVisitSchemaMerge(node, context);
+  public PlanNode visitSchemaQueryScan(SchemaQueryScanNode node, NodeGroupContext context) {
+    return processNoChildSourceNode(node, context);
   }
 
   @Override
   public PlanNode visitSchemaFetchScan(SchemaFetchScanNode node, NodeGroupContext context) {
-    NodeDistribution nodeDistribution = new NodeDistribution(NodeDistributionType.NO_CHILD);
-    nodeDistribution.region = node.getRegionReplicaSet();
-    context.putNodeDistribution(node.getPlanNodeId(), nodeDistribution);
-    return node;
+    return processNoChildSourceNode(node, context);
   }
 
   @Override
   public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
-    context.putNodeDistribution(
-        node.getPlanNodeId(),
-        new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
-    return node.clone();
+    return processNoChildSourceNode(node, context);
   }
 
   @Override
   public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, NodeGroupContext context) {
-    context.putNodeDistribution(
-        node.getPlanNodeId(),
-        new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
-    return node.clone();
+    return processNoChildSourceNode(node, context);
   }
 
   public PlanNode visitSeriesAggregationScan(
       SeriesAggregationScanNode node, NodeGroupContext context) {
-    context.putNodeDistribution(
-        node.getPlanNodeId(),
-        new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
-    return node.clone();
+    return processNoChildSourceNode(node, context);
   }
 
   public PlanNode visitAlignedSeriesAggregationScan(
       AlignedSeriesAggregationScanNode node, NodeGroupContext context) {
+    return processNoChildSourceNode(node, context);
+  }
+
+  private PlanNode processNoChildSourceNode(SourceNode node, NodeGroupContext context) {
     context.putNodeDistribution(
         node.getPlanNodeId(),
         new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
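
The net effect of the hunk above: several near-identical visit overloads collapse
into one private helper. A minimal stand-alone sketch of that deduplication
(every type below is a placeholder for the real PlanNode/NodeGroupContext
classes):

    public class VisitorDedupSketch {
      static class Context {
        void record(String id, String region) {
          System.out.println(id + " -> NO_CHILD@" + region);
        }
      }
      static class Source {
        final String id;
        final String region; // stands in for TRegionReplicaSet
        Source(String id, String region) { this.id = id; this.region = region; }
      }
      static class SeriesScan extends Source {
        SeriesScan(String id, String region) { super(id, region); }
      }
      static class SchemaScan extends Source {
        SchemaScan(String id, String region) { super(id, region); }
      }

      Source visitSeriesScan(SeriesScan node, Context c) {
        return processNoChildSourceNode(node, c);
      }
      Source visitSchemaScan(SchemaScan node, Context c) {
        return processNoChildSourceNode(node, c);
      }

      // One shared body replaces six near-identical copies.
      private Source processNoChildSourceNode(Source node, Context c) {
        c.record(node.id, node.region);
        return node;
      }

      public static void main(String[] args) {
        VisitorDedupSketch v = new VisitorDedupSketch();
        Context c = new Context();
        v.visitSeriesScan(new SeriesScan("scan-1", "data-region-1"), c);
        v.visitSchemaScan(new SchemaScan("schema-1", "schema-region-2"), c);
      }
    }
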
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 973d40b3f8..606ccf82cf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
@@ -497,6 +498,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return sources;
   }
 
+  public PlanNode visit(DeleteDataNode node, DistributionPlanContext context) {
+    return null;
+  }
+
   public PlanNode visit(PlanNode node, DistributionPlanContext context) {
     return node.accept(this, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index 626893c152..a500f65d18 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -21,10 +21,10 @@ package org.apache.iotdb.db.mpp.plan.planner.plan;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
@@ -77,8 +77,8 @@ public class PlanFragment {
   }
 
   private TRegionReplicaSet getNodeRegion(PlanNode root) {
-    if (root instanceof SourceNode) {
-      return ((SourceNode) root).getRegionReplicaSet();
+    if (root instanceof IPartitionRelatedNode) {
+      return ((IPartitionRelatedNode) root).getRegionReplicaSet();
     }
     for (PlanNode child : root.getChildren()) {
       TRegionReplicaSet result = getNodeRegion(child);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/IPartitionRelatedNode.java
similarity index 73%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/IPartitionRelatedNode.java
index 0d6ee0c793..ca82777058 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/IPartitionRelatedNode.java
@@ -20,17 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 
-import java.util.List;
-
-public abstract class WritePlanNode extends PlanNode {
-
-  protected WritePlanNode(PlanNodeId id) {
-    super(id);
-  }
-
-  public abstract TRegionReplicaSet getRegionReplicaSet();
-
-  public abstract List<WritePlanNode> splitByPartition(Analysis analysis);
+public interface IPartitionRelatedNode {
+  TRegionReplicaSet getRegionReplicaSet();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java
index 0d6ee0c793..ef877add59 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/WritePlanNode.java
@@ -19,18 +19,15 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.plan.node;
 
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 
 import java.util.List;
 
-public abstract class WritePlanNode extends PlanNode {
+public abstract class WritePlanNode extends PlanNode implements IPartitionRelatedNode {
 
   protected WritePlanNode(PlanNodeId id) {
     super(id);
   }
 
-  public abstract TRegionReplicaSet getRegionReplicaSet();
-
   public abstract List<WritePlanNode> splitByPartition(Analysis analysis);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
index 98df329b1a..05356144cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
@@ -22,18 +22,17 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class DeleteTimeSeriesNode extends WritePlanNode {
+public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedNode {
 
   private final List<PartialPath> pathList;
 
@@ -92,9 +91,4 @@ public class DeleteTimeSeriesNode extends WritePlanNode {
   public TRegionReplicaSet getRegionReplicaSet() {
     return null;
   }
-
-  @Override
-  public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    return null;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InvalidateSchemaCacheNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InvalidateSchemaCacheNode.java
index 95649d8248..8af09b8a3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InvalidateSchemaCacheNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/InvalidateSchemaCacheNode.java
@@ -19,22 +19,19 @@
 
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write;
 
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class InvalidateSchemaCacheNode extends WritePlanNode {
+public class InvalidateSchemaCacheNode extends PlanNode {
 
   private final QueryId queryId;
 
@@ -114,14 +111,4 @@ public class InvalidateSchemaCacheNode extends WritePlanNode {
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new InvalidateSchemaCacheNode(planNodeId, queryId, pathList, storageGroups);
   }
-
-  @Override
-  public TRegionReplicaSet getRegionReplicaSet() {
-    return null;
-  }
-
-  @Override
-  public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    return null;
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SourceNode.java
index e3209a03e4..63203cf07f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SourceNode.java
@@ -19,10 +19,11 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.source;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 
-public abstract class SourceNode extends PlanNode implements AutoCloseable {
+public abstract class SourceNode extends PlanNode implements AutoCloseable, IPartitionRelatedNode {
 
   public SourceNode(PlanNodeId id) {
     super(id);
@@ -30,7 +31,5 @@ public abstract class SourceNode extends PlanNode implements AutoCloseable {
 
   public abstract void open() throws Exception;
 
-  public abstract TRegionReplicaSet getRegionReplicaSet();
-
   public abstract void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
index 21ebfec88d..11c803c605 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
@@ -22,18 +22,17 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-public class DeleteDataNode extends WritePlanNode {
+public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
 
   private final List<PartialPath> pathList;
 
@@ -92,9 +91,4 @@ public class DeleteDataNode extends WritePlanNode {
   public TRegionReplicaSet getRegionReplicaSet() {
     return null;
   }
-
-  @Override
-  public List<WritePlanNode> splitByPartition(Analysis analysis) {
-    return null;
-  }
 }
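
The common thread of this commit: region lookup now dispatches on the extracted
IPartitionRelatedNode interface instead of on SourceNode or WritePlanNode
individually. A minimal stand-alone sketch of that pattern (String stands in for
TRegionReplicaSet; the real types live under
org.apache.iotdb.db.mpp.plan.planner.plan.node):

    import java.util.ArrayList;
    import java.util.List;

    public class PartitionRelatedSketch {
      interface IPartitionRelatedNode {
        String getRegionReplicaSet();
      }

      abstract static class PlanNode {
        final List<PlanNode> children = new ArrayList<>();
      }

      // Read sources and write/metadata nodes now share the same interface.
      static class SourceNode extends PlanNode implements IPartitionRelatedNode {
        public String getRegionReplicaSet() { return "data-region-1"; }
      }
      static class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
        public String getRegionReplicaSet() { return "data-region-2"; }
      }

      // Mirrors PlanFragment.getNodeRegion after the change: one instanceof
      // check against the interface covers every partition-aware node type.
      static String getNodeRegion(PlanNode root) {
        if (root instanceof IPartitionRelatedNode) {
          return ((IPartitionRelatedNode) root).getRegionReplicaSet();
        }
        for (PlanNode child : root.children) {
          String result = getNodeRegion(child);
          if (result != null) {
            return result;
          }
        }
        return null;
      }

      public static void main(String[] args) {
        System.out.println(getNodeRegion(new DeleteDataNode())); // data-region-2
      }
    }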


[iotdb] 02/03: move package for distribution plan

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/delete_timeseries_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7ecb6bdec404e5831ed431ec2e490beec817c7a8
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon May 23 23:37:11 2022 +0800

    move package for distribution plan
---
 .../db/mpp/plan/execution/QueryExecution.java      |   2 +-
 .../db/mpp/plan/planner/DistributionPlanner.java   | 909 ---------------------
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  12 +-
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  |   6 +-
 .../distribution/DistributionPlanContext.java      |  30 +
 .../planner/distribution/DistributionPlanner.java  | 176 ++++
 .../planner/distribution/ExchangeNodeAdder.java    | 256 ++++++
 .../planner/distribution/NodeDistribution.java     |  36 +
 .../planner/distribution/NodeDistributionType.java |  27 +
 .../planner/distribution/NodeGroupContext.java     |  44 +
 .../SimpleFragmentParallelPlanner.java             |   3 +-
 .../plan/planner/distribution/SourceRewriter.java  | 503 ++++++++++++
 .../WriteFragmentParallelPlanner.java              |   3 +-
 .../db/mpp/plan/plan/DistributionPlannerTest.java  |   2 +-
 14 files changed, 1089 insertions(+), 920 deletions(-)
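
For orientation before the file-by-file diff: the relocated DistributionPlanner
(its old copy is deleted below) drives a four-stage pipeline: rewrite sources by
partition, add exchange nodes, split into fragments, then plan fragment
instances. A minimal stand-in sketch of that control flow, with a placeholder
type replacing the real plan classes:

    import java.util.Collections;
    import java.util.List;

    public class DistributionPipelineSketch {
      // Placeholder standing in for PlanNode / SubPlan / FragmentInstance alike.
      static class Plan {}

      static Plan rewriteSource(Plan p)   { return p; } // split scans by data/schema region
      static Plan addExchangeNode(Plan p) { return p; } // insert ExchangeNodes across regions
      static Plan splitFragment(Plan p)   { return p; } // cut the tree into PlanFragments
      static List<Plan> planFragmentInstances(Plan p) {
        return Collections.singletonList(p);
      }

      // Mirrors DistributionPlanner.planFragments() in the deleted copy below.
      static List<Plan> planFragments(Plan logicalRoot) {
        Plan rewritten = rewriteSource(logicalRoot);
        Plan withExchange = addExchangeNode(rewritten);
        Plan subPlan = splitFragment(withExchange);
        return planFragmentInstances(subPlan);
      }

      public static void main(String[] args) {
        System.out.println(planFragments(new Plan()).size()); // 1
      }
    }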

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index eb289306ef..868e506dba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -39,8 +39,8 @@ import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySource;
 import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceContext;
 import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceVisitor;
 import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
-import org.apache.iotdb.db.mpp.plan.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
deleted file mode 100644
index 9756e4c6d6..0000000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
+++ /dev/null
@@ -1,909 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.plan.planner;
-
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.common.PlanFragmentId;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
-import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanNodeRewriter;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.AbstractSchemaMergeNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesSourceNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
-import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
-import static com.google.common.collect.ImmutableList.toImmutableList;
-
-public class DistributionPlanner {
-  private Analysis analysis;
-  private MPPQueryContext context;
-  private LogicalQueryPlan logicalPlan;
-
-  private int planFragmentIndex = 0;
-
-  public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
-    this.analysis = analysis;
-    this.logicalPlan = logicalPlan;
-    this.context = logicalPlan.getContext();
-  }
-
-  public PlanNode rewriteSource() {
-    SourceRewriter rewriter = new SourceRewriter();
-    return rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext(context));
-  }
-
-  public PlanNode addExchangeNode(PlanNode root) {
-    ExchangeNodeAdder adder = new ExchangeNodeAdder();
-    return adder.visit(root, new NodeGroupContext(context));
-  }
-
-  public SubPlan splitFragment(PlanNode root) {
-    FragmentBuilder fragmentBuilder = new FragmentBuilder(context);
-    return fragmentBuilder.splitToSubPlan(root);
-  }
-
-  public DistributedQueryPlan planFragments() {
-    PlanNode rootAfterRewrite = rewriteSource();
-    PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
-    if (analysis.getStatement() instanceof QueryStatement) {
-      analysis
-          .getRespDatasetHeader()
-          .setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
-    }
-    SubPlan subPlan = splitFragment(rootWithExchange);
-    List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
-    // Only execute this step for READ operation
-    if (context.getQueryType() == QueryType.READ) {
-      SetSinkForRootInstance(subPlan, fragmentInstances);
-    }
-    return new DistributedQueryPlan(
-        logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
-  }
-
-  // Convert fragment to detailed instance
-  // And for each parallelizable fragment, clone it into several instances with different params.
-  public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
-    IFragmentParallelPlaner parallelPlaner =
-        context.getQueryType() == QueryType.READ
-            ? new SimpleFragmentParallelPlanner(subPlan, analysis, context)
-            : new WriteFragmentParallelPlanner(subPlan, analysis, context);
-    return parallelPlaner.parallelPlan();
-  }
-
-  // TODO: (xingtanzjr) Maybe we should handle ResultNode in LogicalPlanner ?
-  public void SetSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> instances) {
-    FragmentInstance rootInstance = null;
-    for (FragmentInstance instance : instances) {
-      if (instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) {
-        rootInstance = instance;
-        break;
-      }
-    }
-    // root should not be null during normal process
-    if (rootInstance == null) {
-      return;
-    }
-
-    FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
-    sinkNode.setDownStream(
-        context.getLocalDataBlockEndpoint(),
-        context.getResultNodeContext().getVirtualFragmentInstanceId(),
-        context.getResultNodeContext().getVirtualResultNodeId());
-    sinkNode.setChild(rootInstance.getFragment().getRoot());
-    context
-        .getResultNodeContext()
-        .setUpStream(
-            rootInstance.getHostDataNode().dataBlockManagerEndPoint,
-            rootInstance.getId(),
-            sinkNode.getPlanNodeId());
-    rootInstance.getFragment().setRoot(sinkNode);
-  }
-
-  private PlanFragmentId getNextFragmentId() {
-    return new PlanFragmentId(this.logicalPlan.getContext().getQueryId(), this.planFragmentIndex++);
-  }
-
-  private class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
-
-    // TODO: (xingtanzjr) implement the method visitDeviceMergeNode()
-    public PlanNode visitDeviceMerge(TimeJoinNode node, DistributionPlanContext context) {
-      return null;
-    }
-
-    @Override
-    public PlanNode visitSchemaQueryMerge(
-        SchemaQueryMergeNode node, DistributionPlanContext context) {
-      SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone();
-      SchemaQueryScanNode seed = (SchemaQueryScanNode) node.getChildren().get(0);
-      TreeSet<TRegionReplicaSet> schemaRegions =
-          new TreeSet<>(Comparator.comparingInt(region -> region.getRegionId().getId()));
-      analysis
-          .getSchemaPartitionInfo()
-          .getSchemaPartitionMap()
-          .forEach(
-              (storageGroup, deviceGroup) -> {
-                deviceGroup.forEach(
-                    (deviceGroupId, schemaRegionReplicaSet) ->
-                        schemaRegions.add(schemaRegionReplicaSet));
-              });
-      int count = schemaRegions.size();
-      schemaRegions.forEach(
-          region -> {
-            SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode) seed.clone();
-            schemaQueryScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-            schemaQueryScanNode.setRegionReplicaSet(region);
-            if (count > 1) {
-              schemaQueryScanNode.setLimit(
-                  schemaQueryScanNode.getOffset() + schemaQueryScanNode.getLimit());
-              schemaQueryScanNode.setOffset(0);
-            }
-            root.addChild(schemaQueryScanNode);
-          });
-      return root;
-    }
-
-    @Override
-    public PlanNode visitCountMerge(CountSchemaMergeNode node, DistributionPlanContext context) {
-      CountSchemaMergeNode root = (CountSchemaMergeNode) node.clone();
-      SchemaQueryScanNode seed = (SchemaQueryScanNode) node.getChildren().get(0);
-      Set<TRegionReplicaSet> schemaRegions = new HashSet<>();
-      analysis
-          .getSchemaPartitionInfo()
-          .getSchemaPartitionMap()
-          .forEach(
-              (storageGroup, deviceGroup) -> {
-                deviceGroup.forEach(
-                    (deviceGroupId, schemaRegionReplicaSet) ->
-                        schemaRegions.add(schemaRegionReplicaSet));
-              });
-      schemaRegions.forEach(
-          region -> {
-            SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode) seed.clone();
-            schemaQueryScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-            schemaQueryScanNode.setRegionReplicaSet(region);
-            root.addChild(schemaQueryScanNode);
-          });
-      return root;
-    }
-
-    // TODO: (xingtanzjr) a temporary way to resolve the distribution of single SeriesScanNode issue
-    @Override
-    public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
-      List<TRegionReplicaSet> dataDistribution =
-          analysis.getPartitionInfo(node.getSeriesPath(), node.getTimeFilter());
-      if (dataDistribution.size() == 1) {
-        node.setRegionReplicaSet(dataDistribution.get(0));
-        return node;
-      }
-      TimeJoinNode timeJoinNode =
-          new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
-      for (TRegionReplicaSet dataRegion : dataDistribution) {
-        SeriesScanNode split = (SeriesScanNode) node.clone();
-        split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-        split.setRegionReplicaSet(dataRegion);
-        timeJoinNode.addChild(split);
-      }
-      return timeJoinNode;
-    }
-
-    @Override
-    public PlanNode visitSeriesAggregationScan(
-        SeriesAggregationScanNode node, DistributionPlanContext context) {
-      return processSeriesAggregationSource(node, context);
-    }
-
-    @Override
-    public PlanNode visitAlignedSeriesAggregationScan(
-        AlignedSeriesAggregationScanNode node, DistributionPlanContext context) {
-      return processSeriesAggregationSource(node, context);
-    }
-
-    private PlanNode processSeriesAggregationSource(
-        SeriesAggregationSourceNode node, DistributionPlanContext context) {
-      List<TRegionReplicaSet> dataDistribution =
-          analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
-      if (dataDistribution.size() == 1) {
-        node.setRegionReplicaSet(dataDistribution.get(0));
-        return node;
-      }
-      List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>();
-      node.getAggregationDescriptorList()
-          .forEach(
-              descriptor -> {
-                leafAggDescriptorList.add(
-                    new AggregationDescriptor(
-                        descriptor.getAggregationType(),
-                        AggregationStep.PARTIAL,
-                        descriptor.getInputExpressions()));
-              });
-
-      List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
-      node.getAggregationDescriptorList()
-          .forEach(
-              descriptor -> {
-                rootAggDescriptorList.add(
-                    new AggregationDescriptor(
-                        descriptor.getAggregationType(),
-                        AggregationStep.FINAL,
-                        descriptor.getInputExpressions()));
-              });
-
-      AggregationNode aggregationNode =
-          new AggregationNode(
-              context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
-      for (TRegionReplicaSet dataRegion : dataDistribution) {
-        SeriesAggregationScanNode split = (SeriesAggregationScanNode) node.clone();
-        split.setAggregationDescriptorList(leafAggDescriptorList);
-        split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-        split.setRegionReplicaSet(dataRegion);
-        aggregationNode.addChild(split);
-      }
-      return aggregationNode;
-    }
-
-    @Override
-    public PlanNode visitAlignedSeriesScan(
-        AlignedSeriesScanNode node, DistributionPlanContext context) {
-      List<TRegionReplicaSet> dataDistribution =
-          analysis.getPartitionInfo(node.getAlignedPath(), node.getTimeFilter());
-      if (dataDistribution.size() == 1) {
-        node.setRegionReplicaSet(dataDistribution.get(0));
-        return node;
-      }
-      TimeJoinNode timeJoinNode =
-          new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
-      for (TRegionReplicaSet dataRegion : dataDistribution) {
-        AlignedSeriesScanNode split = (AlignedSeriesScanNode) node.clone();
-        split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-        split.setRegionReplicaSet(dataRegion);
-        timeJoinNode.addChild(split);
-      }
-      return timeJoinNode;
-    }
-
-    @Override
-    public PlanNode visitSchemaFetchMerge(
-        SchemaFetchMergeNode node, DistributionPlanContext context) {
-      SchemaFetchMergeNode root = (SchemaFetchMergeNode) node.clone();
-      Map<String, Set<TRegionReplicaSet>> storageGroupSchemaRegionMap = new HashMap<>();
-      analysis
-          .getSchemaPartitionInfo()
-          .getSchemaPartitionMap()
-          .forEach(
-              (storageGroup, deviceGroup) -> {
-                storageGroupSchemaRegionMap.put(storageGroup, new HashSet<>());
-                deviceGroup.forEach(
-                    (deviceGroupId, schemaRegionReplicaSet) ->
-                        storageGroupSchemaRegionMap.get(storageGroup).add(schemaRegionReplicaSet));
-              });
-
-      for (PlanNode child : node.getChildren()) {
-        for (TRegionReplicaSet schemaRegion :
-            storageGroupSchemaRegionMap.get(
-                ((SchemaFetchScanNode) child).getStorageGroup().getFullPath())) {
-          SchemaFetchScanNode schemaFetchScanNode = (SchemaFetchScanNode) child.clone();
-          schemaFetchScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-          schemaFetchScanNode.setRegionReplicaSet(schemaRegion);
-          root.addChild(schemaFetchScanNode);
-        }
-      }
-      return root;
-    }
-
-    @Override
-    public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
-      // Although some logic is similar between Aggregation and RawDataQuery,
-      // we still use a separate method for the distribution planning now
-      // to keep the planning procedure clear
-      if (isAggregationQuery(node)) {
-        return planAggregationWithTimeJoin(node, context);
-      }
-
-      TimeJoinNode root = (TimeJoinNode) node.clone();
-      // Step 1: Get all source nodes. For the node which is not source, add it as the child of
-      // current TimeJoinNode
-      List<SourceNode> sources = new ArrayList<>();
-      for (PlanNode child : node.getChildren()) {
-        if (child instanceof SeriesSourceNode) {
-          // If the child is SeriesScanNode, we need to check whether this node should be separated
-          // into several splits.
-          SeriesSourceNode handle = (SeriesSourceNode) child;
-          List<TRegionReplicaSet> dataDistribution =
-              analysis.getPartitionInfo(handle.getPartitionPath(), handle.getPartitionTimeFilter());
-          // If the size of dataDistribution is m, this SeriesScanNode should be separated into m
-          // SeriesScanNodes.
-          for (TRegionReplicaSet dataRegion : dataDistribution) {
-            SeriesSourceNode split = (SeriesSourceNode) handle.clone();
-            split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-            split.setRegionReplicaSet(dataRegion);
-            sources.add(split);
-          }
-        }
-      }
-      // Step 2: For the source nodes, group them by the DataRegion.
-      Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
-          sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
-
-      // Step 3: For the source nodes which belong to the same data region, add a TimeJoinNode
-      // for them and make the
-      // new TimeJoinNode a child of the current TimeJoinNode
-      // TODO: (xingtanzjr) optimize the procedure here to remove duplicated TimeJoinNode
-      final boolean[] addParent = {false};
-      sourceGroup.forEach(
-          (dataRegion, seriesScanNodes) -> {
-            if (seriesScanNodes.size() == 1) {
-              root.addChild(seriesScanNodes.get(0));
-            } else {
-              if (!addParent[0]) {
-                seriesScanNodes.forEach(root::addChild);
-                addParent[0] = true;
-              } else {
-                // We clone a TimeJoinNode from root to keep the params consistent.
-                // But we need to assign a new ID to it
-                TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
-                parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-                seriesScanNodes.forEach(parentOfGroup::addChild);
-                root.addChild(parentOfGroup);
-              }
-            }
-          });
-
-      // Process the other children which are not SeriesSourceNode
-      for (PlanNode child : node.getChildren()) {
-        if (!(child instanceof SeriesSourceNode)) {
-          // In a general logical query plan, the children of TimeJoinNode should only be
-          // SeriesScanNode or SeriesAggregationScanNode,
-          // so this branch should not be touched.
-          root.addChild(visit(child, context));
-        }
-      }
-
-      return root;
-    }
-
-    private boolean isAggregationQuery(TimeJoinNode node) {
-      for (PlanNode child : node.getChildren()) {
-        if (child instanceof SeriesAggregationScanNode
-            || child instanceof AlignedSeriesAggregationScanNode) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    private PlanNode planAggregationWithTimeJoin(
-        TimeJoinNode root, DistributionPlanContext context) {
-
-      List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
-      Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
-          sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
-
-      // construct AggregationDescriptor for AggregationNode
-      List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
-      for (PlanNode child : root.getChildren()) {
-        SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
-        handle
-            .getAggregationDescriptorList()
-            .forEach(
-                descriptor -> {
-                  rootAggDescriptorList.add(
-                      new AggregationDescriptor(
-                          descriptor.getAggregationType(),
-                          AggregationStep.FINAL,
-                          descriptor.getInputExpressions()));
-                });
-      }
-      AggregationNode aggregationNode =
-          new AggregationNode(
-              context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
-
-      final boolean[] addParent = {false};
-      sourceGroup.forEach(
-          (dataRegion, sourceNodes) -> {
-            if (sourceNodes.size() == 1) {
-              aggregationNode.addChild(sourceNodes.get(0));
-            } else {
-              if (!addParent[0]) {
-                sourceNodes.forEach(aggregationNode::addChild);
-                addParent[0] = true;
-              } else {
-                // We clone a TimeJoinNode from root to keep the params consistent.
-                // But we need to assign a new ID to it
-                TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
-                parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-                sourceNodes.forEach(parentOfGroup::addChild);
-                aggregationNode.addChild(parentOfGroup);
-              }
-            }
-          });
-
-      return aggregationNode;
-    }
-
-    public PlanNode visitGroupByLevel(GroupByLevelNode root, DistributionPlanContext context) {
-      // Firstly, we build the tree structure for GroupByLevelNode
-      List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
-      Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
-          sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
-
-      GroupByLevelNode newRoot = (GroupByLevelNode) root.clone();
-      final boolean[] addParent = {false};
-      sourceGroup.forEach(
-          (dataRegion, sourceNodes) -> {
-            if (sourceNodes.size() == 1) {
-              newRoot.addChild(sourceNodes.get(0));
-            } else {
-              if (!addParent[0]) {
-                sourceNodes.forEach(newRoot::addChild);
-                addParent[0] = true;
-              } else {
-                // We clone a GroupByLevelNode from root to keep the params consistent.
-                // But we need to assign a new ID to it
-                GroupByLevelNode parentOfGroup = (GroupByLevelNode) root.clone();
-                parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-                sourceNodes.forEach(parentOfGroup::addChild);
-                newRoot.addChild(parentOfGroup);
-              }
-            }
-          });
-
-      // Then, we calculate the attributes for GroupByLevelNode in each level
-      calculateGroupByLevelNodeAttributes(newRoot, 0);
-      return newRoot;
-    }
-
-    private void calculateGroupByLevelNodeAttributes(PlanNode node, int level) {
-      if (node == null) {
-        return;
-      }
-      node.getChildren().forEach(child -> calculateGroupByLevelNodeAttributes(child, level + 1));
-      if (!(node instanceof GroupByLevelNode)) {
-        return;
-      }
-      GroupByLevelNode handle = (GroupByLevelNode) node;
-
-      // Construct all outputColumns from children. Using Set here to avoid duplication
-      Set<String> childrenOutputColumns = new HashSet<>();
-      handle
-          .getChildren()
-          .forEach(child -> childrenOutputColumns.addAll(child.getOutputColumnNames()));
-
-      // Check every OutputColumn of GroupByLevelNode and set the Expression of corresponding
-      // AggregationDescriptor
-      List<GroupByLevelDescriptor> descriptorList = new ArrayList<>();
-      for (GroupByLevelDescriptor originalDescriptor : handle.getGroupByLevelDescriptors()) {
-        List<Expression> descriptorExpression = new ArrayList<>();
-        for (String childColumn : childrenOutputColumns) {
-          // If this condition matched, the childColumn should come from GroupByLevelNode
-          if (isAggColumnMatchExpression(childColumn, originalDescriptor.getOutputExpression())) {
-            descriptorExpression.add(originalDescriptor.getOutputExpression());
-            continue;
-          }
-          for (Expression exp : originalDescriptor.getInputExpressions()) {
-            if (isAggColumnMatchExpression(childColumn, exp)) {
-              descriptorExpression.add(exp);
-            }
-          }
-        }
-        if (descriptorExpression.size() == 0) {
-          continue;
-        }
-        GroupByLevelDescriptor descriptor = originalDescriptor.deepClone();
-        descriptor.setStep(level == 0 ? AggregationStep.FINAL : AggregationStep.PARTIAL);
-        descriptor.setInputExpressions(descriptorExpression);
-
-        descriptorList.add(descriptor);
-      }
-      handle.setGroupByLevelDescriptors(descriptorList);
-    }
-
-    // TODO: (xingtanzjr) need to confirm the logic when processing UDF
-    private boolean isAggColumnMatchExpression(String columnName, Expression expression) {
-      if (columnName == null) {
-        return false;
-      }
-      return columnName.contains(expression.getExpressionString());
-    }
-
-    private List<SeriesAggregationSourceNode> splitAggregationSourceByPartition(
-        MultiChildNode root, DistributionPlanContext context) {
-      // Step 1: split SeriesAggregationSourceNode according to data partition
-      List<SeriesAggregationSourceNode> sources = new ArrayList<>();
-      Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
-      for (PlanNode child : root.getChildren()) {
-        SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
-        List<TRegionReplicaSet> dataDistribution =
-            analysis.getPartitionInfo(handle.getPartitionPath(), handle.getPartitionTimeFilter());
-        for (TRegionReplicaSet dataRegion : dataDistribution) {
-          SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) handle.clone();
-          split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-          split.setRegionReplicaSet(dataRegion);
-          // Let each split reference a different copy of the AggregationDescriptorList
-          split.setAggregationDescriptorList(
-              handle.getAggregationDescriptorList().stream()
-                  .map(AggregationDescriptor::deepClone)
-                  .collect(Collectors.toList()));
-          sources.add(split);
-        }
-        regionCountPerSeries.put(handle.getPartitionPath(), dataDistribution.size());
-      }
-
-      // Step 2: change the step for each SeriesAggregationSourceNode according to its split count
-      for (SeriesAggregationSourceNode source : sources) {
-        //        boolean isFinal = regionCountPerSeries.get(source.getPartitionPath()) == 1;
-        // TODO: (xingtanzjr) need to optimize this step later. We make it as Partial now.
-        boolean isFinal = false;
-        source
-            .getAggregationDescriptorList()
-            .forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL));
-      }
-      return sources;
-    }
-
-    public PlanNode visit(PlanNode node, DistributionPlanContext context) {
-      return node.accept(this, context);
-    }
-  }
-
-  private class DistributionPlanContext {
-    private MPPQueryContext queryContext;
-
-    public DistributionPlanContext(MPPQueryContext queryContext) {
-      this.queryContext = queryContext;
-    }
-  }
-
-  private class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
-    @Override
-    public PlanNode visitPlan(PlanNode node, NodeGroupContext context) {
-      // TODO: (xingtanzjr) we apply no action for WritePlanNode currently
-      if (node instanceof WritePlanNode) {
-        return node;
-      }
-      // Visit all the children of current node
-      List<PlanNode> children =
-          node.getChildren().stream()
-              .map(child -> child.accept(this, context))
-              .collect(toImmutableList());
-
-      // Calculate the node distribution info according to its children
-
-      // Put the node distribution info into context
-      // NOTICE: we only process PlanNodes which have exactly 1 child here. The other
-      // PlanNodes need to be processed
-      // with a special method
-      context.putNodeDistribution(
-          node.getPlanNodeId(),
-          new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, null));
-
-      return node.cloneWithChildren(children);
-    }
-
-    @Override
-    public PlanNode visitSchemaQueryMerge(SchemaQueryMergeNode node, NodeGroupContext context) {
-      return internalVisitSchemaMerge(node, context);
-    }
-
-    private PlanNode internalVisitSchemaMerge(
-        AbstractSchemaMergeNode node, NodeGroupContext context) {
-      node.getChildren()
-          .forEach(
-              child -> {
-                visit(child, context);
-              });
-      NodeDistribution nodeDistribution =
-          new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
-      PlanNode newNode = node.clone();
-      nodeDistribution.region = calculateSchemaRegionByChildren(node.getChildren(), context);
-      context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution);
-      node.getChildren()
-          .forEach(
-              child -> {
-                if (!nodeDistribution.region.equals(
-                    context.getNodeDistribution(child.getPlanNodeId()).region)) {
-                  ExchangeNode exchangeNode =
-                      new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
-                  exchangeNode.setChild(child);
-                  exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
-                  newNode.addChild(exchangeNode);
-                } else {
-                  newNode.addChild(child);
-                }
-              });
-      return newNode;
-    }
-
-    @Override
-    public PlanNode visitCountMerge(CountSchemaMergeNode node, NodeGroupContext context) {
-      return internalVisitSchemaMerge(node, context);
-    }
-
-    @Override
-    public PlanNode visitSchemaQueryScan(SchemaQueryScanNode node, NodeGroupContext context) {
-      NodeDistribution nodeDistribution = new NodeDistribution(NodeDistributionType.NO_CHILD);
-      nodeDistribution.region = node.getRegionReplicaSet();
-      context.putNodeDistribution(node.getPlanNodeId(), nodeDistribution);
-      return node;
-    }
-
-    @Override
-    public PlanNode visitSchemaFetchMerge(SchemaFetchMergeNode node, NodeGroupContext context) {
-      return internalVisitSchemaMerge(node, context);
-    }
-
-    @Override
-    public PlanNode visitSchemaFetchScan(SchemaFetchScanNode node, NodeGroupContext context) {
-      NodeDistribution nodeDistribution = new NodeDistribution(NodeDistributionType.NO_CHILD);
-      nodeDistribution.region = node.getRegionReplicaSet();
-      context.putNodeDistribution(node.getPlanNodeId(), nodeDistribution);
-      return node;
-    }
-
-    @Override
-    public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
-      context.putNodeDistribution(
-          node.getPlanNodeId(),
-          new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
-      return node.clone();
-    }
-
-    @Override
-    public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, NodeGroupContext context) {
-      context.putNodeDistribution(
-          node.getPlanNodeId(),
-          new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
-      return node.clone();
-    }
-
-    public PlanNode visitSeriesAggregationScan(
-        SeriesAggregationScanNode node, NodeGroupContext context) {
-      context.putNodeDistribution(
-          node.getPlanNodeId(),
-          new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
-      return node.clone();
-    }
-
-    public PlanNode visitAlignedSeriesAggregationScan(
-        AlignedSeriesAggregationScanNode node, NodeGroupContext context) {
-      context.putNodeDistribution(
-          node.getPlanNodeId(),
-          new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
-      return node.clone();
-    }
-
-    @Override
-    public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
-      return processMultiChildNode(node, context);
-    }
-
-    public PlanNode visitRowBasedSeriesAggregate(AggregationNode node, NodeGroupContext context) {
-      return processMultiChildNode(node, context);
-    }
-
-    @Override
-    public PlanNode visitGroupByLevel(GroupByLevelNode node, NodeGroupContext context) {
-      return processMultiChildNode(node, context);
-    }
-
-    private PlanNode processMultiChildNode(MultiChildNode node, NodeGroupContext context) {
-      MultiChildNode newNode = (MultiChildNode) node.clone();
-      List<PlanNode> visitedChildren = new ArrayList<>();
-      node.getChildren()
-          .forEach(
-              child -> {
-                visitedChildren.add(visit(child, context));
-              });
-
-      TRegionReplicaSet dataRegion = calculateDataRegionByChildren(visitedChildren, context);
-      NodeDistributionType distributionType =
-          nodeDistributionIsSame(visitedChildren, context)
-              ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
-              : NodeDistributionType.SAME_WITH_SOME_CHILD;
-      context.putNodeDistribution(
-          newNode.getPlanNodeId(), new NodeDistribution(distributionType, dataRegion));
-
-      // If the distributionType of all the children is the same, no ExchangeNode needs to be added.
-      if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
-        newNode.setChildren(visitedChildren);
-        return newNode;
-      }
-
-      // Otherwise, we need to add an ExchangeNode for each child whose DataRegion differs from
-      // the parent's.
-      visitedChildren.forEach(
-          child -> {
-            if (!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).region)) {
-              ExchangeNode exchangeNode =
-                  new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
-              exchangeNode.setChild(child);
-              exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
-              newNode.addChild(exchangeNode);
-            } else {
-              newNode.addChild(child);
-            }
-          });
-      return newNode;
-    }
-
-    private TRegionReplicaSet calculateDataRegionByChildren(
-        List<PlanNode> children, NodeGroupContext context) {
-      // Step 1: calculate the count of children grouped by DataRegion.
-      Map<TRegionReplicaSet, Long> groupByRegion =
-          children.stream()
-              .collect(
-                  Collectors.groupingBy(
-                      child -> context.getNodeDistribution(child.getPlanNodeId()).region,
-                      Collectors.counting()));
-      // Step 2: return the RegionReplicaSet with max count
-      return Collections.max(groupByRegion.entrySet(), Map.Entry.comparingByValue()).getKey();
-    }
-
-    private TRegionReplicaSet calculateSchemaRegionByChildren(
-        List<PlanNode> children, NodeGroupContext context) {
-      // We always make the schemaRegion of the SchemaMergeNode the same as that of its first child.
-      return context.getNodeDistribution(children.get(0).getPlanNodeId()).region;
-    }
-
-    private boolean nodeDistributionIsSame(List<PlanNode> children, NodeGroupContext context) {
-      // The size of children here should always be larger than 0; otherwise our code has a bug.
-      NodeDistribution first = context.getNodeDistribution(children.get(0).getPlanNodeId());
-      for (int i = 1; i < children.size(); i++) {
-        NodeDistribution next = context.getNodeDistribution(children.get(i).getPlanNodeId());
-        if (first.region == null || !first.region.equals(next.region)) {
-          return false;
-        }
-      }
-      return true;
-    }
-
-    public PlanNode visit(PlanNode node, NodeGroupContext context) {
-      return node.accept(this, context);
-    }
-  }
-
-  private class NodeGroupContext {
-    private MPPQueryContext queryContext;
-    private Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
-
-    public NodeGroupContext(MPPQueryContext queryContext) {
-      this.queryContext = queryContext;
-      this.nodeDistributionMap = new HashMap<>();
-    }
-
-    public void putNodeDistribution(PlanNodeId nodeId, NodeDistribution distribution) {
-      this.nodeDistributionMap.put(nodeId, distribution);
-    }
-
-    public NodeDistribution getNodeDistribution(PlanNodeId nodeId) {
-      return this.nodeDistributionMap.get(nodeId);
-    }
-  }
-
-  private enum NodeDistributionType {
-    SAME_WITH_ALL_CHILDREN,
-    SAME_WITH_SOME_CHILD,
-    DIFFERENT_FROM_ALL_CHILDREN,
-    NO_CHILD,
-  }
-
-  private class NodeDistribution {
-    private NodeDistributionType type;
-    private TRegionReplicaSet region;
-
-    private NodeDistribution(NodeDistributionType type, TRegionReplicaSet region) {
-      this.type = type;
-      this.region = region;
-    }
-
-    private NodeDistribution(NodeDistributionType type) {
-      this.type = type;
-    }
-  }
-
-  private class FragmentBuilder {
-    private MPPQueryContext context;
-
-    public FragmentBuilder(MPPQueryContext context) {
-      this.context = context;
-    }
-
-    public SubPlan splitToSubPlan(PlanNode root) {
-      SubPlan rootSubPlan = createSubPlan(root);
-      splitToSubPlan(root, rootSubPlan);
-      return rootSubPlan;
-    }
-
-    private void splitToSubPlan(PlanNode root, SubPlan subPlan) {
-      // TODO: (xingtanzjr) we currently take no action for WritePlanNode
-      if (root instanceof WritePlanNode) {
-        return;
-      }
-      if (root instanceof ExchangeNode) {
-        // We add a FragmentSinkNode for the newly created PlanFragment
-        ExchangeNode exchangeNode = (ExchangeNode) root;
-        FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
-        sinkNode.setChild(exchangeNode.getChild());
-        sinkNode.setDownStreamPlanNodeId(exchangeNode.getPlanNodeId());
-
-        // Record the source node info in the ExchangeNode so that we can keep the connection
-        // between these nodes/fragments
-        exchangeNode.setRemoteSourceNode(sinkNode);
-        // We cut off the subtree to make the ExchangeNode the leaf node of the current PlanFragment
-        exchangeNode.cleanChildren();
-
-        // Build the child SubPlan Tree
-        SubPlan childSubPlan = createSubPlan(sinkNode);
-        splitToSubPlan(sinkNode, childSubPlan);
-
-        subPlan.addChild(childSubPlan);
-        return;
-      }
-      for (PlanNode child : root.getChildren()) {
-        splitToSubPlan(child, subPlan);
-      }
-    }
-
-    private SubPlan createSubPlan(PlanNode root) {
-      PlanFragment fragment = new PlanFragment(getNextFragmentId(), root);
-      return new SubPlan(fragment);
-    }
-  }
-}
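
For readers following the refactor: the FragmentBuilder removed above (and re-added under the new distribution package further down) cuts the plan tree at every ExchangeNode, so each ExchangeNode becomes a leaf of its fragment and the subtree below it becomes a child fragment. Below is a minimal, stand-alone sketch of that traversal; Node and Fragment here are hypothetical stand-ins, not the IoTDB classes.

// A minimal sketch of the cut-at-ExchangeNode traversal; types are hypothetical.
import java.util.ArrayList;
import java.util.List;

class Node {
  final String name;
  final List<Node> children = new ArrayList<>();
  final boolean isExchange;
  Node(String name, boolean isExchange) { this.name = name; this.isExchange = isExchange; }
}

class Fragment {
  final Node root;
  final List<Fragment> children = new ArrayList<>();
  Fragment(Node root) { this.root = root; }
}

public class SplitSketch {
  // Every ExchangeNode starts a new child fragment; the subtree below it is cut off.
  static void split(Node node, Fragment current) {
    for (Node child : new ArrayList<>(node.children)) {
      if (child.isExchange) {
        Node subRoot = child.children.get(0);
        child.children.clear();              // the exchange becomes a leaf of `current`
        Fragment childFragment = new Fragment(subRoot);
        current.children.add(childFragment);
        split(subRoot, childFragment);
      } else {
        split(child, current);
      }
    }
  }

  public static void main(String[] args) {
    Node scan = new Node("scan", false);
    Node exchange = new Node("exchange", true);
    exchange.children.add(scan);
    Node root = new Node("merge", false);
    root.children.add(exchange);
    Fragment rootFragment = new Fragment(root);
    split(root, rootFragment);
    System.out.println("child fragments: " + rootFragment.children.size()); // prints 1
  }
}
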
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index f7a35f005d..5a07b64d79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
@@ -76,7 +75,6 @@ import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import org.apache.commons.lang.Validate;
-import org.apache.iotdb.tsfile.read.filter.operator.In;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -605,8 +603,11 @@ public class LogicalPlanBuilder {
     return this;
   }
 
-  public LogicalPlanBuilder planInvalidateSchemaCache(List<PartialPath> paths, List<String> storageGroups) {
-    this.root = new InvalidateSchemaCacheNode(context.getQueryId().genPlanNodeId(), context.getQueryId(), paths, storageGroups);
+  public LogicalPlanBuilder planInvalidateSchemaCache(
+      List<PartialPath> paths, List<String> storageGroups) {
+    this.root =
+        new InvalidateSchemaCacheNode(
+            context.getQueryId().genPlanNodeId(), context.getQueryId(), paths, storageGroups);
     return this;
   }
 
@@ -618,7 +619,8 @@ public class LogicalPlanBuilder {
   }
 
   public LogicalPlanBuilder planDeleteTimeseries(List<PartialPath> paths) {
-    DeleteTimeSeriesNode node = new DeleteTimeSeriesNode(context.getQueryId().genPlanNodeId(), paths);
+    DeleteTimeSeriesNode node =
+        new DeleteTimeSeriesNode(context.getQueryId().genPlanNodeId(), paths);
     node.addChild(this.root);
     this.root = node;
     return this;
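
The reformatted builder methods above follow LogicalPlanBuilder's fluent style: each plan* method installs a new node above the current root and returns this, so statements compose as a chain (as visitDeleteTimeseries below shows). A toy sketch of the same pattern, with hypothetical names, in case the shape is unfamiliar:

// Toy fluent builder in the LogicalPlanBuilder style; all names are hypothetical.
class ToyNode {
  final String label;
  ToyNode child;
  ToyNode(String label) { this.label = label; }
}

class ToyPlanBuilder {
  private ToyNode root;

  // Each plan* step wraps the current root and returns the builder for chaining.
  ToyPlanBuilder planStep(String label) {
    ToyNode node = new ToyNode(label);
    node.child = this.root; // the previous root becomes the child, as in planDeleteTimeseries
    this.root = node;
    return this;
  }

  ToyNode getRoot() { return root; }

  public static void main(String[] args) {
    ToyNode root = new ToyPlanBuilder()
        .planStep("InvalidateSchemaCache")
        .planStep("DeleteData")
        .planStep("DeleteTimeSeries")
        .getRoot();
    System.out.println(root.label); // DeleteTimeSeries sits on top and wraps the rest
  }
}
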
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index bf08942b92..7fd514ed22 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -539,10 +539,12 @@ public class LogicalPlanner {
     }
 
     @Override
-    public PlanNode visitDeleteTimeseries(DeleteTimeSeriesStatement deleteTimeSeriesStatement, MPPQueryContext context) {
+    public PlanNode visitDeleteTimeseries(
+        DeleteTimeSeriesStatement deleteTimeSeriesStatement, MPPQueryContext context) {
       LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
       List<PartialPath> paths = deleteTimeSeriesStatement.getPaths();
-      List<String> storageGroups = new ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet());
+      List<String> storageGroups =
+          new ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet());
       return planBuilder
           .planInvalidateSchemaCache(paths, storageGroups)
           .planDeleteData(paths)
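
In visitDeleteTimeseries above, the affected storage groups are simply the key set of the schema partition map produced by analysis. A hedged, self-contained illustration of that one step, with the partition structures reduced to plain maps rather than the IoTDB types:

// Sketch of deriving the storage-group list; the map layout is a simplification.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StorageGroupsSketch {
  public static void main(String[] args) {
    // schemaPartitionMap: storage group -> (device group id -> region), values simplified
    Map<String, Map<Integer, String>> schemaPartitionMap = new HashMap<>();
    schemaPartitionMap.put("root.sg1", new HashMap<>());
    schemaPartitionMap.put("root.sg2", new HashMap<>());

    // Same idea as new ArrayList<>(analysis.getSchemaPartitionInfo()
    //     .getSchemaPartitionMap().keySet()): the storage groups are the map keys.
    List<String> storageGroups = new ArrayList<>(schemaPartitionMap.keySet());
    System.out.println(storageGroups); // e.g. [root.sg1, root.sg2]
  }
}
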
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
new file mode 100644
index 0000000000..6f9e16e6ee
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.distribution;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+
+public class DistributionPlanContext {
+  protected MPPQueryContext queryContext;
+
+  protected DistributionPlanContext(MPPQueryContext queryContext) {
+    this.queryContext = queryContext;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
new file mode 100644
index 0000000000..91ca95a5f9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.distribution;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
+import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
+import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
+
+import java.util.List;
+
+public class DistributionPlanner {
+  private Analysis analysis;
+  private MPPQueryContext context;
+  private LogicalQueryPlan logicalPlan;
+
+  private int planFragmentIndex = 0;
+
+  public DistributionPlanner(Analysis analysis, LogicalQueryPlan logicalPlan) {
+    this.analysis = analysis;
+    this.logicalPlan = logicalPlan;
+    this.context = logicalPlan.getContext();
+  }
+
+  public PlanNode rewriteSource() {
+    SourceRewriter rewriter = new SourceRewriter(this.analysis);
+    return rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext(context));
+  }
+
+  public PlanNode addExchangeNode(PlanNode root) {
+    ExchangeNodeAdder adder = new ExchangeNodeAdder();
+    return adder.visit(root, new NodeGroupContext(context));
+  }
+
+  public SubPlan splitFragment(PlanNode root) {
+    FragmentBuilder fragmentBuilder = new FragmentBuilder(context);
+    return fragmentBuilder.splitToSubPlan(root);
+  }
+
+  public DistributedQueryPlan planFragments() {
+    PlanNode rootAfterRewrite = rewriteSource();
+    PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+    if (analysis.getStatement() instanceof QueryStatement) {
+      analysis
+          .getRespDatasetHeader()
+          .setColumnToTsBlockIndexMap(rootWithExchange.getOutputColumnNames());
+    }
+    SubPlan subPlan = splitFragment(rootWithExchange);
+    List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
+    // Only execute this step for READ operation
+    if (context.getQueryType() == QueryType.READ) {
+      SetSinkForRootInstance(subPlan, fragmentInstances);
+    }
+    return new DistributedQueryPlan(
+        logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), fragmentInstances);
+  }
+
+  // Convert each fragment into a concrete instance.
+  // A parallelizable fragment is cloned into several instances with different params.
+  public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
+    IFragmentParallelPlaner parallelPlaner =
+        context.getQueryType() == QueryType.READ
+            ? new SimpleFragmentParallelPlanner(subPlan, analysis, context)
+            : new WriteFragmentParallelPlanner(subPlan, analysis, context);
+    return parallelPlaner.parallelPlan();
+  }
+
+  // TODO: (xingtanzjr) Maybe we should handle ResultNode in LogicalPlanner?
+  public void SetSinkForRootInstance(SubPlan subPlan, List<FragmentInstance> instances) {
+    FragmentInstance rootInstance = null;
+    for (FragmentInstance instance : instances) {
+      if (instance.getFragment().getId().equals(subPlan.getPlanFragment().getId())) {
+        rootInstance = instance;
+        break;
+      }
+    }
+    // root should not be null during normal processing
+    if (rootInstance == null) {
+      return;
+    }
+
+    FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
+    sinkNode.setDownStream(
+        context.getLocalDataBlockEndpoint(),
+        context.getResultNodeContext().getVirtualFragmentInstanceId(),
+        context.getResultNodeContext().getVirtualResultNodeId());
+    sinkNode.setChild(rootInstance.getFragment().getRoot());
+    context
+        .getResultNodeContext()
+        .setUpStream(
+            rootInstance.getHostDataNode().dataBlockManagerEndPoint,
+            rootInstance.getId(),
+            sinkNode.getPlanNodeId());
+    rootInstance.getFragment().setRoot(sinkNode);
+  }
+
+  private PlanFragmentId getNextFragmentId() {
+    return new PlanFragmentId(this.logicalPlan.getContext().getQueryId(), this.planFragmentIndex++);
+  }
+
+  private class FragmentBuilder {
+    private MPPQueryContext context;
+
+    public FragmentBuilder(MPPQueryContext context) {
+      this.context = context;
+    }
+
+    public SubPlan splitToSubPlan(PlanNode root) {
+      SubPlan rootSubPlan = createSubPlan(root);
+      splitToSubPlan(root, rootSubPlan);
+      return rootSubPlan;
+    }
+
+    private void splitToSubPlan(PlanNode root, SubPlan subPlan) {
+      // TODO: (xingtanzjr) we currently take no action for WritePlanNode
+      if (root instanceof WritePlanNode) {
+        return;
+      }
+      if (root instanceof ExchangeNode) {
+        // We add a FragmentSinkNode for the newly created PlanFragment
+        ExchangeNode exchangeNode = (ExchangeNode) root;
+        FragmentSinkNode sinkNode = new FragmentSinkNode(context.getQueryId().genPlanNodeId());
+        sinkNode.setChild(exchangeNode.getChild());
+        sinkNode.setDownStreamPlanNodeId(exchangeNode.getPlanNodeId());
+
+        // Record the source node info in the ExchangeNode so that we can keep the connection
+        // between these nodes/fragments
+        exchangeNode.setRemoteSourceNode(sinkNode);
+        // We cut off the subtree to make the ExchangeNode the leaf node of the current PlanFragment
+        exchangeNode.cleanChildren();
+
+        // Build the child SubPlan Tree
+        SubPlan childSubPlan = createSubPlan(sinkNode);
+        splitToSubPlan(sinkNode, childSubPlan);
+
+        subPlan.addChild(childSubPlan);
+        return;
+      }
+      for (PlanNode child : root.getChildren()) {
+        splitToSubPlan(child, subPlan);
+      }
+    }
+
+    private SubPlan createSubPlan(PlanNode root) {
+      PlanFragment fragment = new PlanFragment(getNextFragmentId(), root);
+      return new SubPlan(fragment);
+    }
+  }
+}
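
planFragments() above is effectively a three-phase pipeline: rewrite sources against the partition table, add ExchangeNodes at region boundaries, then split the tree into fragments and instances. The following sketch shows that shape only; Plan and the step names are hypothetical placeholders, not the IoTDB classes.

// Shape-only sketch of the planFragments() pipeline; all types are placeholders.
import java.util.function.UnaryOperator;

public class PipelineSketch {
  static class Plan {
    final String desc;
    Plan(String desc) { this.desc = desc; }
  }

  public static void main(String[] args) {
    UnaryOperator<Plan> rewriteSource = p -> new Plan(p.desc + " -> sourcesSplitByRegion");
    UnaryOperator<Plan> addExchange   = p -> new Plan(p.desc + " -> exchangesAtRegionBorders");
    UnaryOperator<Plan> splitFragment = p -> new Plan(p.desc + " -> fragments");

    // Same ordering as rewriteSource() -> addExchangeNode() -> splitFragment() above.
    Plan logical = new Plan("logicalPlan");
    Plan distributed = splitFragment.apply(addExchange.apply(rewriteSource.apply(logical)));
    System.out.println(distributed.desc);
    // logicalPlan -> sourcesSplitByRegion -> exchangesAtRegionBorders -> fragments
  }
}
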
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
new file mode 100644
index 0000000000..23fcc4c96b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.AbstractSchemaMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+
+public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
+  @Override
+  public PlanNode visitPlan(PlanNode node, NodeGroupContext context) {
+    // TODO: (xingtanzjr) we currently take no action for WritePlanNode
+    if (node instanceof WritePlanNode) {
+      return node;
+    }
+    // Visit all the children of current node
+    List<PlanNode> children =
+        node.getChildren().stream()
+            .map(child -> child.accept(this, context))
+            .collect(toImmutableList());
+
+    // Calculate the node distribution info according to its children
+
+    // Put the node distribution info into context
+    // NOTICE: only PlanNodes with exactly one child are processed here; the other
+    // PlanNodes are processed by their dedicated visit methods
+    context.putNodeDistribution(
+        node.getPlanNodeId(),
+        new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, null));
+
+    return node.cloneWithChildren(children);
+  }
+
+  @Override
+  public PlanNode visitSchemaQueryMerge(SchemaQueryMergeNode node, NodeGroupContext context) {
+    return internalVisitSchemaMerge(node, context);
+  }
+
+  private PlanNode internalVisitSchemaMerge(
+      AbstractSchemaMergeNode node, NodeGroupContext context) {
+    node.getChildren()
+        .forEach(
+            child -> {
+              visit(child, context);
+            });
+    NodeDistribution nodeDistribution =
+        new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
+    PlanNode newNode = node.clone();
+    nodeDistribution.region = calculateSchemaRegionByChildren(node.getChildren(), context);
+    context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution);
+    node.getChildren()
+        .forEach(
+            child -> {
+              if (!nodeDistribution.region.equals(
+                  context.getNodeDistribution(child.getPlanNodeId()).region)) {
+                ExchangeNode exchangeNode =
+                    new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+                exchangeNode.setChild(child);
+                exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
+                newNode.addChild(exchangeNode);
+              } else {
+                newNode.addChild(child);
+              }
+            });
+    return newNode;
+  }
+
+  @Override
+  public PlanNode visitCountMerge(CountSchemaMergeNode node, NodeGroupContext context) {
+    return internalVisitSchemaMerge(node, context);
+  }
+
+  @Override
+  public PlanNode visitSchemaQueryScan(SchemaQueryScanNode node, NodeGroupContext context) {
+    NodeDistribution nodeDistribution = new NodeDistribution(NodeDistributionType.NO_CHILD);
+    nodeDistribution.region = node.getRegionReplicaSet();
+    context.putNodeDistribution(node.getPlanNodeId(), nodeDistribution);
+    return node;
+  }
+
+  @Override
+  public PlanNode visitSchemaFetchMerge(SchemaFetchMergeNode node, NodeGroupContext context) {
+    return internalVisitSchemaMerge(node, context);
+  }
+
+  @Override
+  public PlanNode visitSchemaFetchScan(SchemaFetchScanNode node, NodeGroupContext context) {
+    NodeDistribution nodeDistribution = new NodeDistribution(NodeDistributionType.NO_CHILD);
+    nodeDistribution.region = node.getRegionReplicaSet();
+    context.putNodeDistribution(node.getPlanNodeId(), nodeDistribution);
+    return node;
+  }
+
+  @Override
+  public PlanNode visitSeriesScan(SeriesScanNode node, NodeGroupContext context) {
+    context.putNodeDistribution(
+        node.getPlanNodeId(),
+        new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
+    return node.clone();
+  }
+
+  @Override
+  public PlanNode visitAlignedSeriesScan(AlignedSeriesScanNode node, NodeGroupContext context) {
+    context.putNodeDistribution(
+        node.getPlanNodeId(),
+        new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
+    return node.clone();
+  }
+
+  public PlanNode visitSeriesAggregationScan(
+      SeriesAggregationScanNode node, NodeGroupContext context) {
+    context.putNodeDistribution(
+        node.getPlanNodeId(),
+        new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
+    return node.clone();
+  }
+
+  public PlanNode visitAlignedSeriesAggregationScan(
+      AlignedSeriesAggregationScanNode node, NodeGroupContext context) {
+    context.putNodeDistribution(
+        node.getPlanNodeId(),
+        new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
+    return node.clone();
+  }
+
+  @Override
+  public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
+    return processMultiChildNode(node, context);
+  }
+
+  public PlanNode visitRowBasedSeriesAggregate(AggregationNode node, NodeGroupContext context) {
+    return processMultiChildNode(node, context);
+  }
+
+  @Override
+  public PlanNode visitGroupByLevel(GroupByLevelNode node, NodeGroupContext context) {
+    return processMultiChildNode(node, context);
+  }
+
+  private PlanNode processMultiChildNode(MultiChildNode node, NodeGroupContext context) {
+    MultiChildNode newNode = (MultiChildNode) node.clone();
+    List<PlanNode> visitedChildren = new ArrayList<>();
+    node.getChildren()
+        .forEach(
+            child -> {
+              visitedChildren.add(visit(child, context));
+            });
+
+    TRegionReplicaSet dataRegion = calculateDataRegionByChildren(visitedChildren, context);
+    NodeDistributionType distributionType =
+        nodeDistributionIsSame(visitedChildren, context)
+            ? NodeDistributionType.SAME_WITH_ALL_CHILDREN
+            : NodeDistributionType.SAME_WITH_SOME_CHILD;
+    context.putNodeDistribution(
+        newNode.getPlanNodeId(), new NodeDistribution(distributionType, dataRegion));
+
+    // If the distributionType of all the children is the same, no ExchangeNode needs to be added.
+    if (distributionType == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
+      newNode.setChildren(visitedChildren);
+      return newNode;
+    }
+
+    // Otherwise, we need to add an ExchangeNode for each child whose DataRegion differs from
+    // the parent's.
+    visitedChildren.forEach(
+        child -> {
+          if (!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).region)) {
+            ExchangeNode exchangeNode =
+                new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+            exchangeNode.setChild(child);
+            exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
+            newNode.addChild(exchangeNode);
+          } else {
+            newNode.addChild(child);
+          }
+        });
+    return newNode;
+  }
+
+  private TRegionReplicaSet calculateDataRegionByChildren(
+      List<PlanNode> children, NodeGroupContext context) {
+    // Step 1: calculate the count of children grouped by DataRegion.
+    Map<TRegionReplicaSet, Long> groupByRegion =
+        children.stream()
+            .collect(
+                Collectors.groupingBy(
+                    child -> context.getNodeDistribution(child.getPlanNodeId()).region,
+                    Collectors.counting()));
+    // Step 2: return the RegionReplicaSet with max count
+    return Collections.max(groupByRegion.entrySet(), Map.Entry.comparingByValue()).getKey();
+  }
+
+  private TRegionReplicaSet calculateSchemaRegionByChildren(
+      List<PlanNode> children, NodeGroupContext context) {
+    // We always make the schemaRegion of the SchemaMergeNode the same as that of its first child.
+    return context.getNodeDistribution(children.get(0).getPlanNodeId()).region;
+  }
+
+  private boolean nodeDistributionIsSame(List<PlanNode> children, NodeGroupContext context) {
+    // The size of children here should always be larger than 0; otherwise our code has a bug.
+    NodeDistribution first = context.getNodeDistribution(children.get(0).getPlanNodeId());
+    for (int i = 1; i < children.size(); i++) {
+      NodeDistribution next = context.getNodeDistribution(children.get(i).getPlanNodeId());
+      if (first.region == null || !first.region.equals(next.region)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public PlanNode visit(PlanNode node, NodeGroupContext context) {
+    return node.accept(this, context);
+  }
+}
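
calculateDataRegionByChildren() above places a multi-child node in the region that hosts the most of its children, presumably to minimize the number of exchanges added afterwards. The same groupingBy/counting/max idiom in a runnable, stand-alone form, with regions reduced to plain Strings instead of TRegionReplicaSet:

// Stand-alone illustration of the majority-region choice; regions are plain Strings.
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class MajorityRegionSketch {
  public static void main(String[] args) {
    List<String> childRegions = Arrays.asList("region-1", "region-2", "region-2");

    // Step 1: count children grouped by region.
    Map<String, Long> countsByRegion =
        childRegions.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

    // Step 2: the parent is placed in the region holding the most children.
    String majority =
        Collections.max(countsByRegion.entrySet(), Map.Entry.comparingByValue()).getKey();
    System.out.println(majority); // region-2
  }
}
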
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeDistribution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeDistribution.java
new file mode 100644
index 0000000000..fb4561460d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeDistribution.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+public class NodeDistribution {
+  protected NodeDistributionType type;
+  protected TRegionReplicaSet region;
+
+  protected NodeDistribution(NodeDistributionType type, TRegionReplicaSet region) {
+    this.type = type;
+    this.region = region;
+  }
+
+  protected NodeDistribution(NodeDistributionType type) {
+    this.type = type;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeDistributionType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeDistributionType.java
new file mode 100644
index 0000000000..be34998958
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeDistributionType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.distribution;
+
+public enum NodeDistributionType {
+  SAME_WITH_ALL_CHILDREN,
+  SAME_WITH_SOME_CHILD,
+  DIFFERENT_FROM_ALL_CHILDREN,
+  NO_CHILD,
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
new file mode 100644
index 0000000000..ba3ddb400c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/NodeGroupContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.distribution;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class NodeGroupContext {
+  protected MPPQueryContext queryContext;
+  protected Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
+
+  public NodeGroupContext(MPPQueryContext queryContext) {
+    this.queryContext = queryContext;
+    this.nodeDistributionMap = new HashMap<>();
+  }
+
+  public void putNodeDistribution(PlanNodeId nodeId, NodeDistribution distribution) {
+    this.nodeDistributionMap.put(nodeId, distribution);
+  }
+
+  public NodeDistribution getNodeDistribution(PlanNodeId nodeId) {
+    return this.nodeDistributionMap.get(nodeId);
+  }
+}
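
NodeGroupContext above is the shared scratch space of the ExchangeNodeAdder pass: each visit records a node's region under its PlanNodeId, and parents later look their children up to decide where exchanges are needed. A minimal sketch of that put/get protocol, with ids and regions reduced to Strings rather than the IoTDB types:

// Minimal sketch of the NodeGroupContext put/get protocol; types are simplified.
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ContextProtocolSketch {
  public static void main(String[] args) {
    Map<String, String> regionByNodeId = new HashMap<>();

    // 1. Leaf visits record each leaf's region (cf. visitSeriesScan above).
    regionByNodeId.put("scan-1", "region-1");
    regionByNodeId.put("scan-2", "region-2");

    // 2. A parent visit compares each child's region against the parent's chosen region;
    //    a mismatch is where an ExchangeNode would be inserted.
    String parentRegion = "region-1";
    for (Map.Entry<String, String> child : regionByNodeId.entrySet()) {
      boolean needsExchange = !Objects.equals(parentRegion, child.getValue());
      System.out.println(child.getKey() + " needs exchange: " + needsExchange);
    }
  }
}
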
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SimpleFragmentParallelPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 5b06be36fe..fd991196df 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.plan.planner;
+package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
new file mode 100644
index 0000000000..973d40b3f8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanNodeRewriter;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesSourceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
+
+  private Analysis analysis;
+
+  public SourceRewriter(Analysis analysis) {
+    this.analysis = analysis;
+  }
+
+  // TODO: (xingtanzjr) implement the method visitDeviceMerge()
+  public PlanNode visitDeviceMerge(TimeJoinNode node, DistributionPlanContext context) {
+    return null;
+  }
+
+  @Override
+  public PlanNode visitSchemaQueryMerge(
+      SchemaQueryMergeNode node, DistributionPlanContext context) {
+    SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone();
+    SchemaQueryScanNode seed = (SchemaQueryScanNode) node.getChildren().get(0);
+    TreeSet<TRegionReplicaSet> schemaRegions =
+        new TreeSet<>(Comparator.comparingInt(region -> region.getRegionId().getId()));
+    analysis
+        .getSchemaPartitionInfo()
+        .getSchemaPartitionMap()
+        .forEach(
+            (storageGroup, deviceGroup) -> {
+              deviceGroup.forEach(
+                  (deviceGroupId, schemaRegionReplicaSet) ->
+                      schemaRegions.add(schemaRegionReplicaSet));
+            });
+    int count = schemaRegions.size();
+    schemaRegions.forEach(
+        region -> {
+          SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode) seed.clone();
+          schemaQueryScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+          schemaQueryScanNode.setRegionReplicaSet(region);
+          if (count > 1) {
+            schemaQueryScanNode.setLimit(
+                schemaQueryScanNode.getOffset() + schemaQueryScanNode.getLimit());
+            schemaQueryScanNode.setOffset(0);
+          }
+          root.addChild(schemaQueryScanNode);
+        });
+    return root;
+  }
+
+  @Override
+  public PlanNode visitCountMerge(CountSchemaMergeNode node, DistributionPlanContext context) {
+    CountSchemaMergeNode root = (CountSchemaMergeNode) node.clone();
+    SchemaQueryScanNode seed = (SchemaQueryScanNode) node.getChildren().get(0);
+    Set<TRegionReplicaSet> schemaRegions = new HashSet<>();
+    analysis
+        .getSchemaPartitionInfo()
+        .getSchemaPartitionMap()
+        .forEach(
+            (storageGroup, deviceGroup) -> {
+              deviceGroup.forEach(
+                  (deviceGroupId, schemaRegionReplicaSet) ->
+                      schemaRegions.add(schemaRegionReplicaSet));
+            });
+    schemaRegions.forEach(
+        region -> {
+          SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode) seed.clone();
+          schemaQueryScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+          schemaQueryScanNode.setRegionReplicaSet(region);
+          root.addChild(schemaQueryScanNode);
+        });
+    return root;
+  }
+
+  // TODO: (xingtanzjr) a temporary way to resolve the distribution issue of a single SeriesScanNode
+  @Override
+  public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
+    List<TRegionReplicaSet> dataDistribution =
+        analysis.getPartitionInfo(node.getSeriesPath(), node.getTimeFilter());
+    if (dataDistribution.size() == 1) {
+      node.setRegionReplicaSet(dataDistribution.get(0));
+      return node;
+    }
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+    for (TRegionReplicaSet dataRegion : dataDistribution) {
+      SeriesScanNode split = (SeriesScanNode) node.clone();
+      split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+      split.setRegionReplicaSet(dataRegion);
+      timeJoinNode.addChild(split);
+    }
+    return timeJoinNode;
+  }
+
+  @Override
+  public PlanNode visitSeriesAggregationScan(
+      SeriesAggregationScanNode node, DistributionPlanContext context) {
+    return processSeriesAggregationSource(node, context);
+  }
+
+  @Override
+  public PlanNode visitAlignedSeriesAggregationScan(
+      AlignedSeriesAggregationScanNode node, DistributionPlanContext context) {
+    return processSeriesAggregationSource(node, context);
+  }
+
+  private PlanNode processSeriesAggregationSource(
+      SeriesAggregationSourceNode node, DistributionPlanContext context) {
+    List<TRegionReplicaSet> dataDistribution =
+        analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
+    if (dataDistribution.size() == 1) {
+      node.setRegionReplicaSet(dataDistribution.get(0));
+      return node;
+    }
+    List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>();
+    node.getAggregationDescriptorList()
+        .forEach(
+            descriptor -> {
+              leafAggDescriptorList.add(
+                  new AggregationDescriptor(
+                      descriptor.getAggregationType(),
+                      AggregationStep.PARTIAL,
+                      descriptor.getInputExpressions()));
+            });
+
+    List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
+    node.getAggregationDescriptorList()
+        .forEach(
+            descriptor -> {
+              rootAggDescriptorList.add(
+                  new AggregationDescriptor(
+                      descriptor.getAggregationType(),
+                      AggregationStep.FINAL,
+                      descriptor.getInputExpressions()));
+            });
+
+    AggregationNode aggregationNode =
+        new AggregationNode(
+            context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
+    for (TRegionReplicaSet dataRegion : dataDistribution) {
+      SeriesAggregationScanNode split = (SeriesAggregationScanNode) node.clone();
+      split.setAggregationDescriptorList(leafAggDescriptorList);
+      split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+      split.setRegionReplicaSet(dataRegion);
+      aggregationNode.addChild(split);
+    }
+    return aggregationNode;
+  }
+
+  @Override
+  public PlanNode visitAlignedSeriesScan(
+      AlignedSeriesScanNode node, DistributionPlanContext context) {
+    List<TRegionReplicaSet> dataDistribution =
+        analysis.getPartitionInfo(node.getAlignedPath(), node.getTimeFilter());
+    if (dataDistribution.size() == 1) {
+      node.setRegionReplicaSet(dataDistribution.get(0));
+      return node;
+    }
+    TimeJoinNode timeJoinNode =
+        new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
+    for (TRegionReplicaSet dataRegion : dataDistribution) {
+      AlignedSeriesScanNode split = (AlignedSeriesScanNode) node.clone();
+      split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+      split.setRegionReplicaSet(dataRegion);
+      timeJoinNode.addChild(split);
+    }
+    return timeJoinNode;
+  }
+
+  @Override
+  public PlanNode visitSchemaFetchMerge(
+      SchemaFetchMergeNode node, DistributionPlanContext context) {
+    SchemaFetchMergeNode root = (SchemaFetchMergeNode) node.clone();
+    Map<String, Set<TRegionReplicaSet>> storageGroupSchemaRegionMap = new HashMap<>();
+    analysis
+        .getSchemaPartitionInfo()
+        .getSchemaPartitionMap()
+        .forEach(
+            (storageGroup, deviceGroup) -> {
+              storageGroupSchemaRegionMap.put(storageGroup, new HashSet<>());
+              deviceGroup.forEach(
+                  (deviceGroupId, schemaRegionReplicaSet) ->
+                      storageGroupSchemaRegionMap.get(storageGroup).add(schemaRegionReplicaSet));
+            });
+
+    for (PlanNode child : node.getChildren()) {
+      for (TRegionReplicaSet schemaRegion :
+          storageGroupSchemaRegionMap.get(
+              ((SchemaFetchScanNode) child).getStorageGroup().getFullPath())) {
+        SchemaFetchScanNode schemaFetchScanNode = (SchemaFetchScanNode) child.clone();
+        schemaFetchScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+        schemaFetchScanNode.setRegionReplicaSet(schemaRegion);
+        root.addChild(schemaFetchScanNode);
+      }
+    }
+    return root;
+  }
+
+  @Override
+  public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
+    // Although some logic is similar between Aggregation and RawDataQuery,
+    // we still use a separate method for distribution planning now
+    // to keep the planning procedure clearer
+    if (isAggregationQuery(node)) {
+      return planAggregationWithTimeJoin(node, context);
+    }
+
+    TimeJoinNode root = (TimeJoinNode) node.clone();
+    // Step 1: Get all source nodes. For each node that is not a source, add it as a child of
+    // the current TimeJoinNode
+    List<SourceNode> sources = new ArrayList<>();
+    for (PlanNode child : node.getChildren()) {
+      if (child instanceof SeriesSourceNode) {
+        // If the child is a SeriesSourceNode, we need to check whether this node should be separated
+        // into several splits.
+        SeriesSourceNode handle = (SeriesSourceNode) child;
+        List<TRegionReplicaSet> dataDistribution =
+            analysis.getPartitionInfo(handle.getPartitionPath(), handle.getPartitionTimeFilter());
+        // If the size of dataDistribution is m, this SeriesSourceNode should be separated into m
+        // splits.
+        for (TRegionReplicaSet dataRegion : dataDistribution) {
+          SeriesSourceNode split = (SeriesSourceNode) handle.clone();
+          split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+          split.setRegionReplicaSet(dataRegion);
+          sources.add(split);
+        }
+      }
+    }
+    // Step 2: For the source nodes, group them by the DataRegion.
+    Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
+        sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+
+    // Step 3: For the source nodes which belong to the same data region, add a TimeJoinNode
+    // for them and make the new TimeJoinNode a child of the current TimeJoinNode
+    // TODO: (xingtanzjr) optimize the procedure here to remove duplicated TimeJoinNode
+    final boolean[] addParent = {false};
+    sourceGroup.forEach(
+        (dataRegion, seriesScanNodes) -> {
+          if (seriesScanNodes.size() == 1) {
+            root.addChild(seriesScanNodes.get(0));
+          } else {
+            if (!addParent[0]) {
+              seriesScanNodes.forEach(root::addChild);
+              addParent[0] = true;
+            } else {
+              // We clone a TimeJoinNode from root to keep the params consistent,
+              // but we need to assign a new ID to it
+              TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
+              parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+              seriesScanNodes.forEach(parentOfGroup::addChild);
+              root.addChild(parentOfGroup);
+            }
+          }
+        });
+
+    // Process the other children which are not SeriesSourceNode
+    for (PlanNode child : node.getChildren()) {
+      if (!(child instanceof SeriesSourceNode)) {
+        // In a general logical query plan, the children of a TimeJoinNode should only be
+        // SeriesScanNode or SeriesAggregationScanNode,
+        // so this branch should not be reached.
+        root.addChild(visit(child, context));
+      }
+    }
+
+    return root;
+  }
+
+  private boolean isAggregationQuery(TimeJoinNode node) {
+    for (PlanNode child : node.getChildren()) {
+      if (child instanceof SeriesAggregationScanNode
+          || child instanceof AlignedSeriesAggregationScanNode) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private PlanNode planAggregationWithTimeJoin(TimeJoinNode root, DistributionPlanContext context) {
+
+    List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
+    Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
+        sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+
+    // construct AggregationDescriptor for AggregationNode
+    List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
+    for (PlanNode child : root.getChildren()) {
+      SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
+      handle
+          .getAggregationDescriptorList()
+          .forEach(
+              descriptor -> {
+                rootAggDescriptorList.add(
+                    new AggregationDescriptor(
+                        descriptor.getAggregationType(),
+                        AggregationStep.FINAL,
+                        descriptor.getInputExpressions()));
+              });
+    }
+    AggregationNode aggregationNode =
+        new AggregationNode(
+            context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
+
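+    // Attach single-source regions directly. The first multi-source region's nodes are also
+    // attached directly; each later multi-source region is wrapped in an intermediate
+    // TimeJoinNode so its sources are joined before reaching the AggregationNode.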
+    final boolean[] addParent = {false};
+    sourceGroup.forEach(
+        (dataRegion, sourceNodes) -> {
+          if (sourceNodes.size() == 1) {
+            aggregationNode.addChild(sourceNodes.get(0));
+          } else {
+            if (!addParent[0]) {
+              sourceNodes.forEach(aggregationNode::addChild);
+              addParent[0] = true;
+            } else {
+              // We clone a TimeJoinNode from the root to keep the params consistent,
+              // but we need to assign a new ID to it
+              TimeJoinNode parentOfGroup = (TimeJoinNode) root.clone();
+              parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+              sourceNodes.forEach(parentOfGroup::addChild);
+              aggregationNode.addChild(parentOfGroup);
+            }
+          }
+        });
+
+    return aggregationNode;
+  }
+
+  public PlanNode visitGroupByLevel(GroupByLevelNode root, DistributionPlanContext context) {
+    // Firstly, we build the tree structure for GroupByLevelNode
+    List<SeriesAggregationSourceNode> sources = splitAggregationSourceByPartition(root, context);
+    Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
+        sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+
+    GroupByLevelNode newRoot = (GroupByLevelNode) root.clone();
+    final boolean[] addParent = {false};
+    sourceGroup.forEach(
+        (dataRegion, sourceNodes) -> {
+          if (sourceNodes.size() == 1) {
+            newRoot.addChild(sourceNodes.get(0));
+          } else {
+            if (!addParent[0]) {
+              sourceNodes.forEach(newRoot::addChild);
+              addParent[0] = true;
+            } else {
+              // We clone a GroupByLevelNode from the root to keep the params consistent,
+              // but we need to assign a new ID to it
+              GroupByLevelNode parentOfGroup = (GroupByLevelNode) root.clone();
+              parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+              sourceNodes.forEach(parentOfGroup::addChild);
+              newRoot.addChild(parentOfGroup);
+            }
+          }
+        });
+
+    // Then, we calculate the attributes of the GroupByLevelNodes at each level
+    calculateGroupByLevelNodeAttributes(newRoot, 0);
+    return newRoot;
+  }
+
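+  // Recursively rebuilds each GroupByLevelNode's descriptors from its children's output
+  // columns. The root (level 0) uses the FINAL step; deeper GroupByLevelNodes use PARTIAL.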
+  private void calculateGroupByLevelNodeAttributes(PlanNode node, int level) {
+    if (node == null) {
+      return;
+    }
+    node.getChildren().forEach(child -> calculateGroupByLevelNodeAttributes(child, level + 1));
+    if (!(node instanceof GroupByLevelNode)) {
+      return;
+    }
+    GroupByLevelNode handle = (GroupByLevelNode) node;
+
+    // Collect all output columns from the children. A Set is used here to avoid duplicates
+    Set<String> childrenOutputColumns = new HashSet<>();
+    handle
+        .getChildren()
+        .forEach(child -> childrenOutputColumns.addAll(child.getOutputColumnNames()));
+
+    // Check every output column of the GroupByLevelNode and set the expressions of the
+    // corresponding GroupByLevelDescriptor
+    List<GroupByLevelDescriptor> descriptorList = new ArrayList<>();
+    for (GroupByLevelDescriptor originalDescriptor : handle.getGroupByLevelDescriptors()) {
+      List<Expression> descriptorExpression = new ArrayList<>();
+      for (String childColumn : childrenOutputColumns) {
+        // If this condition matches, the childColumn should come from a child GroupByLevelNode
+        if (isAggColumnMatchExpression(childColumn, originalDescriptor.getOutputExpression())) {
+          descriptorExpression.add(originalDescriptor.getOutputExpression());
+          continue;
+        }
+        for (Expression exp : originalDescriptor.getInputExpressions()) {
+          if (isAggColumnMatchExpression(childColumn, exp)) {
+            descriptorExpression.add(exp);
+          }
+        }
+      }
+      if (descriptorExpression.isEmpty()) {
+        continue;
+      }
+      GroupByLevelDescriptor descriptor = originalDescriptor.deepClone();
+      descriptor.setStep(level == 0 ? AggregationStep.FINAL : AggregationStep.PARTIAL);
+      descriptor.setInputExpressions(descriptorExpression);
+
+      descriptorList.add(descriptor);
+    }
+    handle.setGroupByLevelDescriptors(descriptorList);
+  }
+
+  // TODO: (xingtanzjr) need to confirm the logic when processing UDF
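+  // e.g. the output column "count(root.sg.d1.s1)" matches the expression "root.sg.d1.s1"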
+  private boolean isAggColumnMatchExpression(String columnName, Expression expression) {
+    if (columnName == null) {
+      return false;
+    }
+    return columnName.contains(expression.getExpressionString());
+  }
+
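+  // Clones each aggregation source once per region its data spans, forcing every clone to
+  // the PARTIAL step, and records how many regions each series touches.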
+  private List<SeriesAggregationSourceNode> splitAggregationSourceByPartition(
+      MultiChildNode root, DistributionPlanContext context) {
+    // Step 1: split SeriesAggregationSourceNode according to data partition
+    List<SeriesAggregationSourceNode> sources = new ArrayList<>();
+    Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
+    for (PlanNode child : root.getChildren()) {
+      SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
+      List<TRegionReplicaSet> dataDistribution =
+          analysis.getPartitionInfo(handle.getPartitionPath(), handle.getPartitionTimeFilter());
+      for (TRegionReplicaSet dataRegion : dataDistribution) {
+        SeriesAggregationSourceNode split = (SeriesAggregationSourceNode) handle.clone();
+        split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+        split.setRegionReplicaSet(dataRegion);
+        // Let each split reference a different copy of the AggregationDescriptorList
+        split.setAggregationDescriptorList(
+            handle.getAggregationDescriptorList().stream()
+                .map(AggregationDescriptor::deepClone)
+                .collect(Collectors.toList()));
+        sources.add(split);
+      }
+      regionCountPerSeries.put(handle.getPartitionPath(), dataDistribution.size());
+    }
+
+    // Step 2: change the step for each SeriesAggregationSourceNode according to its split count
+    for (SeriesAggregationSourceNode source : sources) {
+      // boolean isFinal = regionCountPerSeries.get(source.getPartitionPath()) == 1;
+      // TODO: (xingtanzjr) need to optimize this step later. We make it PARTIAL for now.
+      boolean isFinal = false;
+      source
+          .getAggregationDescriptorList()
+          .forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL));
+    }
+    return sources;
+  }
+
+  public PlanNode visit(PlanNode node, DistributionPlanContext context) {
+    return node.accept(this, context);
+  }
+}
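
A note on the grouping used in Step 2 above: bucketing the split source nodes by
region is a direct use of Collectors.groupingBy. Below is a minimal,
self-contained sketch of the same pattern; the Source record and the region
strings are illustrative stand-ins, not the IoTDB SourceNode and
TRegionReplicaSet classes.

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class GroupByRegionSketch {
      // Illustrative stand-in for a source node tagged with its region replica set.
      record Source(String region, String series) {}

      public static void main(String[] args) {
        List<Source> sources =
            List.of(
                new Source("region-1", "root.sg.d1.s1"),
                new Source("region-1", "root.sg.d1.s2"),
                new Source("region-2", "root.sg.d2.s1"));

        // Same shape as Step 2: one bucket per region.
        Map<String, List<Source>> byRegion =
            sources.stream().collect(Collectors.groupingBy(Source::region));

        // region-1 maps to two sources (joined under one parent); region-2 maps to one.
        byRegion.forEach((region, group) -> System.out.println(region + " -> " + group));
      }
    }
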
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/WriteFragmentParallelPlanner.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 6925194e9c..fb91a0c2fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -17,10 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.planner;
+package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.IFragmentParallelPlaner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
index 5882cfb66b..58f0153825 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
@@ -40,7 +40,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.mpp.plan.planner.DistributionPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;