You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/21 10:51:08 UTC

[iotdb] 01/01: add step check when do distribution plan for 1 series

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_agg_1_series
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c02621fa6d8540e58d70b262b46b103fd46b74ee
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Nov 21 18:50:53 2022 +0800

    add step check when do distribution plan for 1 series
---
 .../plan/planner/distribution/SourceRewriter.java  |  2 +-
 .../distribution/AggregationDistributionTest.java  | 63 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index fc532f5d36..2274c6ce1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -397,7 +397,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
               rootAggDescriptorList.add(
                   new AggregationDescriptor(
                       descriptor.getAggregationFuncName(),
-                      AggregationStep.FINAL,
+                      context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE,
                       descriptor.getInputExpressions()));
             });
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 50cc202a52..6db6b2fa36 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -64,6 +64,69 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class AggregationDistributionTest {
+
+  @Test // Plans a plain count() over one series and checks the aggregation steps in the fragments.
+  public void testAggregation1Series2Regions() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_1_series_2_regions");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    String sql = "select count(s1) from root.sg.d1";
+    String d1s1Path = "root.sg.d1.s1"; // key used in the expected-step map below
+
+    Analysis analysis = Util.analyze(sql, context);
+    PlanNode rootNode = Util.genLogicalPlan(analysis, context);
+
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size()); // the distributed plan must have two instances
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach( // verifyAggregationStep is defined outside this hunk; presumably checks per-path steps - confirm
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
+    AggregationNode aggregationNode = // first child of the first instance's plan tree root
+        (AggregationNode)
+            fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+    assertEquals( // without a sliding window, this node's first descriptor must be FINAL
+        AggregationStep.FINAL, aggregationNode.getAggregationDescriptorList().get(0).getStep());
+  }
+
+  @Test // Same single-series count() query but with a GROUP BY time clause; checks the steps.
+  public void testAggregation1Series2RegionsWithSlidingWindow() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_1_series_2_regions_sliding_window");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    String sql = "select count(s1) from root.sg.d1 group by ([0, 100), 5ms, 1ms)";
+    String d1s1Path = "root.sg.d1.s1"; // key used in the expected-step map below
+
+    Analysis analysis = Util.analyze(sql, context);
+    PlanNode rootNode = Util.genLogicalPlan(analysis, context);
+
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size()); // the distributed plan must have two instances
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach( // verifyAggregationStep is defined outside this hunk; presumably checks per-path steps - confirm
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
+    AggregationNode aggregationNode = // node two levels below the first instance's plan tree root
+        (AggregationNode)
+            fragmentInstances
+                .get(0)
+                .getFragment()
+                .getPlanNodeTree()
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0);
+    assertEquals( // here the node is not the tree's direct child, and its step must be INTERMEDIATE
+        AggregationStep.INTERMEDIATE,
+        aggregationNode.getAggregationDescriptorList().get(0).getStep());
+  }
+
   @Test
   public void testTimeJoinAggregationSinglePerRegion() throws IllegalPathException {
     QueryId queryId = new QueryId("test_query_time_join_aggregation");