You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/17 04:24:29 UTC
[iotdb] 02/02: fix the issue that aggregation plan is not corrent with alignByDevice in multi-region
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch fix_agg_align_by_device
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aa26adb654d731aa5870bfdf06b0514ce07396ec
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu Nov 17 12:24:11 2022 +0800
fix the issue that aggregation plan is not corrent with alignByDevice in multi-region
---
.../plan/planner/distribution/SourceRewriter.java | 30 +-
.../distribution/AggregationDistributionTest.java | 48 +++
.../iotdb/db/mpp/plan/plan/distribution/Util.java | 475 +++++++++++++++------
3 files changed, 405 insertions(+), 148 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 338c02a00e..10327086c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -90,7 +90,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
// If the logicalPlan is mixed by DeviceView and Aggregation, it should be processed by a
// special logic.
- if (isAggregationQuery(node)) {}
+ if (isAggregationQuery(node)) {
+ return processDeviceViewWithAggregation(node, context);
+ }
Set<TRegionReplicaSet> relatedDataRegions = new HashSet<>();
@@ -121,12 +123,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
children.add(split.buildPlanNodeInRegion(regionReplicaSet, context.queryContext));
}
}
- DeviceViewNode regionDeviceViewNode =
- new DeviceViewNode(
- context.queryContext.getQueryId().genPlanNodeId(),
- node.getMergeOrderParameter(),
- node.getOutputColumnNames(),
- node.getDeviceToMeasurementIndexesMap());
+ DeviceViewNode regionDeviceViewNode = cloneDeviceViewNodeWithoutChild(node, context);
for (int i = 0; i < devices.size(); i++) {
regionDeviceViewNode.addChildDeviceNode(devices.get(i), children.get(i));
}
@@ -136,6 +133,25 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return deviceMergeNode;
}
+ private PlanNode processDeviceViewWithAggregation(
+ DeviceViewNode node, DistributionPlanContext context) {
+ DeviceViewNode newRoot = cloneDeviceViewNodeWithoutChild(node, context);
+ for (int i = 0; i < node.getDevices().size(); i++) {
+ newRoot.addChildDeviceNode(
+ node.getDevices().get(i), rewrite(node.getChildren().get(i), context));
+ }
+ return newRoot;
+ }
+
+ private DeviceViewNode cloneDeviceViewNodeWithoutChild(
+ DeviceViewNode node, DistributionPlanContext context) {
+ return new DeviceViewNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getMergeOrderParameter(),
+ node.getOutputColumnNames(),
+ node.getDeviceToMeasurementIndexesMap());
+ }
+
private static class DeviceViewSplit {
protected String device;
protected PlanNode root;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 3368d7408f..6a173935f0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
@@ -635,6 +636,53 @@ public class AggregationDistributionTest {
root, plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0));
}
+ @Test
+ public void testAlignByDevice1Device2Region() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_align_by_device_1_device_2_region");
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ String sql = "select count(s1), count(s2) from root.sg.d1 align by device";
+ Analysis analysis = Util.ANALYSIS;
+ PlanNode logicalPlanNode = Util.genLogicalPlan(sql, context);
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+ PlanNode f1Root =
+ plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+ PlanNode f2Root =
+ plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
+ assertTrue(f1Root instanceof DeviceViewNode);
+ assertTrue(f2Root instanceof TimeJoinNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof AggregationNode);
+ assertEquals(3, f1Root.getChildren().get(0).getChildren().size());
+ }
+
+ @Test
+ public void testAlignByDevice2Device2Region() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_align_by_device_2_device_2_region");
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ String sql = "select count(s1), count(s2) from root.sg.d1,root.sg.d22 align by device";
+ Analysis analysis = Util.ANALYSIS;
+ PlanNode logicalPlanNode = Util.genLogicalPlan(sql, context);
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ PlanNode f1Root =
+ plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+ PlanNode f2Root =
+ plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
+ PlanNode f3Root =
+ plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0);
+ assertTrue(f1Root instanceof DeviceViewNode);
+ assertTrue(f2Root instanceof TimeJoinNode);
+ assertTrue(f3Root instanceof AggregationNode);
+ assertTrue(f1Root.getChildren().get(0) instanceof AggregationNode);
+ assertEquals(3, f1Root.getChildren().get(0).getChildren().size());
+ }
+
private void verifyGroupByLevelDescriptor(
Map<String, List<String>> expected, GroupByLevelNode node) {
List<CrossSeriesAggregationDescriptor> descriptors = node.getGroupByLevelDescriptors();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
index d433f9aee0..36d564f882 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/Util.java
@@ -28,164 +28,357 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.mpp.common.schematree.node.SchemaEntityNode;
+import org.apache.iotdb.db.mpp.common.schematree.node.SchemaInternalNode;
+import org.apache.iotdb.db.mpp.common.schematree.node.SchemaMeasurementNode;
+import org.apache.iotdb.db.mpp.common.schematree.node.SchemaNode;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
+import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
+import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
public class Util {
- public static Analysis constructAnalysis() throws IllegalPathException {
-
- SeriesPartitionExecutor executor =
- SeriesPartitionExecutor.getSeriesPartitionExecutor(
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
- Analysis analysis = new Analysis();
-
- String device1 = "root.sg.d1";
- String device2 = "root.sg.d22";
- String device3 = "root.sg.d333";
- String device4 = "root.sg.d4444";
- String device5 = "root.sg.d55555";
-
- TRegionReplicaSet dataRegion1 =
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
- Arrays.asList(
- genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
-
- TRegionReplicaSet dataRegion2 =
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
- Arrays.asList(
- genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
-
- TRegionReplicaSet dataRegion3 =
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
- Arrays.asList(
- genDataNodeLocation(31, "192.0.3.1"), genDataNodeLocation(32, "192.0.3.2")));
-
- TRegionReplicaSet dataRegion4 =
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
- Arrays.asList(
- genDataNodeLocation(41, "192.0.4.1"), genDataNodeLocation(42, "192.0.4.2")));
-
- TRegionReplicaSet dataRegion5 =
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.DataRegion, 5),
- Arrays.asList(
- genDataNodeLocation(51, "192.0.5.1"), genDataNodeLocation(52, "192.0.5.2")));
-
- DataPartition dataPartition =
- new DataPartition(
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
-
- Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
- dataPartitionMap = new HashMap<>();
- Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> sgPartitionMap =
- new HashMap<>();
-
- List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
- d1DataRegions.add(dataRegion1);
- d1DataRegions.add(dataRegion2);
- Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
- d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
-
- List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
- d2DataRegions.add(dataRegion3);
- Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
- d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
-
- List<TRegionReplicaSet> d3DataRegions = new ArrayList<>();
- d3DataRegions.add(dataRegion1);
- d3DataRegions.add(dataRegion4);
- Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
- d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
-
- List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
- d4DataRegions.add(dataRegion1);
- d4DataRegions.add(dataRegion4);
- Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>();
- d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
-
- List<TRegionReplicaSet> d5DataRegions = new ArrayList<>();
- d5DataRegions.add(dataRegion4);
- Map<TTimePartitionSlot, List<TRegionReplicaSet>> d5DataRegionMap = new HashMap<>();
- d5DataRegionMap.put(new TTimePartitionSlot(), d5DataRegions);
-
- sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap);
- sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap);
- sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap);
- sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap);
- sgPartitionMap.put(executor.getSeriesPartitionSlot(device5), d5DataRegionMap);
-
- dataPartitionMap.put("root.sg", sgPartitionMap);
-
- dataPartition.setDataPartitionMap(dataPartitionMap);
-
- analysis.setDataPartitionInfo(dataPartition);
-
- // construct AggregationExpression for GroupByLevel
- Map<String, Set<Expression>> aggregationExpression = new HashMap<>();
- Set<Expression> s1Expression = new HashSet<>();
- s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")));
- s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s1")));
- s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s1")));
- s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s1")));
-
- Set<Expression> s2Expression = new HashSet<>();
- s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s2")));
- s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s2")));
- s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s2")));
- s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s2")));
-
- aggregationExpression.put("root.sg.*.s1", s1Expression);
- aggregationExpression.put("root.sg.*.s2", s2Expression);
- // analysis.setAggregationExpressions(aggregationExpression);
-
- // construct schema partition
- SchemaPartition schemaPartition =
- new SchemaPartition(
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
- IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
- Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap = new HashMap<>();
-
- TRegionReplicaSet schemaRegion1 =
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
- Arrays.asList(
- genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
-
- TRegionReplicaSet schemaRegion2 =
- new TRegionReplicaSet(
- new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
- Arrays.asList(
- genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
-
- Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>();
- schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1);
- schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2);
- schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2);
- schemaPartitionMap.put("root.sg", schemaRegionMap);
- schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
-
- analysis.setDataPartitionInfo(dataPartition);
- analysis.setSchemaPartitionInfo(schemaPartition);
- return analysis;
+ public static final Analysis ANALYSIS = constructAnalysis();
+
+ public static Analysis constructAnalysis() {
+ try {
+ SeriesPartitionExecutor executor =
+ SeriesPartitionExecutor.getSeriesPartitionExecutor(
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ Analysis analysis = new Analysis();
+
+ String device1 = "root.sg.d1";
+ String device2 = "root.sg.d22";
+ String device3 = "root.sg.d333";
+ String device4 = "root.sg.d4444";
+ String device5 = "root.sg.d55555";
+
+ TRegionReplicaSet dataRegion1 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
+ Arrays.asList(
+ genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
+
+ TRegionReplicaSet dataRegion2 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
+ Arrays.asList(
+ genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
+
+ TRegionReplicaSet dataRegion3 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
+ Arrays.asList(
+ genDataNodeLocation(31, "192.0.3.1"), genDataNodeLocation(32, "192.0.3.2")));
+
+ TRegionReplicaSet dataRegion4 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
+ Arrays.asList(
+ genDataNodeLocation(41, "192.0.4.1"), genDataNodeLocation(42, "192.0.4.2")));
+
+ TRegionReplicaSet dataRegion5 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 5),
+ Arrays.asList(
+ genDataNodeLocation(51, "192.0.5.1"), genDataNodeLocation(52, "192.0.5.2")));
+
+ DataPartition dataPartition =
+ new DataPartition(
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
+ dataPartitionMap = new HashMap<>();
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> sgPartitionMap =
+ new HashMap<>();
+
+ List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
+ d1DataRegions.add(dataRegion1);
+ d1DataRegions.add(dataRegion2);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new HashMap<>();
+ d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
+
+ List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
+ d2DataRegions.add(dataRegion3);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new HashMap<>();
+ d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
+
+ List<TRegionReplicaSet> d3DataRegions = new ArrayList<>();
+ d3DataRegions.add(dataRegion1);
+ d3DataRegions.add(dataRegion4);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
+ d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
+
+ List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
+ d4DataRegions.add(dataRegion1);
+ d4DataRegions.add(dataRegion4);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>();
+ d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
+
+ List<TRegionReplicaSet> d5DataRegions = new ArrayList<>();
+ d5DataRegions.add(dataRegion4);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d5DataRegionMap = new HashMap<>();
+ d5DataRegionMap.put(new TTimePartitionSlot(), d5DataRegions);
+
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap);
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap);
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap);
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap);
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device5), d5DataRegionMap);
+
+ dataPartitionMap.put("root.sg", sgPartitionMap);
+
+ dataPartition.setDataPartitionMap(dataPartitionMap);
+
+ analysis.setDataPartitionInfo(dataPartition);
+
+ // construct AggregationExpression for GroupByLevel
+ Map<String, Set<Expression>> aggregationExpression = new HashMap<>();
+ Set<Expression> s1Expression = new HashSet<>();
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")));
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s1")));
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s1")));
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s1")));
+
+ Set<Expression> s2Expression = new HashSet<>();
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s2")));
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s2")));
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s2")));
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s2")));
+
+ aggregationExpression.put("root.sg.*.s1", s1Expression);
+ aggregationExpression.put("root.sg.*.s2", s2Expression);
+ // analysis.setAggregationExpressions(aggregationExpression);
+
+ // construct schema partition
+ SchemaPartition schemaPartition =
+ new SchemaPartition(
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+ IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> schemaPartitionMap =
+ new HashMap<>();
+
+ TRegionReplicaSet schemaRegion1 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
+ Arrays.asList(
+ genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, "192.0.1.2")));
+
+ TRegionReplicaSet schemaRegion2 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
+ Arrays.asList(
+ genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, "192.0.2.2")));
+
+ Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new HashMap<>();
+ schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), schemaRegion1);
+ schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), schemaRegion2);
+ schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), schemaRegion2);
+ schemaPartitionMap.put("root.sg", schemaRegionMap);
+ schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
+
+ analysis.setDataPartitionInfo(dataPartition);
+ analysis.setSchemaPartitionInfo(schemaPartition);
+ analysis.setSchemaTree(genSchemaTree());
+ return analysis;
+ } catch (IllegalPathException e) {
+ return new Analysis();
+ }
+ }
+
+ private static ISchemaTree genSchemaTree() {
+ SchemaNode root = new SchemaInternalNode("root");
+
+ SchemaNode sg = new SchemaInternalNode("sg");
+ root.addChild("sg", sg);
+
+ SchemaMeasurementNode s1 =
+ new SchemaMeasurementNode("s1", new MeasurementSchema("s1", TSDataType.INT32));
+ s1.setTagMap(Collections.singletonMap("key1", "value1"));
+ SchemaMeasurementNode s2 =
+ new SchemaMeasurementNode("s2", new MeasurementSchema("s2", TSDataType.DOUBLE));
+ s2.setTagMap(Collections.singletonMap("key1", "value1"));
+
+ SchemaEntityNode d1 = new SchemaEntityNode("d1");
+ sg.addChild("d1", d1);
+ d1.addChild("s1", s1);
+ d1.addChild("s2", s2);
+
+ SchemaEntityNode d2 = new SchemaEntityNode("d22");
+ sg.addChild("d22", d2);
+ d2.addChild("s1", s1);
+ d2.addChild("s2", s2);
+
+ SchemaEntityNode d3 = new SchemaEntityNode("d333");
+ sg.addChild("d333", d2);
+ d3.addChild("s1", s1);
+ d3.addChild("s2", s2);
+
+ SchemaEntityNode d4 = new SchemaEntityNode("d4444");
+ sg.addChild("d4444", d2);
+ d4.addChild("s1", s1);
+ d4.addChild("s2", s2);
+
+ SchemaEntityNode d5 = new SchemaEntityNode("d55555");
+ sg.addChild("d55555", d2);
+ d5.addChild("s1", s1);
+ d5.addChild("s2", s2);
+
+ ClusterSchemaTree tree = new ClusterSchemaTree(root);
+ tree.setStorageGroups(Collections.singletonList("root.sg"));
+
+ return tree;
+ }
+
+ public static PlanNode genLogicalPlan(String sql, MPPQueryContext context) {
+ Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset());
+ Analyzer analyzer = new Analyzer(context, getFakePartitionFetcher(), getFakeSchemaFetcher());
+ Analysis analysis = analyzer.analyze(statement);
+ LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+ return planner.plan(analysis).getRootNode();
+ }
+
+ private static ISchemaFetcher getFakeSchemaFetcher() {
+ return new ISchemaFetcher() {
+ @Override
+ public ISchemaTree fetchSchema(PathPatternTree patternTree) {
+ return ANALYSIS.getSchemaTree();
+ }
+
+ @Override
+ public ISchemaTree fetchSchemaWithTags(PathPatternTree patternTree) {
+ return ANALYSIS.getSchemaTree();
+ }
+
+ @Override
+ public ISchemaTree fetchSchemaWithAutoCreate(
+ PartialPath devicePath,
+ String[] measurements,
+ Function<Integer, TSDataType> getDataType,
+ boolean aligned) {
+ return ANALYSIS.getSchemaTree();
+ }
+
+ @Override
+ public ISchemaTree fetchSchemaListWithAutoCreate(
+ List<PartialPath> devicePath,
+ List<String[]> measurements,
+ List<TSDataType[]> tsDataTypes,
+ List<Boolean> aligned) {
+ return ANALYSIS.getSchemaTree();
+ }
+
+ @Override
+ public ISchemaTree fetchSchemaListWithAutoCreate(
+ List<PartialPath> devicePath,
+ List<String[]> measurements,
+ List<TSDataType[]> tsDataTypes,
+ List<TSEncoding[]> encodings,
+ List<CompressionType[]> compressionTypes,
+ List<Boolean> aligned) {
+ return ANALYSIS.getSchemaTree();
+ }
+
+ @Override
+ public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath path) {
+ return null;
+ }
+
+ @Override
+ public Map<Integer, Template> checkAllRelatedTemplate(PartialPath pathPattern) {
+ return null;
+ }
+
+ @Override
+ public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
+ return null;
+ }
+
+ @Override
+ public void invalidAllCache() {}
+ };
+ }
+
+ private static IPartitionFetcher getFakePartitionFetcher() {
+ return new IPartitionFetcher() {
+ @Override
+ public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
+ return ANALYSIS.getSchemaPartitionInfo();
+ }
+
+ @Override
+ public SchemaPartition getOrCreateSchemaPartition(PathPatternTree patternTree) {
+ return ANALYSIS.getSchemaPartitionInfo();
+ }
+
+ @Override
+ public DataPartition getDataPartition(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return ANALYSIS.getDataPartitionInfo();
+ }
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return ANALYSIS.getDataPartitionInfo();
+ }
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ List<DataPartitionQueryParam> dataPartitionQueryParams) {
+ return ANALYSIS.getDataPartitionInfo();
+ }
+
+ @Override
+ public SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
+ PathPatternTree patternTree, Integer level) {
+ return null;
+ }
+
+ @Override
+ public boolean updateRegionCache(TRegionRouteReq req) {
+ return false;
+ }
+
+ @Override
+ public void invalidAllCache() {}
+ };
}
private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String ip) {