You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/19 02:47:13 UTC

[iotdb] branch xingtanzjr/agg_distribution_plan updated: add more verifications for tests

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/agg_distribution_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/agg_distribution_plan by this push:
     new 82425d29ed add more verifications for tests
82425d29ed is described below

commit 82425d29ed9d9b63f6135810d303ec7110a40c83
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 19 10:47:04 2022 +0800

    add more verifications for tests
---
 .../db/mpp/plan/plan/DistributionPlannerTest.java  | 68 ++++++++++++++++------
 1 file changed, 50 insertions(+), 18 deletions(-)

diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
index b465c8e6bd..606e9b09f0 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -51,6 +52,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
@@ -315,18 +317,18 @@ public class DistributionPlannerTest {
             new MeasurementPath(d1s1Path, TSDataType.INT32),
             d1s1Descriptors));
 
-    String d22s1Path = "root.sg.d22.s1";
-    List<AggregationDescriptor> d22s1Descriptors = new ArrayList<>();
-    d22s1Descriptors.add(
+    String d2s1Path = "root.sg.d22.s1";
+    List<AggregationDescriptor> d2s1Descriptors = new ArrayList<>();
+    d2s1Descriptors.add(
         new AggregationDescriptor(
             AggregationType.COUNT,
             AggregationStep.FINAL,
-            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d22s1Path)))));
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d2s1Path)))));
     timeJoinNode.addChild(
         new SeriesAggregationScanNode(
             queryId.genPlanNodeId(),
-            new MeasurementPath(d22s1Path, TSDataType.INT32),
-            d22s1Descriptors));
+            new MeasurementPath(d2s1Path, TSDataType.INT32),
+            d2s1Descriptors));
     Analysis analysis = constructAnalysis();
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -334,6 +336,26 @@ public class DistributionPlannerTest {
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
     DistributedQueryPlan plan = planner.planFragments();
     assertEquals(3, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d2s1Path, AggregationStep.FINAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+  }
+
+  private void verifyAggregationStep(Map<String, AggregationStep> expected, PlanNode root) { // walks the tree under root, asserting the step of every aggregation source node
+    if (root == null) { // nothing to check for a missing node
+      return;
+    }
+    if (root instanceof SeriesAggregationSourceNode) { // check this node's own descriptors
+      SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) root;
+      List<AggregationDescriptor> descriptorList = handle.getAggregationDescriptorList();
+      descriptorList.forEach(
+          d -> { // expected step is looked up by the node's full partition path; a path absent from the map yields null
+            assertEquals(expected.get(handle.getPartitionPath().getFullPath()), d.getStep());
+          });
+    }
+    root.getChildren().forEach(child -> verifyAggregationStep(expected, child)); // recurse into every child
   }
 
   @Test
@@ -353,18 +375,18 @@ public class DistributionPlannerTest {
             new MeasurementPath(d1s1Path, TSDataType.INT32),
             d1s1Descriptors));
 
-    String d333s1Path = "root.sg.d333.s1";
-    List<AggregationDescriptor> d22s1Descriptors = new ArrayList<>();
-    d1s1Descriptors.add(
+    String d3s1Path = "root.sg.d333.s1";
+    List<AggregationDescriptor> d2s1Descriptors = new ArrayList<>();
+    d2s1Descriptors.add(
         new AggregationDescriptor(
             AggregationType.COUNT,
             AggregationStep.FINAL,
-            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d333s1Path)))));
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d3s1Path)))));
     timeJoinNode.addChild(
         new SeriesAggregationScanNode(
             queryId.genPlanNodeId(),
-            new MeasurementPath(d333s1Path, TSDataType.INT32),
-            d22s1Descriptors));
+            new MeasurementPath(d3s1Path, TSDataType.INT32),
+            d2s1Descriptors));
     Analysis analysis = constructAnalysis();
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -372,6 +394,11 @@ public class DistributionPlannerTest {
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
     DistributedQueryPlan plan = planner.planFragments();
     assertEquals(3, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
   }
 
   @Test
@@ -391,18 +418,18 @@ public class DistributionPlannerTest {
             new MeasurementPath(d1s1Path, TSDataType.INT32),
             d1s1Descriptors));
 
-    String d333s1Path = "root.sg.d4444.s1";
-    List<AggregationDescriptor> d22s1Descriptors = new ArrayList<>();
-    d1s1Descriptors.add(
+    String d4s1Path = "root.sg.d4444.s1";
+    List<AggregationDescriptor> d4s1Descriptors = new ArrayList<>();
+    d4s1Descriptors.add(
         new AggregationDescriptor(
             AggregationType.COUNT,
             AggregationStep.FINAL,
-            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d333s1Path)))));
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d4s1Path)))));
     timeJoinNode.addChild(
         new SeriesAggregationScanNode(
             queryId.genPlanNodeId(),
-            new MeasurementPath(d333s1Path, TSDataType.INT32),
-            d22s1Descriptors));
+            new MeasurementPath(d4s1Path, TSDataType.INT32),
+            d4s1Descriptors));
     Analysis analysis = constructAnalysis();
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -410,6 +437,11 @@ public class DistributionPlannerTest {
         new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
     DistributedQueryPlan plan = planner.planFragments();
     assertEquals(2, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
   }
 
   @Test