You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/24 09:47:41 UTC

[iotdb] branch xingtanzjr/delete_timeseries_plan updated: complete basic distribution

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/delete_timeseries_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/delete_timeseries_plan by this push:
     new 3eae1e0349 complete basic distribution
3eae1e0349 is described below

commit 3eae1e0349a751d2ea6a56dc2a7d16b38cbcc968
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue May 24 17:46:56 2022 +0800

    complete basic distribution
---
 .../commons/partition/RegionReplicaSetInfo.java    |   8 +-
 .../iotdb/commons/partition/SchemaPartition.java   |  13 +
 .../planner/distribution/DistributionPlanner.java  |   3 +
 .../planner/distribution/ExchangeNodeAdder.java    |  28 +
 .../plan/planner/distribution/SourceRewriter.java  |  57 +-
 .../iotdb/db/mpp/plan/planner/plan/SubPlan.java    |   2 +-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../node/metedata/write/DeleteTimeSeriesNode.java  |  32 +-
 .../planner/plan/node/write/DeleteDataNode.java    |  27 +-
 .../metadata/DeleteTimeSeriesStatement.java        |   2 +-
 .../db/mpp/plan/plan/DistributionPlannerTest.java  | 934 ---------------------
 .../distribution/AggregationDistributionTest.java  | 412 +++++++++
 .../db/mpp/plan/plan/distribution/BasicTest.java   | 378 +++++++++
 .../plan/distribution/DeleteTimeseriesTest.java    |  71 ++
 .../iotdb/db/mpp/plan/plan/distribution/util.java  | 191 +++++
 15 files changed, 1213 insertions(+), 950 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
