You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/17 04:24:29 UTC

[iotdb] 02/02: fix the issue that aggregation plan is not correct with alignByDevice in multi-region

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch fix_agg_align_by_device
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aa26adb654d731aa5870bfdf06b0514ce07396ec
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Nov 17 12:24:11 2022 +0800

    fix the issue that aggregation plan is not correct with alignByDevice in multi-region
---
 .../plan/planner/distribution/SourceRewriter.java  |  30 +-
 .../distribution/AggregationDistributionTest.java  |  48 +++
 .../iotdb/db/mpp/plan/plan/distribution/Util.java  | 475 +++++++++++++++------
 3 files changed, 405 insertions(+), 148 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 338c02a00e..10327086c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -90,7 +90,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
 
     // If the logicalPlan is mixed by DeviceView and Aggregation, it should be processed by a
     // special logic.
-    if (isAggregationQuery(node)) {}
+    if (isAggregationQuery(node)) {
+      return processDeviceViewWithAggregation(node, context);
+    }
 
     Set<TRegionReplicaSet> relatedDataRegions = new HashSet<>();
 
@@ -121,12 +123,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
           children.add(split.buildPlanNodeInRegion(regionReplicaSet, context.queryContext));
         }
       }
-      DeviceViewNode regionDeviceViewNode =
-          new DeviceViewNode(
-              context.queryContext.getQueryId().genPlanNodeId(),
-              node.getMergeOrderParameter(),
-              node.getOutputColumnNames(),
-              node.getDeviceToMeasurementIndexesMap());
+      DeviceViewNode regionDeviceViewNode = cloneDeviceViewNodeWithoutChild(node, context);
       for (int i = 0; i < devices.size(); i++) {
         regionDeviceViewNode.addChildDeviceNode(devices.get(i), children.get(i));
       }
@@ -136,6 +133,25 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
     return deviceMergeNode;
   }
 
+  private PlanNode processDeviceViewWithAggregation(
+      DeviceViewNode node, DistributionPlanContext context) {
+    DeviceViewNode newRoot = cloneDeviceViewNodeWithoutChild(node, context);
+    for (int i = 0; i < node.getDevices().size(); i++) {
+      newRoot.addChildDeviceNode(
+          node.getDevices().get(i), rewrite(node.getChildren().get(i), context));
+    }
+    return newRoot;
+  }
+
+  private DeviceViewNode cloneDeviceViewNodeWithoutChild(
+      DeviceViewNode node, DistributionPlanContext context) {
+    return new DeviceViewNode(
+        context.queryContext.getQueryId().genPlanNodeId(),
+        node.getMergeOrderParameter(),
+        node.getOutputColumnNames(),
+        node.getDeviceToMeasurementIndexesMap());
+  }
+
   private static class DeviceViewSplit {
     protected String device;
     protected PlanNode root;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 3368d7408f..6a173935f0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -635,6 +636,53 @@ public class AggregationDistributionTest {
         root, plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0));
   }
 
