You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/19 02:47:13 UTC
[iotdb] branch xingtanzjr/agg_distribution_plan updated: add more verifications for tests
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/agg_distribution_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/agg_distribution_plan by this push:
new 82425d29ed add more verifications for tests
82425d29ed is described below
commit 82425d29ed9d9b63f6135810d303ec7110a40c83
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 19 10:47:04 2022 +0800
add more verifications for tests
---
.../db/mpp/plan/plan/DistributionPlannerTest.java | 68 ++++++++++++++++------
1 file changed, 50 insertions(+), 18 deletions(-)
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
index b465c8e6bd..606e9b09f0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.planner.DistributionPlanner;
import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -51,6 +52,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -315,18 +317,18 @@ public class DistributionPlannerTest {
new MeasurementPath(d1s1Path, TSDataType.INT32),
d1s1Descriptors));
- String d22s1Path = "root.sg.d22.s1";
- List<AggregationDescriptor> d22s1Descriptors = new ArrayList<>();
- d22s1Descriptors.add(
+ String d2s1Path = "root.sg.d22.s1";
+ List<AggregationDescriptor> d2s1Descriptors = new ArrayList<>();
+ d2s1Descriptors.add(
new AggregationDescriptor(
AggregationType.COUNT,
AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d22s1Path)))));
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(d2s1Path)))));
timeJoinNode.addChild(
new SeriesAggregationScanNode(
queryId.genPlanNodeId(),
- new MeasurementPath(d22s1Path, TSDataType.INT32),
- d22s1Descriptors));
+ new MeasurementPath(d2s1Path, TSDataType.INT32),
+ d2s1Descriptors));
Analysis analysis = constructAnalysis();
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -334,6 +336,26 @@ public class DistributionPlannerTest {
new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
DistributedQueryPlan plan = planner.planFragments();
assertEquals(3, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d2s1Path, AggregationStep.FINAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ }
+
+ private void verifyAggregationStep(Map<String, AggregationStep> expected, PlanNode root) {
+ if (root == null) {
+ return;
+ }
+ if (root instanceof SeriesAggregationSourceNode) {
+ SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) root;
+ List<AggregationDescriptor> descriptorList = handle.getAggregationDescriptorList();
+ descriptorList.forEach(
+ d -> {
+ assertEquals(expected.get(handle.getPartitionPath().getFullPath()), d.getStep());
+ });
+ }
+ root.getChildren().forEach(child -> verifyAggregationStep(expected, child));
}
@Test
@@ -353,18 +375,18 @@ public class DistributionPlannerTest {
new MeasurementPath(d1s1Path, TSDataType.INT32),
d1s1Descriptors));
- String d333s1Path = "root.sg.d333.s1";
- List<AggregationDescriptor> d22s1Descriptors = new ArrayList<>();
- d1s1Descriptors.add(
+ String d3s1Path = "root.sg.d333.s1";
+ List<AggregationDescriptor> d2s1Descriptors = new ArrayList<>();
+ d2s1Descriptors.add(
new AggregationDescriptor(
AggregationType.COUNT,
AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d333s1Path)))));
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(d3s1Path)))));
timeJoinNode.addChild(
new SeriesAggregationScanNode(
queryId.genPlanNodeId(),
- new MeasurementPath(d333s1Path, TSDataType.INT32),
- d22s1Descriptors));
+ new MeasurementPath(d3s1Path, TSDataType.INT32),
+ d2s1Descriptors));
Analysis analysis = constructAnalysis();
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -372,6 +394,11 @@ public class DistributionPlannerTest {
new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
DistributedQueryPlan plan = planner.planFragments();
assertEquals(3, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
}
@Test
@@ -391,18 +418,18 @@ public class DistributionPlannerTest {
new MeasurementPath(d1s1Path, TSDataType.INT32),
d1s1Descriptors));
- String d333s1Path = "root.sg.d4444.s1";
- List<AggregationDescriptor> d22s1Descriptors = new ArrayList<>();
- d1s1Descriptors.add(
+ String d4s1Path = "root.sg.d4444.s1";
+ List<AggregationDescriptor> d4s1Descriptors = new ArrayList<>();
+ d4s1Descriptors.add(
new AggregationDescriptor(
AggregationType.COUNT,
AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d333s1Path)))));
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(d4s1Path)))));
timeJoinNode.addChild(
new SeriesAggregationScanNode(
queryId.genPlanNodeId(),
- new MeasurementPath(d333s1Path, TSDataType.INT32),
- d22s1Descriptors));
+ new MeasurementPath(d4s1Path, TSDataType.INT32),
+ d4s1Descriptors));
Analysis analysis = constructAnalysis();
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -410,6 +437,11 @@ public class DistributionPlannerTest {
new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
DistributedQueryPlan plan = planner.planFragments();
assertEquals(2, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
}
@Test