You are viewing a plain-text version of this content. The canonical link is not included in this plain-text version.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/22 01:22:16 UTC
[iotdb] branch master updated: Fix the bug that the AggregationNode is always Final (#8073)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bc97eee3a9 Fix the bug that the AggregationNode is always Final (#8073)
bc97eee3a9 is described below
commit bc97eee3a964a68ac1f0d11608e7f0f8c8c7f9c0
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Tue Nov 22 09:22:10 2022 +0800
Fix the bug that the AggregationNode is always Final (#8073)
---
.../plan/planner/distribution/SourceRewriter.java | 2 +-
.../distribution/AggregationDistributionTest.java | 63 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index fc532f5d36..2274c6ce1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -397,7 +397,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
rootAggDescriptorList.add(
new AggregationDescriptor(
descriptor.getAggregationFuncName(),
- AggregationStep.FINAL,
+ context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE,
descriptor.getInputExpressions()));
});
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 50cc202a52..6db6b2fa36 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -64,6 +64,69 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class AggregationDistributionTest {
+
+ @Test
+ public void testAggregation1Series2Regions() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_1_series_2_regions");
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ String sql = "select count(s1) from root.sg.d1";
+ String d1s1Path = "root.sg.d1.s1";
+
+ Analysis analysis = Util.analyze(sql, context);
+ PlanNode rootNode = Util.genLogicalPlan(analysis, context);
+
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
+ AggregationNode aggregationNode =
+ (AggregationNode)
+ fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+ assertEquals(
+ AggregationStep.FINAL, aggregationNode.getAggregationDescriptorList().get(0).getStep());
+ }
+
+ @Test
+ public void testAggregation1Series2RegionsWithSlidingWindow() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_1_series_2_regions_sliding_window");
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ String sql = "select count(s1) from root.sg.d1 group by ([0, 100), 5ms, 1ms)";
+ String d1s1Path = "root.sg.d1.s1";
+
+ Analysis analysis = Util.analyze(sql, context);
+ PlanNode rootNode = Util.genLogicalPlan(analysis, context);
+
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(
+ f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
+ AggregationNode aggregationNode =
+ (AggregationNode)
+ fragmentInstances
+ .get(0)
+ .getFragment()
+ .getPlanNodeTree()
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0);
+ assertEquals(
+ AggregationStep.INTERMEDIATE,
+ aggregationNode.getAggregationDescriptorList().get(0).getStep());
+ }
+
@Test
public void testTimeJoinAggregationSinglePerRegion() throws IllegalPathException {
QueryId queryId = new QueryId("test_query_time_join_aggregation");