+  @Test
+  public void testAlignByDevice1Device2Region() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_align_by_device_1_device_2_region");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    String sql = "select count(s1), count(s2) from root.sg.d1 align by device";
+    Analysis analysis = Util.ANALYSIS;
+    PlanNode logicalPlanNode = Util.genLogicalPlan(sql, context);
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+    PlanNode f1Root =
+        plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+    PlanNode f2Root =
+        plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
+    assertTrue(f1Root instanceof DeviceViewNode);
+    assertTrue(f2Root instanceof TimeJoinNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof AggregationNode);
+    assertEquals(3, f1Root.getChildren().get(0).getChildren().size());
+  }
+
+  @Test
+  public void testAlignByDevice2Device2Region() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_align_by_device_2_device_2_region");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    String sql = "select count(s1), count(s2) from root.sg.d1,root.sg.d22 align by device";
+    Analysis analysis = Util.ANALYSIS;
+    PlanNode logicalPlanNode = Util.genLogicalPlan(sql, context);
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    PlanNode f1Root =
+        plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+    PlanNode f2Root =
+        plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
+    PlanNode f3Root =
+        plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0);
+    assertTrue(f1Root instanceof DeviceViewNode);
+    assertTrue(f2Root instanceof TimeJoinNode);
+    assertTrue(f3Root instanceof AggregationNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof AggregationNode);
+    assertEquals(3, f1Root.getChildren().get(0).getChildren().size());
+  }
+
   private void verifyGroupByLevelDescriptor(
       Map<String, List<String>> expected, GroupByLevelNode node) {
     List<CrossSeriesAggregationDescriptor> descriptors = node.getGroupByLevelDescriptors();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index d433f9aee0..36d564f882 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -28,164 +28,357 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.node.SchemaEntityNode;
+import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode;
+import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
+import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
 public class Util {
-  public static Analysis constructAnalysis() throws IllegalPathException {
-
-    SeriesPartitionExecutor executor =
-        SeriesPartitionExecutor.getSeriesPartitionExecutor(
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
-    Analysis analysis = new Analysis();
-
-    String device1 = "root.sg.d1";
-    String device2 = "root.sg.d22";
-    String device3 = "root.sg.d333";
-    String device4 = "root.sg.d4444";
-    String device5 = "root.sg.d55555";
-
-    TRegionReplicaSet dataRegion1 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
-            Arrays.asList(
-                genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
-
-    TRegionReplicaSet dataRegion2 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
-            Arrays.asList(
-                genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
-
-    TRegionReplicaSet dataRegion3 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
-            Arrays.asList(
-                genDataNodeLocation(31, "192.0.3.1"), genDataNodeLocation(32, "192.0.3.2")));
-
-    TRegionReplicaSet dataRegion4 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
-            Arrays.asList(
-                genDataNodeLocation(41, "192.0.4.1"), genDataNodeLocation(42, "192.0.4.2")));
-
-    TRegionReplicaSet dataRegion5 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 5),
-            Arrays.asList(
-                genDataNodeLocation(51, "192.0.5.1"), genDataNodeLocation(52, "192.0.5.2")));
-
-    DataPartition dataPartition =
-        new DataPartition(
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
-
-    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
-        dataPartitionMap = new HashMap<>();
-    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> sgPartitionMap =
-        new HashMap<>();
-
-    List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
-    d1DataRegions.add(dataRegion1);
-    d1DataRegions.add(dataRegion2);
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
-    d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
-
-    List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
-    d2DataRegions.add(dataRegion3);
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
-    d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
-
-    List<TRegionReplicaSet> d3DataRegions = new ArrayList<>();
-    d3DataRegions.add(dataRegion1);
-    d3DataRegions.add(dataRegion4);
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
-    d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
-
-    List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
-    d4DataRegions.add(dataRegion1);
-    d4DataRegions.add(dataRegion4);
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>();
-    d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
-
-    List<TRegionReplicaSet> d5DataRegions = new ArrayList<>();
-    d5DataRegions.add(dataRegion4);
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d5DataRegionMap = new HashMap<>();
-    d5DataRegionMap.put(new TTimePartitionSlot(), d5DataRegions);
-
-    sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap);
-    sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap);
-    sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap);
-    sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap);
-    sgPartitionMap.put(executor.getSeriesPartitionSlot(device5), d5DataRegionMap);
-
-    dataPartitionMap.put("root.sg", sgPartitionMap);
-
-    dataPartition.setDataPartitionMap(dataPartitionMap);
-
-    analysis.setDataPartitionInfo(dataPartition);
-
-    // construct AggregationExpression for GroupByLevel
-    Map<String, Set<Expression>> aggregationExpression = new HashMap<>();
-    Set<Expression> s1Expression = new HashSet<>();
-    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")));
-    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s1")));
-    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s1")));
-    s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s1")));
-
-    Set<Expression> s2Expression = new HashSet<>();
-    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s2")));
-    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s2")));
-    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s2")));
-    s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s2")));
-
-    aggregationExpression.put("root.sg.*.s1", s1Expression);
-    aggregationExpression.put("root.sg.*.s2", s2Expression);
-    // analysis.setAggregationExpressions(aggregationExpression);
-
-    // construct schema partition
-    SchemaPartition schemaPartition =
-        new SchemaPartition(
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-            IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
-    Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>();
-
-    TRegionReplicaSet schemaRegion1 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
-            Arrays.asList(
-                genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
-
-    TRegionReplicaSet schemaRegion2 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
-            Arrays.asList(
-                genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
-
-    Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>();
-    schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1);
-    schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2);
-    schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2);
-    schemaPartitionMap.put("root.sg", schemaRegionMap);
-    schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
-
-    analysis.setDataPartitionInfo(dataPartition);
-    analysis.setSchemaPartitionInfo(schemaPartition);
-    return analysis;
+  public static final Analysis ANALYSIS = constructAnalysis();
+
+  public static Analysis constructAnalysis() {
+    try {
+      SeriesPartitionExecutor executor =
+          SeriesPartitionExecutor.getSeriesPartitionExecutor(
+              IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+              IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+      Analysis analysis = new Analysis();
+      // Fixture topology: five devices under root.sg, mapped onto data regions 1-4 below.
+      String device1 = "root.sg.d1";
+      String device2 = "root.sg.d22";
+      String device3 = "root.sg.d333";
+      String device4 = "root.sg.d4444";
+      String device5 = "root.sg.d55555";
+
+      TRegionReplicaSet dataRegion1 =
+          new TRegionReplicaSet(
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
+              Arrays.asList(
+                  genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
+
+      TRegionReplicaSet dataRegion2 =
+          new TRegionReplicaSet(
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
+              Arrays.asList(
+                  genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
+
+      TRegionReplicaSet dataRegion3 =
+          new TRegionReplicaSet(
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
+              Arrays.asList(
+                  genDataNodeLocation(31, "192.0.3.1"), genDataNodeLocation(32, "192.0.3.2")));
+
+      TRegionReplicaSet dataRegion4 =
+          new TRegionReplicaSet(
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
+              Arrays.asList(
+                  genDataNodeLocation(41, "192.0.4.1"), genDataNodeLocation(42, "192.0.4.2")));
+
+      TRegionReplicaSet dataRegion5 =
+          new TRegionReplicaSet(
+              new TConsensusGroupId(TConsensusGroupType.DataRegion, 5),
+              Arrays.asList(
+                  genDataNodeLocation(51, "192.0.5.1"), genDataNodeLocation(52, "192.0.5.2")));
+
+      DataPartition dataPartition =
+          new DataPartition(
+              IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+              IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+
+      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+          dataPartitionMap = new HashMap<>();
+      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> sgPartitionMap =
+          new HashMap<>();
+
+      List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
+      d1DataRegions.add(dataRegion1);
+      d1DataRegions.add(dataRegion2);
+      Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
+      d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
+
+      List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
+      d2DataRegions.add(dataRegion3);
+      Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
+      d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
+
+      List<TRegionReplicaSet> d3DataRegions = new ArrayList<>();
+      d3DataRegions.add(dataRegion1);
+      d3DataRegions.add(dataRegion4);
+      Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
+      d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
+
+      List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
+      d4DataRegions.add(dataRegion1);
+      d4DataRegions.add(dataRegion4);
+      Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>();
+      d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
+
+      List<TRegionReplicaSet> d5DataRegions = new ArrayList<>();
+      d5DataRegions.add(dataRegion4);
+      Map<TTimePartitionSlot, List<TRegionReplicaSet>> d5DataRegionMap = new HashMap<>();
+      d5DataRegionMap.put(new TTimePartitionSlot(), d5DataRegions);
+
+      sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap);
+      sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap);
+      sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap);
+      sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap);
+      sgPartitionMap.put(executor.getSeriesPartitionSlot(device5), d5DataRegionMap);
+
+      dataPartitionMap.put("root.sg", sgPartitionMap);
+
+      dataPartition.setDataPartitionMap(dataPartitionMap);
+
+      analysis.setDataPartitionInfo(dataPartition);
+
+      // construct AggregationExpression for GroupByLevel
+      Map<String, Set<Expression>> aggregationExpression = new HashMap<>();
+      Set<Expression> s1Expression = new HashSet<>();
+      s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")));
+      s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s1")));
+      s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s1")));
+      s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s1")));
+
+      Set<Expression> s2Expression = new HashSet<>();
+      s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s2")));
+      s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s2")));
+      s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s2")));
+      s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s2")));
+
+      aggregationExpression.put("root.sg.*.s1", s1Expression);
+      aggregationExpression.put("root.sg.*.s2", s2Expression);
+      // analysis.setAggregationExpressions(aggregationExpression);
+
+      // construct schema partition
+      SchemaPartition schemaPartition =
+          new SchemaPartition(
+              IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+              IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+      Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
+          new HashMap<>();
+
+      TRegionReplicaSet schemaRegion1 =
+          new TRegionReplicaSet(
+              new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
+              Arrays.asList(
+                  genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
+
+      TRegionReplicaSet schemaRegion2 =
+          new TRegionReplicaSet(
+              new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
+              Arrays.asList(
+                  genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
+
+      Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>();
+      schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1);
+      schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2);
+      schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2);
+      schemaPartitionMap.put("root.sg", schemaRegionMap);
+      schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
+
+      analysis.setDataPartitionInfo(dataPartition);
+      analysis.setSchemaPartitionInfo(schemaPartition);
+      analysis.setSchemaTree(genSchemaTree());
+      return analysis;
+    } catch (IllegalPathException e) {
+      throw new IllegalStateException("Failed to construct the test Analysis fixture", e);
+    }
+  }
+
+  private static ISchemaTree genSchemaTree() {
+    // Builds root -> sg -> {d1, d22, d333, d4444, d55555}, each device holding s1 and s2.
+    SchemaNode root = new SchemaInternalNode("root");
+    SchemaNode sg = new SchemaInternalNode("sg");
+    root.addChild("sg", sg);
+
+    SchemaMeasurementNode s1 =
+        new SchemaMeasurementNode("s1", new MeasurementSchema("s1", TSDataType.INT32));
+    s1.setTagMap(Collections.singletonMap("key1", "value1"));
+    SchemaMeasurementNode s2 =
+        new SchemaMeasurementNode("s2", new MeasurementSchema("s2", TSDataType.DOUBLE));
+    s2.setTagMap(Collections.singletonMap("key1", "value1"));
+    // Every device is registered under sg with its own entity node; s1/s2 are shared by all.
+    SchemaEntityNode d1 = new SchemaEntityNode("d1");
+    sg.addChild("d1", d1);
+    d1.addChild("s1", s1);
+    d1.addChild("s2", s2);
+
+    SchemaEntityNode d2 = new SchemaEntityNode("d22");
+    sg.addChild("d22", d2);
+    d2.addChild("s1", s1);
+    d2.addChild("s2", s2);
+
+    SchemaEntityNode d3 = new SchemaEntityNode("d333");
+    sg.addChild("d333", d3);
+    d3.addChild("s1", s1);
+    d3.addChild("s2", s2);
+
+    SchemaEntityNode d4 = new SchemaEntityNode("d4444");
+    sg.addChild("d4444", d4);
+    d4.addChild("s1", s1);
+    d4.addChild("s2", s2);
+
+    SchemaEntityNode d5 = new SchemaEntityNode("d55555");
+    sg.addChild("d55555", d5);
+    d5.addChild("s1", s1);
+    d5.addChild("s2", s2);
+
+    ClusterSchemaTree tree = new ClusterSchemaTree(root);
+    tree.setStorageGroups(Collections.singletonList("root.sg"));
+
+    return tree;
+  }
+
+  public static PlanNode genLogicalPlan(String sql, MPPQueryContext context) {
+    Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+    Analyzer analyzer = new Analyzer(context, getFakePartitionFetcher(), getFakeSchemaFetcher());
+    Analysis analysis = analyzer.analyze(statement);
+    LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+    return planner.plan(analysis).getRootNode();
+  }
+
+  private static ISchemaFetcher getFakeSchemaFetcher() {
+    return new ISchemaFetcher() {
+      @Override
+      public ISchemaTree fetchSchema(PathPatternTree patternTree) {
+        return ANALYSIS.getSchemaTree();
+      }
+
+      @Override
+      public ISchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
+        return ANALYSIS.getSchemaTree();
+      }
+
+      @Override
+      public ISchemaTree fetchSchemaWithAutoCreate(
+          PartialPath devicePath,
+          String[] measurements,
+          Function<Integer, TSDataType> getDataType,
+          boolean aligned) {
+        return ANALYSIS.getSchemaTree();
+      }
+
+      @Override
+      public ISchemaTree fetchSchemaListWithAutoCreate(
+          List<PartialPath> devicePath,
+          List<String[]> measurements,
+          List<TSDataType[]> tsDataTypes,
+          List<Boolean> aligned) {
+        return ANALYSIS.getSchemaTree();
+      }
+
+      @Override
+      public ISchemaTree fetchSchemaListWithAutoCreate(
+          List<PartialPath> devicePath,
+          List<String[]> measurements,
+          List<TSDataType[]> tsDataTypes,
+          List<TSEncoding[]> encodings,
+          List<CompressionType[]> compressionTypes,
+          List<Boolean> aligned) {
+        return ANALYSIS.getSchemaTree();
+      }
+
+      @Override
+      public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
+        return null;
+      }
+
+      @Override
+      public Map<Integer, Template> checkAllRelatedTemplate(PartialPath pathPattern) {
+        return null;
+      }
+
+      @Override
+      public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
+        return null;
+      }
+
+      @Override
+      public void invalidAllCache() {}
+    };
+  }
+
+  private static IPartitionFetcher getFakePartitionFetcher() {
+    return new IPartitionFetcher() {
+      @Override
+      public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
+        return ANALYSIS.getSchemaPartitionInfo();
+      }
+
+      @Override
+      public SchemaPartition getOrCreateSchemaPartition(PathPatternTree patternTree) {
+        return ANALYSIS.getSchemaPartitionInfo();
+      }
+
+      @Override
+      public DataPartition getDataPartition(
+          Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+        return ANALYSIS.getDataPartitionInfo();
+      }
+
+      @Override
+      public DataPartition getOrCreateDataPartition(
+          Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+        return ANALYSIS.getDataPartitionInfo();
+      }
+
+      @Override
+      public DataPartition getOrCreateDataPartition(
+          List<DataPartitionQueryParam> dataPartitionQueryParams) {
+        return ANALYSIS.getDataPartitionInfo();
+      }
+
+      @Override
+      public SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
+          PathPatternTree patternTree, Integer level) {
+        return null;
+      }
+
+      @Override
+      public boolean updateRegionCache(TRegionRouteReq req) {
+        return false;
+      }
+
+      @Override
+      public void invalidAllCache() {}
+    };
   }
 
   private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String ip) {