index 298547d866..f2017fdb24 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSetInfo.java
@@ -22,15 +22,17 @@ package org.apache.iotdb.commons.partition;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class RegionReplicaSetInfo {
   private TRegionReplicaSet regionReplicaSet;
-  private List<String> ownedStorageGroups;
+  private Set<String> ownedStorageGroups;
 
   public RegionReplicaSetInfo(TRegionReplicaSet regionReplicaSet) {
     this.regionReplicaSet = regionReplicaSet;
-    this.ownedStorageGroups = new ArrayList<>();
+    this.ownedStorageGroups = new HashSet<>();
   }
 
   public void addStorageGroup(String storageGroup) {
@@ -42,6 +44,6 @@ public class RegionReplicaSetInfo {
   }
 
   public List<String> getOwnedStorageGroups() {
-    return ownedStorageGroups;
+    return new ArrayList<>(ownedStorageGroups);
   }
 }
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
index 2b05700add..200f6107b5 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java
@@ -223,6 +223,19 @@ public class SchemaPartition extends Partition {
     return result;
   }
 
+  public List<RegionReplicaSetInfo> getSchemaDistributionInfo() {
+    Map<TRegionReplicaSet, RegionReplicaSetInfo> distributionMap = new HashMap<>();
+    schemaPartitionMap.forEach(
+        (storageGroup, partition) -> {
+          for (TRegionReplicaSet regionReplicaSet : partition.values()) {
+            distributionMap
+                .computeIfAbsent(regionReplicaSet, RegionReplicaSetInfo::new)
+                .addStorageGroup(storageGroup);
+          }
+        });
+    return new ArrayList<>(distributionMap.values());
+  }
+
   private void writeMap(
       Map<TSeriesPartitionSlot, TRegionReplicaSet> valueMap,
       OutputStream outputStream,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 91ca95a5f9..3be4a6daf6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
@@ -66,7 +67,9 @@ public class DistributionPlanner {
 
   public DistributedQueryPlan planFragments() {
     PlanNode rootAfterRewrite = rewriteSource();
+    System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite));
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
+    System.out.println(PlanNodeUtil.nodeToString(rootWithExchange));
     if (analysis.getStatement() instanceof QueryStatement) {
       analysis
           .getRespDatasetHeader()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 781c84b28e..daef502c7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchM
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
@@ -39,6 +40,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -155,6 +157,32 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
     return node.clone();
   }
 
+  public PlanNode visitDeleteData(DeleteDataNode node, NodeGroupContext context) {
+    context.putNodeDistribution(
+        node.getPlanNodeId(),
+        new NodeDistribution(NodeDistributionType.NO_CHILD, node.getRegionReplicaSet()));
+    return node;
+  }
+
+  public PlanNode visitDeleteTimeseries(DeleteTimeSeriesNode node, NodeGroupContext context) {
+    List<PlanNode> visitedChildren = new ArrayList<>();
+    node.getChildren()
+        .forEach(
+            child -> {
+              visitedChildren.add(visit(child, context));
+            });
+    node.getChildren().clear();
+    visitedChildren.forEach(
+        child -> {
+          ExchangeNode exchangeNode =
+              new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+          exchangeNode.setChild(child);
+          exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
+          node.addChild(exchangeNode);
+        });
+    return node;
+  }
+
   @Override
   public PlanNode visitTimeJoin(TimeJoinNode node, NodeGroupContext context) {
     return processMultiChildNode(node, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 752383eff2..d0ba535c26 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -60,6 +60,8 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanContext> {
 
   private Analysis analysis;
@@ -75,7 +77,44 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
 
   public PlanNode visitDeleteTimeseries(
       DeleteTimeSeriesNode node, DistributionPlanContext context) {
-    return null;
+    // Step 1: split DeleteDataNode by partition
+    checkArgument(node.getChildren().size() == 1, "DeleteTimeSeriesNode should have 1 child");
+    checkArgument(
+        node.getChildren().get(0) instanceof DeleteDataNode,
+        "Child of DeleteTimeSeriesNode should be DeleteDataNode");
+
+    DeleteDataNode deleteDataNode = (DeleteDataNode) node.getChildren().get(0);
+    List<DeleteDataNode> deleteDataNodes = splitDeleteDataNode(deleteDataNode, context);
+
+    // Step 2: split DeleteTimeseriesNode by partition
+    List<DeleteTimeSeriesNode> deleteTimeSeriesNodes = splitDeleteTimeseries(node, context);
+
+    // Step 3: construct them as a Tree
+    checkArgument(
+        deleteTimeSeriesNodes.size() > 0,
+        "Size of DeleteTimeseriesNode splits should be larger than 0");
+    deleteDataNodes.forEach(split -> deleteTimeSeriesNodes.get(0).addChild(split));
+    for (int i = 1; i < deleteTimeSeriesNodes.size(); i++) {
+      deleteTimeSeriesNodes.get(i).addChild(deleteTimeSeriesNodes.get(i - 1));
+    }
+    return deleteTimeSeriesNodes.get(deleteTimeSeriesNodes.size() - 1);
+  }
+
+  private List<DeleteTimeSeriesNode> splitDeleteTimeseries(
+      DeleteTimeSeriesNode node, DistributionPlanContext context) {
+    List<DeleteTimeSeriesNode> ret = new ArrayList<>();
+    List<PartialPath> rawPaths = node.getPathList();
+    List<RegionReplicaSetInfo> relatedRegions =
+        analysis.getSchemaPartitionInfo().getSchemaDistributionInfo();
+    for (RegionReplicaSetInfo regionReplicaSetInfo : relatedRegions) {
+      List<PartialPath> newPaths =
+          getRelatedPaths(rawPaths, regionReplicaSetInfo.getOwnedStorageGroups());
+      DeleteTimeSeriesNode split =
+          new DeleteTimeSeriesNode(context.queryContext.getQueryId().genPlanNodeId(), newPaths);
+      split.setRegionReplicaSet(regionReplicaSetInfo.getRegionReplicaSet());
+      ret.add(split);
+    }
+    return ret;
   }
 
   private List<DeleteDataNode> splitDeleteDataNode(
@@ -84,9 +123,19 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     List<PartialPath> rawPaths = node.getPathList();
     List<RegionReplicaSetInfo> relatedRegions =
         analysis.getDataPartitionInfo().getDataDistributionInfo();
-    for (RegionReplicaSetInfo regionReplicaSetInfo : relatedRegions) {}
-
-    return null;
+    for (RegionReplicaSetInfo regionReplicaSetInfo : relatedRegions) {
+      List<PartialPath> newPaths =
+          getRelatedPaths(rawPaths, regionReplicaSetInfo.getOwnedStorageGroups());
+      DeleteDataNode split =
+          new DeleteDataNode(
+              context.queryContext.getQueryId().genPlanNodeId(),
+              context.queryContext.getQueryId(),
+              newPaths,
+              regionReplicaSetInfo.getOwnedStorageGroups());
+      split.setRegionReplicaSet(regionReplicaSetInfo.getRegionReplicaSet());
+      ret.add(split);
+    }
+    return ret;
   }
 
   private List<PartialPath> getRelatedPaths(List<PartialPath> paths, List<String> storageGroups) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
index b922c37c8a..7ac6f92d0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/SubPlan.java
@@ -62,7 +62,7 @@ public class SubPlan {
     result.add(this.planFragment);
     this.children.forEach(
         child -> {
-          result.add(child.getPlanFragment());
+          result.addAll(child.getPlanFragmentList());
         });
     return result;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 4c7f1a5587..2af623bfbe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNo
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -260,4 +261,8 @@ public abstract class PlanVisitor<R, C> {
   public R visitDeleteTimeseries(DeleteTimeSeriesNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitDeleteData(DeleteDataNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
index 05356144cb..454db8da22 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/write/DeleteTimeSeriesNode.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
@@ -36,9 +37,13 @@ public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedN
 
   private final List<PartialPath> pathList;
 
+  private TRegionReplicaSet regionReplicaSet;
+  private List<PlanNode> children;
+
   public DeleteTimeSeriesNode(PlanNodeId id, List<PartialPath> pathList) {
     super(id);
     this.pathList = pathList;
+    this.children = new ArrayList<>();
   }
 
   public List<PartialPath> getPathList() {
@@ -47,11 +52,13 @@ public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedN
 
   @Override
   public List<PlanNode> getChildren() {
-    return null;
+    return children;
   }
 
   @Override
-  public void addChild(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    children.add(child);
+  }
 
   @Override
   public PlanNode clone() {
@@ -60,7 +67,7 @@ public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedN
 
   @Override
   public int allowedChildCount() {
-    return 0;
+    return CHILD_COUNT_NO_LIMIT;
   }
 
   @Override
@@ -68,6 +75,11 @@ public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedN
     return null;
   }
 
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitDeleteTimeseries(this, context);
+  }
+
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.DELETE_TIMESERIES.serialize(byteBuffer);
@@ -89,6 +101,18 @@ public class DeleteTimeSeriesNode extends PlanNode implements IPartitionRelatedN
 
   @Override
   public TRegionReplicaSet getRegionReplicaSet() {
-    return null;
+    return regionReplicaSet;
+  }
+
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  public String toString() {
+    return String.format(
+        "DeleteTimeseriesNode-%s: %s. Region: %s",
+        getPlanNodeId(),
+        pathList,
+        regionReplicaSet == null ? "Not Assigned" : regionReplicaSet.getRegionId());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
index 3e16c1021e..a594bba0c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/DeleteDataNode.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
@@ -39,6 +40,8 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
   private final List<PartialPath> pathList;
   private final List<String> storageGroups;
 
+  private TRegionReplicaSet regionReplicaSet;
+
   public DeleteDataNode(
       PlanNodeId id, QueryId queryId, List<PartialPath> pathList, List<String> storageGroups) {
     super(id);
@@ -53,7 +56,7 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
 
   @Override
   public List<PlanNode> getChildren() {
-    return null;
+    return new ArrayList<>();
   }
 
   @Override
@@ -66,7 +69,7 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
 
   @Override
   public int allowedChildCount() {
-    return 0;
+    return NO_CHILD_ALLOWED;
   }
 
   @Override
@@ -104,9 +107,18 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
     return new DeleteDataNode(planNodeId, queryId, pathList, storageGroups);
   }
 
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitDeleteData(this, context);
+  }
+
   @Override
   public TRegionReplicaSet getRegionReplicaSet() {
-    return null;
+    return regionReplicaSet;
+  }
+
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
   }
 
   public QueryId getQueryId() {
@@ -116,4 +128,13 @@ public class DeleteDataNode extends PlanNode implements IPartitionRelatedNode {
   public List<String> getStorageGroups() {
     return storageGroups;
   }
+
+  public String toString() {
+    return String.format(
+        "DeleteDataNode-%s[ Paths: %s, StorageGroups: %s, Region: %s ]",
+        getPlanNodeId(),
+        pathList,
+        storageGroups,
+        regionReplicaSet == null ? "Not Assigned" : regionReplicaSet.getRegionId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DeleteTimeSeriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DeleteTimeSeriesStatement.java
index edcccd58e0..99cb5b57ff 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DeleteTimeSeriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DeleteTimeSeriesStatement.java
@@ -31,7 +31,7 @@ public class DeleteTimeSeriesStatement extends Statement {
 
   @Override
   public List<PartialPath> getPaths() {
-    return null;
+    return partialPaths;
   }
 
   public void setPartialPaths(List<PartialPath> partialPaths) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
deleted file mode 100644
index 58f0153825..0000000000
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ /dev/null
@@ -1,934 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.mpp.plan.plan;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.partition.DataPartition;
-import org.apache.iotdb.commons.partition.SchemaPartition;
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
-import org.apache.iotdb.db.metadata.path.MeasurementPath;
-import org.apache.iotdb.db.mpp.common.MPPQueryContext;
-import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
-import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
-import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
-import org.apache.iotdb.db.query.aggregation.AggregationType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class DistributionPlannerTest {
-
-  @Test
-  public void testSingleSeriesScan() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query");
-    SeriesScanNode root =
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC);
-
-    Analysis analysis = constructAnalysis();
-
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(2, plan.getInstances().size());
-  }
-
-  @Test
-  public void testSingleSeriesScanRewriteSource() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query");
-    SeriesScanNode root =
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC);
-
-    Analysis analysis = constructAnalysis();
-
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
-    PlanNode rootAfterRewrite = planner.rewriteSource();
-    assertEquals(2, rootAfterRewrite.getChildren().size());
-  }
-
-  @Test
-  public void testRewriteSourceNode() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query");
-
-    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-
-    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
-
-    Analysis analysis = constructAnalysis();
-
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
-    PlanNode newRoot = planner.rewriteSource();
-    assertEquals(4, newRoot.getChildren().get(0).getChildren().size());
-  }
-
-  @Test
-  public void testRewriteMetaSourceNode() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query");
-    SchemaQueryMergeNode metaMergeNode = new SchemaQueryMergeNode(queryId.genPlanNodeId(), false);
-    metaMergeNode.addChild(
-        new TimeSeriesSchemaScanNode(
-            queryId.genPlanNodeId(),
-            new PartialPath("root.sg.d1.s1"),
-            null,
-            null,
-            10,
-            0,
-            false,
-            false,
-            false));
-    metaMergeNode.addChild(
-        new TimeSeriesSchemaScanNode(
-            queryId.genPlanNodeId(),
-            new PartialPath("root.sg.d1.s2"),
-            null,
-            null,
-            10,
-            0,
-            false,
-            false,
-            false));
-    metaMergeNode.addChild(
-        new TimeSeriesSchemaScanNode(
-            queryId.genPlanNodeId(),
-            new PartialPath("root.sg.d22.s1"),
-            null,
-            null,
-            10,
-            0,
-            false,
-            false,
-            false));
-    LimitNode root2 = new LimitNode(queryId.genPlanNodeId(), metaMergeNode, 10);
-    Analysis analysis = constructAnalysis();
-    DistributionPlanner planner2 =
-        new DistributionPlanner(
-            analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root2));
-    PlanNode newRoot2 = planner2.rewriteSource();
-    assertEquals(newRoot2.getChildren().get(0).getChildren().size(), 2);
-  }
-
-  @Test
-  public void testAddExchangeNode() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query");
-    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-
-    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
-
-    Analysis analysis = constructAnalysis();
-
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
-    PlanNode rootAfterRewrite = planner.rewriteSource();
-    PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
-    assertEquals(4, rootWithExchange.getChildren().get(0).getChildren().size());
-    int exchangeNodeCount = 0;
-    for (PlanNode child : rootWithExchange.getChildren().get(0).getChildren()) {
-      exchangeNodeCount += child instanceof ExchangeNode ? 1 : 0;
-    }
-    assertEquals(2, exchangeNodeCount);
-  }
-
-  @Test
-  public void testSplitFragment() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query");
-    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-
-    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
-
-    Analysis analysis = constructAnalysis();
-
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
-    PlanNode rootAfterRewrite = planner.rewriteSource();
-    PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
-    SubPlan subPlan = planner.splitFragment(rootWithExchange);
-    assertEquals(subPlan.getChildren().size(), 2);
-  }
-
-  @Test
-  public void testParallelPlan() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query");
-    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-
-    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
-
-    Analysis analysis = constructAnalysis();
-
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(3, plan.getInstances().size());
-  }
-
-  @Test
-  public void testTimeJoinAggregationSinglePerRegion() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query_time_join_aggregation");
-    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-    String d1s1Path = "root.sg.d1.s1";
-    timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT));
-
-    String d2s1Path = "root.sg.d22.s1";
-    timeJoinNode.addChild(genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT));
-
-    Analysis analysis = constructAnalysis();
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(3, plan.getInstances().size());
-    Map<String, AggregationStep> expectedStep = new HashMap<>();
-    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
-    expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
-    List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
-  }
-
-  private void verifyAggregationStep(Map<String, AggregationStep> expected, PlanNode root) {
-    if (root == null) {
-      return;
-    }
-    if (root instanceof SeriesAggregationSourceNode) {
-      SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) root;
-      List<AggregationDescriptor> descriptorList = handle.getAggregationDescriptorList();
-      descriptorList.forEach(
-          d -> {
-            assertEquals(expected.get(handle.getPartitionPath().getFullPath()), d.getStep());
-          });
-    }
-    root.getChildren().forEach(child -> verifyAggregationStep(expected, child));
-  }
-
-  @Test
-  public void testTimeJoinAggregationMultiPerRegion() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query_time_join_aggregation");
-    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-    String d1s1Path = "root.sg.d1.s1";
-    timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT));
-
-    String d3s1Path = "root.sg.d333.s1";
-    timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT));
-
-    Analysis analysis = constructAnalysis();
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(3, plan.getInstances().size());
-    Map<String, AggregationStep> expectedStep = new HashMap<>();
-    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
-    expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
-    List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
-  }
-
-  @Test
-  public void testTimeJoinAggregationMultiPerRegion2() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query_time_join_aggregation");
-    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-    String d3s1Path = "root.sg.d333.s1";
-    timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT));
-
-    String d4s1Path = "root.sg.d4444.s1";
-    timeJoinNode.addChild(genAggregationSourceNode(queryId, d4s1Path, AggregationType.COUNT));
-    Analysis analysis = constructAnalysis();
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(2, plan.getInstances().size());
-    Map<String, AggregationStep> expectedStep = new HashMap<>();
-    expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
-    expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
-    List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
-  }
-
-  @Test
-  public void testGroupByLevelWithTwoChildren() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_group_by_level_two_children");
-    String d1s1Path = "root.sg.d1.s1";
-    String d2s1Path = "root.sg.d22.s1";
-    String groupedPath = "root.sg.*.s1";
-
-    GroupByLevelNode groupByLevelNode =
-        new GroupByLevelNode(
-            new PlanNodeId("TestGroupByLevelNode"),
-            Arrays.asList(
-                genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
-                genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT)),
-            Collections.singletonList(
-                new GroupByLevelDescriptor(
-                    AggregationType.COUNT,
-                    AggregationStep.FINAL,
-                    Arrays.asList(
-                        new TimeSeriesOperand(new PartialPath(d1s1Path)),
-                        new TimeSeriesOperand(new PartialPath(d2s1Path))),
-                    new TimeSeriesOperand(new PartialPath(groupedPath)))));
-    Analysis analysis = constructAnalysis();
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(3, plan.getInstances().size());
-    Map<String, AggregationStep> expectedStep = new HashMap<>();
-    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
-    expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
-    List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
-  }
-
-  @Test
-  public void testAggregationWithMultiGroupByLevelNode() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_group_by_level_two_children");
-    String d3s1Path = "root.sg.d333.s1";
-    String d4s1Path = "root.sg.d4444.s1";
-    String groupedPath = "root.sg.*.s1";
-
-    GroupByLevelNode groupByLevelNode =
-        new GroupByLevelNode(
-            new PlanNodeId("TestGroupByLevelNode"),
-            Arrays.asList(
-                genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT),
-                genAggregationSourceNode(queryId, d4s1Path, AggregationType.COUNT)),
-            Collections.singletonList(
-                new GroupByLevelDescriptor(
-                    AggregationType.COUNT,
-                    AggregationStep.FINAL,
-                    Arrays.asList(
-                        new TimeSeriesOperand(new PartialPath(d3s1Path)),
-                        new TimeSeriesOperand(new PartialPath(d4s1Path))),
-                    new TimeSeriesOperand(new PartialPath(groupedPath)))));
-    Analysis analysis = constructAnalysis();
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(2, plan.getInstances().size());
-    Map<String, AggregationStep> expectedStep = new HashMap<>();
-    expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
-    expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
-    List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
-
-    Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
-    expectedDescriptorValue.put(groupedPath, Arrays.asList(groupedPath, d3s1Path, d4s1Path));
-    verifyGroupByLevelDescriptor(
-        expectedDescriptorValue,
-        (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
-
-    Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
-    expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path, d4s1Path));
-    verifyGroupByLevelDescriptor(
-        expectedDescriptorValue2,
-        (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
-  }
-
-  @Test
-  public void testGroupByLevelTwoSeries() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_group_by_level_two_series");
-    String d1s1Path = "root.sg.d1.s1";
-    String d1s2Path = "root.sg.d1.s2";
-    String groupedPathS1 = "root.sg.*.s1";
-    String groupedPathS2 = "root.sg.*.s2";
-
-    GroupByLevelNode groupByLevelNode =
-        new GroupByLevelNode(
-            new PlanNodeId("TestGroupByLevelNode"),
-            Arrays.asList(
-                genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
-                genAggregationSourceNode(queryId, d1s2Path, AggregationType.COUNT)),
-            Arrays.asList(
-                new GroupByLevelDescriptor(
-                    AggregationType.COUNT,
-                    AggregationStep.FINAL,
-                    Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path))),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS1))),
-                new GroupByLevelDescriptor(
-                    AggregationType.COUNT,
-                    AggregationStep.FINAL,
-                    Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
-    Analysis analysis = constructAnalysis();
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(2, plan.getInstances().size());
-    Map<String, AggregationStep> expectedStep = new HashMap<>();
-    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
-    expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
-    List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
-
-    Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
-    expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path));
-    expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
-    verifyGroupByLevelDescriptor(
-        expectedDescriptorValue,
-        (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
-
-    Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
-    expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
-    expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
-    verifyGroupByLevelDescriptor(
-        expectedDescriptorValue2,
-        (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
-  }
-
-  @Test
-  public void testGroupByLevel2Series2Devices3Regions() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_group_by_level_two_series");
-    String d1s1Path = "root.sg.d1.s1";
-    String d1s2Path = "root.sg.d1.s2";
-    String d2s1Path = "root.sg.d22.s1";
-    String groupedPathS1 = "root.sg.*.s1";
-    String groupedPathS2 = "root.sg.*.s2";
-
-    GroupByLevelNode groupByLevelNode =
-        new GroupByLevelNode(
-            new PlanNodeId("TestGroupByLevelNode"),
-            Arrays.asList(
-                genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
-                genAggregationSourceNode(queryId, d1s2Path, AggregationType.COUNT),
-                genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT)),
-            Arrays.asList(
-                new GroupByLevelDescriptor(
-                    AggregationType.COUNT,
-                    AggregationStep.FINAL,
-                    Arrays.asList(
-                        new TimeSeriesOperand(new PartialPath(d1s1Path)),
-                        new TimeSeriesOperand(new PartialPath(d2s1Path))),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS1))),
-                new GroupByLevelDescriptor(
-                    AggregationType.COUNT,
-                    AggregationStep.FINAL,
-                    Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
-    Analysis analysis = constructAnalysis();
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(3, plan.getInstances().size());
-    Map<String, AggregationStep> expectedStep = new HashMap<>();
-    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
-    expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
-    expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
-    List<FragmentInstance> fragmentInstances = plan.getInstances();
-    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
-
-    Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
-    expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path, d2s1Path));
-    expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
-    verifyGroupByLevelDescriptor(
-        expectedDescriptorValue,
-        (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
-
-    Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
-    expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
-    expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
-    verifyGroupByLevelDescriptor(
-        expectedDescriptorValue2,
-        (GroupByLevelNode) fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
-  }
-
-  @Test
-  public void testAggregation1Series1Region() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_aggregation_1_series_1_region");
-    String d2s1Path = "root.sg.d22.s1";
-
-    PlanNode root = genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT);
-    Analysis analysis = constructAnalysis();
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(1, plan.getInstances().size());
-    assertEquals(root, plan.getInstances().get(0).getFragment().getRoot().getChildren().get(0));
-  }
-
-  private void verifyGroupByLevelDescriptor(
-      Map<String, List<String>> expected, GroupByLevelNode node) {
-    List<GroupByLevelDescriptor> descriptors = node.getGroupByLevelDescriptors();
-    assertEquals(expected.size(), descriptors.size());
-    for (GroupByLevelDescriptor descriptor : descriptors) {
-      String outputExpression = descriptor.getOutputExpression().getExpressionString();
-      assertEquals(expected.get(outputExpression).size(), descriptor.getInputExpressions().size());
-      for (Expression inputExpression : descriptor.getInputExpressions()) {
-        assertTrue(expected.get(outputExpression).contains(inputExpression.getExpressionString()));
-      }
-    }
-  }
-
-  private SeriesAggregationSourceNode genAggregationSourceNode(
-      QueryId queryId, String path, AggregationType type) throws IllegalPathException {
-    List<AggregationDescriptor> descriptors = new ArrayList<>();
-    descriptors.add(
-        new AggregationDescriptor(
-            type,
-            AggregationStep.FINAL,
-            Collections.singletonList(new TimeSeriesOperand(new PartialPath(path)))));
-
-    return new SeriesAggregationScanNode(
-        queryId.genPlanNodeId(), new MeasurementPath(path, TSDataType.INT32), descriptors);
-  }
-
-  @Test
-  public void testParallelPlanWithAlignedSeries() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query_aligned");
-    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
-    timeJoinNode.addChild(
-        new AlignedSeriesScanNode(
-            queryId.genPlanNodeId(),
-            new AlignedPath("root.sg.d1", Arrays.asList("s1", "s2")),
-            OrderBy.TIMESTAMP_ASC));
-    timeJoinNode.addChild(
-        new SeriesScanNode(
-            queryId.genPlanNodeId(),
-            new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
-            OrderBy.TIMESTAMP_ASC));
-
-    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
-    Analysis analysis = constructAnalysis();
-
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(3, plan.getInstances().size());
-  }
-
-  @Test
-  public void testSingleAlignedSeries() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_query_aligned");
-    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
-
-    timeJoinNode.addChild(
-        new AlignedSeriesScanNode(
-            queryId.genPlanNodeId(),
-            new AlignedPath("root.sg.d22", Arrays.asList("s1", "s2")),
-            OrderBy.TIMESTAMP_ASC));
-
-    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
-    Analysis analysis = constructAnalysis();
-
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(1, plan.getInstances().size());
-  }
-
-  @Test
-  public void testInsertRowNodeParallelPlan() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_write");
-    InsertRowNode insertRowNode =
-        new InsertRowNode(
-            queryId.genPlanNodeId(),
-            new PartialPath("root.sg.d1"),
-            false,
-            new String[] {
-              "s1",
-            },
-            new TSDataType[] {TSDataType.INT32},
-            1L,
-            new Object[] {10},
-            false);
-    insertRowNode.setMeasurementSchemas(
-        new MeasurementSchema[] {
-          new MeasurementSchema("s1", TSDataType.INT32),
-        });
-
-    Analysis analysis = constructAnalysis();
-
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    context.setQueryType(QueryType.WRITE);
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(1, plan.getInstances().size());
-  }
-
-  @Test
-  public void testInsertRowsNodeParallelPlan() throws IllegalPathException {
-    QueryId queryId = new QueryId("test_write");
-    InsertRowNode insertRowNode1 =
-        new InsertRowNode(
-            queryId.genPlanNodeId(),
-            new PartialPath("root.sg.d1"),
-            false,
-            new String[] {"s1"},
-            new TSDataType[] {TSDataType.INT32},
-            1L,
-            new Object[] {10},
-            false);
-    insertRowNode1.setMeasurementSchemas(
-        new MeasurementSchema[] {
-          new MeasurementSchema("s1", TSDataType.INT32),
-        });
-
-    InsertRowNode insertRowNode2 =
-        new InsertRowNode(
-            queryId.genPlanNodeId(),
-            new PartialPath("root.sg.d1"),
-            false,
-            new String[] {"s1"},
-            new TSDataType[] {TSDataType.INT32},
-            100000L,
-            new Object[] {10},
-            false);
-    insertRowNode2.setMeasurementSchemas(
-        new MeasurementSchema[] {
-          new MeasurementSchema("s1", TSDataType.INT32),
-        });
-
-    InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
-    node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2));
-    node.setInsertRowNodeIndexList(Arrays.asList(0, 1));
-
-    Analysis analysis = constructAnalysis();
-
-    MPPQueryContext context =
-        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
-    context.setQueryType(QueryType.WRITE);
-    DistributionPlanner planner =
-        new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
-    DistributedQueryPlan plan = planner.planFragments();
-    assertEquals(1, plan.getInstances().size());
-  }
-
-  private Analysis constructAnalysis() throws IllegalPathException {
-
-    SeriesPartitionExecutor executor =
-        SeriesPartitionExecutor.getSeriesPartitionExecutor(
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
-    Analysis analysis = new Analysis();
-
-    String device1 = "root.sg.d1";
-    String device2 = "root.sg.d22";
-    String device3 = "root.sg.d333";
-    String device4 = "root.sg.d4444";
-
-    DataPartition dataPartition =
-        new DataPartition(
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
-
-    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
-        dataPartitionMap = new HashMap<>();
-    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> sgPartitionMap =
-        new HashMap<>();
-
-    List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
-    d1DataRegions.add(
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
-            Arrays.asList(
-                new TDataNodeLocation()
-                    .setDataNodeId(11)
-                    .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
-                new TDataNodeLocation()
-                    .setDataNodeId(12)
-                    .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000)))));
-    d1DataRegions.add(
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
-            Arrays.asList(
-                new TDataNodeLocation()
-                    .setDataNodeId(21)
-                    .setExternalEndPoint(new TEndPoint("192.0.2.1", 9000)),
-                new TDataNodeLocation()
-                    .setDataNodeId(22)
-                    .setExternalEndPoint(new TEndPoint("192.0.2.2", 9000)))));
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
-    d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
-
-    List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
-    d2DataRegions.add(
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
-            Arrays.asList(
-                new TDataNodeLocation()
-                    .setDataNodeId(31)
-                    .setExternalEndPoint(new TEndPoint("192.0.3.1", 9000)),
-                new TDataNodeLocation()
-                    .setDataNodeId(32)
-                    .setExternalEndPoint(new TEndPoint("192.0.3.2", 9000)))));
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
-    d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
-
-    List<TRegionReplicaSet> d3DataRegions = new ArrayList<>();
-    d3DataRegions.add(
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
-            Arrays.asList(
-                new TDataNodeLocation()
-                    .setDataNodeId(11)
-                    .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
-                new TDataNodeLocation()
-                    .setDataNodeId(12)
-                    .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000)))));
-    d3DataRegions.add(
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
-            Arrays.asList(
-                new TDataNodeLocation()
-                    .setDataNodeId(41)
-                    .setExternalEndPoint(new TEndPoint("192.0.4.1", 9000)),
-                new TDataNodeLocation()
-                    .setDataNodeId(42)
-                    .setExternalEndPoint(new TEndPoint("192.0.4.2", 9000)))));
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
-    d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
-
-    List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
-    d4DataRegions.add(
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
-            Arrays.asList(
-                new TDataNodeLocation()
-                    .setDataNodeId(11)
-                    .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
-                new TDataNodeLocation()
-                    .setDataNodeId(12)
-                    .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000)))));
-    d4DataRegions.add(
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
-            Arrays.asList(
-                new TDataNodeLocation()
-                    .setDataNodeId(41)
-                    .setExternalEndPoint(new TEndPoint("192.0.4.1", 9000)),
-                new TDataNodeLocation()
-                    .setDataNodeId(42)
-                    .setExternalEndPoint(new TEndPoint("192.0.4.2", 9000)))));
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>();
-    d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
-
-    sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap);
-    sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap);
-    sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap);
-    sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap);
-
-    dataPartitionMap.put("root.sg", sgPartitionMap);
-
-    dataPartition.setDataPartitionMap(dataPartitionMap);
-
-    analysis.setDataPartitionInfo(dataPartition);
-
-    // construct AggregationExpression for GroupByLevel
-    Map<String, Set<Expression>> aggregationExpression = new HashMap<>();
-    Set<Expression> s1Expression = new HashSet<>();
-    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")));
-    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s1")));
-    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s1")));
-    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s1")));
-
-    Set<Expression> s2Expression = new HashSet<>();
-    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s2")));
-    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s2")));
-    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s2")));
-    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s2")));
-
-    aggregationExpression.put("root.sg.*.s1", s1Expression);
-    aggregationExpression.put("root.sg.*.s2", s2Expression);
-    // analysis.setAggregationExpressions(aggregationExpression);
-
-    // construct schema partition
-    SchemaPartition schemaPartition =
-        new SchemaPartition(
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
-    Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>();
-
-    TRegionReplicaSet schemaRegion1 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
-            Arrays.asList(
-                new TDataNodeLocation()
-                    .setDataNodeId(11)
-                    .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
-                new TDataNodeLocation()
-                    .setDataNodeId(12)
-                    .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000))));
-    Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>();
-
-    TRegionReplicaSet schemaRegion2 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
-            Arrays.asList(
-                new TDataNodeLocation()
-                    .setDataNodeId(21)
-                    .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
-                new TDataNodeLocation()
-                    .setDataNodeId(22)
-                    .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000))));
-
-    schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1);
-    schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2);
-    schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2);
-    schemaPartitionMap.put("root.sg", schemaRegionMap);
-    schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
-
-    analysis.setDataPartitionInfo(dataPartition);
-    analysis.setSchemaPartitionInfo(schemaPartition);
-    return analysis;
-  }
-}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
new file mode 100644
index 0000000000..0991684f2d
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AggregationDistributionTest {
+  @Test
+  public void testTimeJoinAggregationSinglePerRegion() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query_time_join_aggregation");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+    String d1s1Path = "root.sg.d1.s1";
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT));
+
+    String d2s1Path = "root.sg.d22.s1";
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT));
+
+    Analysis analysis = util.constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+  }
+
+  private void verifyAggregationStep(Map<String, AggregationStep> expected, PlanNode root) {
+    if (root == null) {
+      return;
+    }
+    if (root instanceof SeriesAggregationSourceNode) {
+      SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) root;
+      List<AggregationDescriptor> descriptorList = handle.getAggregationDescriptorList();
+      descriptorList.forEach(
+          d -> {
+            assertEquals(expected.get(handle.getPartitionPath().getFullPath()), d.getStep());
+          });
+    }
+    root.getChildren().forEach(child -> verifyAggregationStep(expected, child));
+  }
+
+  @Test
+  public void testTimeJoinAggregationMultiPerRegion() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query_time_join_aggregation");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+    String d1s1Path = "root.sg.d1.s1";
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT));
+
+    String d3s1Path = "root.sg.d333.s1";
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT));
+
+    Analysis analysis = util.constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+  }
+
+  @Test
+  public void testTimeJoinAggregationMultiPerRegion2() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query_time_join_aggregation");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+    String d3s1Path = "root.sg.d333.s1";
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT));
+
+    String d4s1Path = "root.sg.d4444.s1";
+    timeJoinNode.addChild(genAggregationSourceNode(queryId, d4s1Path, AggregationType.COUNT));
+    Analysis analysis = util.constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+  }
+
+  @Test
+  public void testGroupByLevelWithTwoChildren() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_group_by_level_two_children");
+    String d1s1Path = "root.sg.d1.s1";
+    String d2s1Path = "root.sg.d22.s1";
+    String groupedPath = "root.sg.*.s1";
+
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("TestGroupByLevelNode"),
+            Arrays.asList(
+                genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
+                genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT)),
+            Collections.singletonList(
+                new GroupByLevelDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.FINAL,
+                    Arrays.asList(
+                        new TimeSeriesOperand(new PartialPath(d1s1Path)),
+                        new TimeSeriesOperand(new PartialPath(d2s1Path))),
+                    new TimeSeriesOperand(new PartialPath(groupedPath)))));
+    Analysis analysis = util.constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+  }
+
+  @Test
+  public void testAggregationWithMultiGroupByLevelNode() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_group_by_level_two_children");
+    String d3s1Path = "root.sg.d333.s1";
+    String d4s1Path = "root.sg.d4444.s1";
+    String groupedPath = "root.sg.*.s1";
+
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("TestGroupByLevelNode"),
+            Arrays.asList(
+                genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT),
+                genAggregationSourceNode(queryId, d4s1Path, AggregationType.COUNT)),
+            Collections.singletonList(
+                new GroupByLevelDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.FINAL,
+                    Arrays.asList(
+                        new TimeSeriesOperand(new PartialPath(d3s1Path)),
+                        new TimeSeriesOperand(new PartialPath(d4s1Path))),
+                    new TimeSeriesOperand(new PartialPath(groupedPath)))));
+    Analysis analysis = util.constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+
+    Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
+    expectedDescriptorValue.put(groupedPath, Arrays.asList(groupedPath, d3s1Path, d4s1Path));
+    verifyGroupByLevelDescriptor(
+        expectedDescriptorValue,
+        (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+
+    Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
+    expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path, d4s1Path));
+    verifyGroupByLevelDescriptor(
+        expectedDescriptorValue2,
+        (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+  }
+
+  @Test
+  public void testGroupByLevelTwoSeries() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_group_by_level_two_series");
+    String d1s1Path = "root.sg.d1.s1";
+    String d1s2Path = "root.sg.d1.s2";
+    String groupedPathS1 = "root.sg.*.s1";
+    String groupedPathS2 = "root.sg.*.s2";
+
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("TestGroupByLevelNode"),
+            Arrays.asList(
+                genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
+                genAggregationSourceNode(queryId, d1s2Path, AggregationType.COUNT)),
+            Arrays.asList(
+                new GroupByLevelDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.FINAL,
+                    Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path))),
+                    new TimeSeriesOperand(new PartialPath(groupedPathS1))),
+                new GroupByLevelDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.FINAL,
+                    Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
+                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
+    Analysis analysis = util.constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+
+    Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
+    expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path));
+    expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
+    verifyGroupByLevelDescriptor(
+        expectedDescriptorValue,
+        (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+
+    Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
+    expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
+    expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
+    verifyGroupByLevelDescriptor(
+        expectedDescriptorValue2,
+        (GroupByLevelNode) fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+  }
+
+  @Test
+  public void testGroupByLevel2Series2Devices3Regions() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_group_by_level_two_series");
+    String d1s1Path = "root.sg.d1.s1";
+    String d1s2Path = "root.sg.d1.s2";
+    String d2s1Path = "root.sg.d22.s1";
+    String groupedPathS1 = "root.sg.*.s1";
+    String groupedPathS2 = "root.sg.*.s2";
+
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("TestGroupByLevelNode"),
+            Arrays.asList(
+                genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
+                genAggregationSourceNode(queryId, d1s2Path, AggregationType.COUNT),
+                genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT)),
+            Arrays.asList(
+                new GroupByLevelDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.FINAL,
+                    Arrays.asList(
+                        new TimeSeriesOperand(new PartialPath(d1s1Path)),
+                        new TimeSeriesOperand(new PartialPath(d2s1Path))),
+                    new TimeSeriesOperand(new PartialPath(groupedPathS1))),
+                new GroupByLevelDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.FINAL,
+                    Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
+                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
+    Analysis analysis = util.constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
+    expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+
+    Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
+    expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1, d1s1Path, d2s1Path));
+    expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2, d1s2Path));
+    verifyGroupByLevelDescriptor(
+        expectedDescriptorValue,
+        (GroupByLevelNode) fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+
+    Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
+    expectedDescriptorValue2.put(groupedPathS1, Collections.singletonList(d1s1Path));
+    expectedDescriptorValue2.put(groupedPathS2, Collections.singletonList(d1s2Path));
+    verifyGroupByLevelDescriptor(
+        expectedDescriptorValue2,
+        (GroupByLevelNode) fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
+  }
+
+  @Test
+  public void testAggregation1Series1Region() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_aggregation_1_series_1_region");
+    String d2s1Path = "root.sg.d22.s1";
+
+    PlanNode root = genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT);
+    Analysis analysis = util.constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(1, plan.getInstances().size());
+    assertEquals(root, plan.getInstances().get(0).getFragment().getRoot().getChildren().get(0));
+  }
+
+  private void verifyGroupByLevelDescriptor(
+      Map<String, List<String>> expected, GroupByLevelNode node) {
+    List<GroupByLevelDescriptor> descriptors = node.getGroupByLevelDescriptors();
+    assertEquals(expected.size(), descriptors.size());
+    for (GroupByLevelDescriptor descriptor : descriptors) {
+      String outputExpression = descriptor.getOutputExpression().getExpressionString();
+      assertEquals(expected.get(outputExpression).size(), descriptor.getInputExpressions().size());
+      for (Expression inputExpression : descriptor.getInputExpressions()) {
+        assertTrue(expected.get(outputExpression).contains(inputExpression.getExpressionString()));
+      }
+    }
+  }
+
+  private SeriesAggregationSourceNode genAggregationSourceNode(
+      QueryId queryId, String path, AggregationType type) throws IllegalPathException {
+    List<AggregationDescriptor> descriptors = new ArrayList<>();
+    descriptors.add(
+        new AggregationDescriptor(
+            type,
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath(path)))));
+
+    return new SeriesAggregationScanNode(
+        queryId.genPlanNodeId(), new MeasurementPath(path, TSDataType.INT32), descriptors);
+  }
+
+  @Test
+  public void testParallelPlanWithAlignedSeries() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query_aligned");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+    timeJoinNode.addChild(
+        new AlignedSeriesScanNode(
+            queryId.genPlanNodeId(),
+            new AlignedPath("root.sg.d1", Arrays.asList("s1", "s2")),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+    Analysis analysis = util.constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java
new file mode 100644
index 0000000000..8437ec06c3
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/BasicTest.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class BasicTest {
+
+  @Test
+  public void testSingleSeriesScan() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
+    SeriesScanNode root =
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC);
+
+    Analysis analysis = util.constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+  }
+
+  @Test
+  public void testSingleSeriesScanRewriteSource() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
+    SeriesScanNode root =
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC);
+
+    Analysis analysis = util.constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    PlanNode rootAfterRewrite = planner.rewriteSource();
+    assertEquals(2, rootAfterRewrite.getChildren().size());
+  }
+
+  @Test
+  public void testRewriteSourceNode() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
+
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+
+    Analysis analysis = util.constructAnalysis();
+
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
+    PlanNode newRoot = planner.rewriteSource();
+    assertEquals(4, newRoot.getChildren().get(0).getChildren().size());
+  }
+
+  @Test
+  public void testRewriteMetaSourceNode() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
+    SchemaQueryMergeNode metaMergeNode = new SchemaQueryMergeNode(queryId.genPlanNodeId(), false);
+    metaMergeNode.addChild(
+        new TimeSeriesSchemaScanNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1.s1"),
+            null,
+            null,
+            10,
+            0,
+            false,
+            false,
+            false));
+    metaMergeNode.addChild(
+        new TimeSeriesSchemaScanNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1.s2"),
+            null,
+            null,
+            10,
+            0,
+            false,
+            false,
+            false));
+    metaMergeNode.addChild(
+        new TimeSeriesSchemaScanNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d22.s1"),
+            null,
+            null,
+            10,
+            0,
+            false,
+            false,
+            false));
+    LimitNode root2 = new LimitNode(queryId.genPlanNodeId(), metaMergeNode, 10);
+    Analysis analysis = util.constructAnalysis();
+    DistributionPlanner planner2 =
+        new DistributionPlanner(
+            analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root2));
+    PlanNode newRoot2 = planner2.rewriteSource();
+    assertEquals(newRoot2.getChildren().get(0).getChildren().size(), 2);
+  }
+
+  @Test
+  public void testAddExchangeNode() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+
+    Analysis analysis = util.constructAnalysis();
+
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(new MPPQueryContext(queryId), root));
+    PlanNode rootAfterRewrite = planner.rewriteSource();
+    PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
+    assertEquals(4, rootWithExchange.getChildren().get(0).getChildren().size());
+    int exchangeNodeCount = 0;
+    for (PlanNode child : rootWithExchange.getChildren().get(0).getChildren()) {
+      exchangeNodeCount += child instanceof ExchangeNode ? 1 : 0;
+    }
+    assertEquals(2, exchangeNodeCount);
+  }
+
+  @Test
+  public void testSplitFragment() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d22.s1", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+
+    Analysis analysis = util.constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    PlanNode rootAfterRewrite = planner.rewriteSource();
+    PlanNode rootWithExchange = planner.addExchangeNode(rootAfterRewrite);
+    SubPlan subPlan = planner.splitFragment(rootWithExchange);
+    assertEquals(subPlan.getChildren().size(), 2);
+  }
+
+  @Test
+  public void testParallelPlan() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s1", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d1.s2", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+    timeJoinNode.addChild(
+        new SeriesScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath("root.sg.d333.s1", TSDataType.INT32),
+            OrderBy.TIMESTAMP_ASC));
+
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+
+    Analysis analysis = util.constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+  }
+
+  @Test
+  public void testSingleAlignedSeries() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query_aligned");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+
+    timeJoinNode.addChild(
+        new AlignedSeriesScanNode(
+            queryId.genPlanNodeId(),
+            new AlignedPath("root.sg.d22", Arrays.asList("s1", "s2")),
+            OrderBy.TIMESTAMP_ASC));
+
+    LimitNode root = new LimitNode(queryId.genPlanNodeId(), timeJoinNode, 10);
+    Analysis analysis = util.constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, root));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(1, plan.getInstances().size());
+  }
+
+  @Test
+  public void testInsertRowNodeParallelPlan() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_write");
+    InsertRowNode insertRowNode =
+        new InsertRowNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1"),
+            false,
+            new String[] {
+              "s1",
+            },
+            new TSDataType[] {TSDataType.INT32},
+            1L,
+            new Object[] {10},
+            false);
+    insertRowNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.INT32),
+        });
+
+    Analysis analysis = util.constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    context.setQueryType(QueryType.WRITE);
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(1, plan.getInstances().size());
+  }
+
+  @Test
+  public void testInsertRowsNodeParallelPlan() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_write");
+    InsertRowNode insertRowNode1 =
+        new InsertRowNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1"),
+            false,
+            new String[] {"s1"},
+            new TSDataType[] {TSDataType.INT32},
+            1L,
+            new Object[] {10},
+            false);
+    insertRowNode1.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.INT32),
+        });
+
+    InsertRowNode insertRowNode2 =
+        new InsertRowNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1"),
+            false,
+            new String[] {"s1"},
+            new TSDataType[] {TSDataType.INT32},
+            100000L,
+            new Object[] {10},
+            false);
+    insertRowNode2.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.INT32),
+        });
+
+    InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
+    node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2));
+    node.setInsertRowNodeIndexList(Arrays.asList(0, 1));
+
+    Analysis analysis = util.constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    context.setQueryType(QueryType.WRITE);
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(1, plan.getInstances().size());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DeleteTimeseriesTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DeleteTimeseriesTest.java
new file mode 100644
index 0000000000..1656dce37d
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/DeleteTimeseriesTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class DeleteTimeseriesTest {
+
+  @Test
+  public void test1DataRegion1SchemaRegion() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_group_by_level_two_series");
+    String d2s1Path = "root.sg.d22.s1";
+    List<PartialPath> paths = Collections.singletonList(new PartialPath(d2s1Path));
+    Analysis analysis = util.constructAnalysis();
+    LogicalQueryPlan logicalQueryPlan = constructLogicalPlan(queryId, paths, analysis);
+
+    System.out.println(PlanNodeUtil.nodeToString(logicalQueryPlan.getRootNode()));
+
+    DistributionPlanner planner = new DistributionPlanner(analysis, logicalQueryPlan);
+    DistributedQueryPlan distributedQueryPlan = planner.planFragments();
+    distributedQueryPlan.getInstances().forEach(System.out::println);
+    Assert.assertEquals(6, distributedQueryPlan.getInstances().size());
+  }
+
+  private LogicalQueryPlan constructLogicalPlan(
+      QueryId queryId, List<PartialPath> paths, Analysis analysis) throws IllegalPathException {
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DeleteTimeSeriesStatement statement = new DeleteTimeSeriesStatement();
+
+    analysis.setStatement(statement);
+    statement.setPartialPaths(paths);
+    LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+    return planner.plan(analysis);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/util.java
new file mode 100644
index 0000000000..76af829fe1
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/util.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class util {
+  public static Analysis constructAnalysis() throws IllegalPathException {
+
+    SeriesPartitionExecutor executor =
+        SeriesPartitionExecutor.getSeriesPartitionExecutor(
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+    Analysis analysis = new Analysis();
+
+    String device1 = "root.sg.d1";
+    String device2 = "root.sg.d22";
+    String device3 = "root.sg.d333";
+    String device4 = "root.sg.d4444";
+
+    TRegionReplicaSet dataRegion1 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
+            Arrays.asList(
+                genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
+
+    TRegionReplicaSet dataRegion2 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
+            Arrays.asList(
+                genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
+
+    TRegionReplicaSet dataRegion3 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
+            Arrays.asList(
+                genDataNodeLocation(31, "192.0.3.1"), genDataNodeLocation(32, "192.0.3.2")));
+
+    TRegionReplicaSet dataRegion4 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
+            Arrays.asList(
+                genDataNodeLocation(41, "192.0.4.1"), genDataNodeLocation(42, "192.0.4.2")));
+
+    TRegionReplicaSet dataRegion5 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 5),
+            Arrays.asList(
+                genDataNodeLocation(51, "192.0.5.1"), genDataNodeLocation(52, "192.0.5.2")));
+
+    DataPartition dataPartition =
+        new DataPartition(
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+
+    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+        dataPartitionMap = new HashMap<>();
+    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> sgPartitionMap =
+        new HashMap<>();
+
+    List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
+    d1DataRegions.add(dataRegion1);
+    d1DataRegions.add(dataRegion2);
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
+    d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
+
+    List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
+    d2DataRegions.add(dataRegion3);
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
+    d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
+
+    List<TRegionReplicaSet> d3DataRegions = new ArrayList<>();
+    d3DataRegions.add(dataRegion1);
+    d3DataRegions.add(dataRegion4);
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
+    d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
+
+    List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
+    d4DataRegions.add(dataRegion1);
+    d4DataRegions.add(dataRegion4);
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>();
+    d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
+
+    sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap);
+    sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap);
+    sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap);
+    sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap);
+
+    dataPartitionMap.put("root.sg", sgPartitionMap);
+
+    dataPartition.setDataPartitionMap(dataPartitionMap);
+
+    analysis.setDataPartitionInfo(dataPartition);
+
+    // construct AggregationExpression for GroupByLevel
+    Map<String, Set<Expression>> aggregationExpression = new HashMap<>();
+    Set<Expression> s1Expression = new HashSet<>();
+    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")));
+    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s1")));
+    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s1")));
+    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s1")));
+
+    Set<Expression> s2Expression = new HashSet<>();
+    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s2")));
+    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s2")));
+    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s2")));
+    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s2")));
+
+    aggregationExpression.put("root.sg.*.s1", s1Expression);
+    aggregationExpression.put("root.sg.*.s2", s2Expression);
+    // analysis.setAggregationExpressions(aggregationExpression);
+
+    // construct schema partition
+    SchemaPartition schemaPartition =
+        new SchemaPartition(
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+    Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>();
+
+    TRegionReplicaSet schemaRegion1 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
+            Arrays.asList(
+                genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
+
+    TRegionReplicaSet schemaRegion2 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
+            Arrays.asList(
+                genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
+
+    Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>();
+    schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1);
+    schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2);
+    schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2);
+    schemaPartitionMap.put("root.sg", schemaRegionMap);
+    schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
+
+    analysis.setDataPartitionInfo(dataPartition);
+    analysis.setSchemaPartitionInfo(schemaPartition);
+    return analysis;
+  }
+
+  private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String ip) {
+    return new TDataNodeLocation()
+        .setDataNodeId(dataNodeId)
+        .setExternalEndPoint(new TEndPoint(ip, 9000))
+        .setDataBlockManagerEndPoint(new TEndPoint(ip, 9001))
+        .setInternalEndPoint(new TEndPoint(ip, 9002));
+  }
+}