You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/21 10:51:07 UTC

[iotdb] branch xingtanzjr/fix_agg_1_series created (now c02621fa6d)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/fix_agg_1_series
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at c02621fa6d add step check when do distribution plan for 1 series

This branch includes the following new commits:

     new c02621fa6d add step check when do distribution plan for 1 series

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add step check when do distribution plan for 1 series

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_agg_1_series
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c02621fa6d8540e58d70b262b46b103fd46b74ee
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Mon Nov 21 18:50:53 2022 +0800

    add step check when do distribution plan for 1 series
---
 .../plan/planner/distribution/SourceRewriter.java  |  2 +-
 .../distribution/AggregationDistributionTest.java  | 63 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index fc532f5d36..2274c6ce1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -397,7 +397,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
               rootAggDescriptorList.add(
                   new AggregationDescriptor(
                       descriptor.getAggregationFuncName(),
-                      AggregationStep.FINAL,
+                      context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE,
                       descriptor.getInputExpressions()));
             });
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 50cc202a52..6db6b2fa36 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -64,6 +64,69 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class AggregationDistributionTest {
+
+  @Test
+  public void testAggregation1Series2Regions() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_1_series_2_regions");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    String sql = "select count(s1) from root.sg.d1";
+    String d1s1Path = "root.sg.d1.s1";
+
+    Analysis analysis = Util.analyze(sql, context);
+    PlanNode rootNode = Util.genLogicalPlan(analysis, context);
+
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
+    AggregationNode aggregationNode =
+        (AggregationNode)
+            fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+    assertEquals(
+        AggregationStep.FINAL, aggregationNode.getAggregationDescriptorList().get(0).getStep());
+  }
+
+  @Test
+  public void testAggregation1Series2RegionsWithSlidingWindow() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_1_series_2_regions_sliding_window");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    String sql = "select count(s1) from root.sg.d1 group by ([0, 100), 5ms, 1ms)";
+    String d1s1Path = "root.sg.d1.s1";
+
+    Analysis analysis = Util.analyze(sql, context);
+    PlanNode rootNode = Util.genLogicalPlan(analysis, context);
+
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
+    AggregationNode aggregationNode =
+        (AggregationNode)
+            fragmentInstances
+                .get(0)
+                .getFragment()
+                .getPlanNodeTree()
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0);
+    assertEquals(
+        AggregationStep.INTERMEDIATE,
+        aggregationNode.getAggregationDescriptorList().get(0).getStep());
+  }
+
   @Test
   public void testTimeJoinAggregationSinglePerRegion() throws IllegalPathException {
     QueryId queryId = new QueryId("test_query_time_join_aggregation");