You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/19 11:41:28 UTC
[iotdb] branch xingtanzjr/agg_distribution_plan updated: tmp saved
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/agg_distribution_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/xingtanzjr/agg_distribution_plan by this push:
new 72478856fd tmp saved
72478856fd is described below
commit 72478856fd509634b0ae801bd86ee6c955162227
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 19 19:41:18 2022 +0800
tmp saved
---
.../db/mpp/plan/planner/DistributionPlanner.java | 17 +-
.../planner/plan/node/process/AggregationNode.java | 4 +
.../plan/node/process/GroupByLevelNode.java | 10 +-
.../db/mpp/plan/plan/DistributionPlannerTest.java | 217 +++++++++++++--------
4 files changed, 157 insertions(+), 91 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
index 12d3bc8493..2a14234f8d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
@@ -492,7 +492,7 @@ public class DistributionPlanner {
// Then, we calculate the attributes for GroupByLevelNode in each level
calculateGroupByLevelNodeAttributes(newRoot, 0);
- return null;
+ return newRoot;
}
private void calculateGroupByLevelNodeAttributes(PlanNode node, int level) {
@@ -545,10 +545,8 @@ public class DistributionPlanner {
outputColumnList.add(column);
descriptorList.add(descriptor);
}
- handle.getOutputColumnNames().clear();
- handle.getOutputColumnNames().addAll(outputColumnList);
- handle.getAggregationDescriptorList().clear();
- handle.getAggregationDescriptorList().addAll(descriptorList);
+ handle.setOutputColumnNames(outputColumnList);
+ handle.setAggregationDescriptorList(descriptorList);
}
private List<SeriesAggregationSourceNode> splitAggregationSourceByPartition(
@@ -716,6 +714,11 @@ public class DistributionPlanner {
return processMultiChildNode(node, context);
}
+ @Override
+ public PlanNode visitGroupByLevel(GroupByLevelNode node, NodeGroupContext context) {
+ return processMultiChildNode(node, context);
+ }
+
private PlanNode processMultiChildNode(MultiChildNode node, NodeGroupContext context) {
MultiChildNode newNode = (MultiChildNode) node.clone();
List<PlanNode> visitedChildren = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 8e9cb67948..7f1d2bb6ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -125,6 +125,10 @@ public class AggregationNode extends MultiChildNode {
.collect(Collectors.toList());
}
+ public void setAggregationDescriptorList(List<AggregationDescriptor> aggregationDescriptorList) {
+ this.aggregationDescriptorList = aggregationDescriptorList;
+ }
+
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitRowBasedSeriesAggregate(this, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index 034456c692..67e656150b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -48,7 +48,7 @@ import java.util.Objects;
public class GroupByLevelNode extends AggregationNode {
// column name of each output column
- private final List<String> outputColumnNames;
+ private List<String> outputColumnNames;
public GroupByLevelNode(
PlanNodeId id,
@@ -99,6 +99,10 @@ public class GroupByLevelNode extends AggregationNode {
return outputColumnNames;
}
+ public void setOutputColumnNames(List<String> outputColumnNames) {
+ this.outputColumnNames = outputColumnNames;
+ }
+
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitGroupByLevel(this, context);
@@ -165,4 +169,11 @@ public class GroupByLevelNode extends AggregationNode {
public int hashCode() {
return Objects.hash(super.hashCode(), outputColumnNames);
}
+
+  @Override
+  public String toString() {
+    return String.format(
+        "GroupByLevelNode-%s: Output: %s, Input: %s",
+        getPlanNodeId(), getOutputColumnNames(), getAggregationDescriptorList().size());
+  }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
index a1025fe686..42c8b2eb77 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
@@ -44,9 +44,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -59,6 +61,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.expression.Expression;
import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -69,8 +72,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -302,30 +307,11 @@ public class DistributionPlannerTest {
QueryId queryId = new QueryId("test_query_time_join_aggregation");
TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
String d1s1Path = "root.sg.d1.s1";
- List<AggregationDescriptor> d1s1Descriptors = new ArrayList<>();
- d1s1Descriptors.add(
- new AggregationDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path)))));
- timeJoinNode.addChild(
- new SeriesAggregationScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath(d1s1Path, TSDataType.INT32),
- d1s1Descriptors));
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT));
String d2s1Path = "root.sg.d22.s1";
- List<AggregationDescriptor> d2s1Descriptors = new ArrayList<>();
- d2s1Descriptors.add(
- new AggregationDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d2s1Path)))));
- timeJoinNode.addChild(
- new SeriesAggregationScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath(d2s1Path, TSDataType.INT32),
- d2s1Descriptors));
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT));
+
Analysis analysis = constructAnalysis();
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -360,30 +346,11 @@ public class DistributionPlannerTest {
QueryId queryId = new QueryId("test_query_time_join_aggregation");
TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
String d1s1Path = "root.sg.d1.s1";
- List<AggregationDescriptor> d1s1Descriptors = new ArrayList<>();
- d1s1Descriptors.add(
- new AggregationDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path)))));
- timeJoinNode.addChild(
- new SeriesAggregationScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath(d1s1Path, TSDataType.INT32),
- d1s1Descriptors));
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT));
String d3s1Path = "root.sg.d333.s1";
- List<AggregationDescriptor> d2s1Descriptors = new ArrayList<>();
- d2s1Descriptors.add(
- new AggregationDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d3s1Path)))));
- timeJoinNode.addChild(
- new SeriesAggregationScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath(d3s1Path, TSDataType.INT32),
- d2s1Descriptors));
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT));
+
Analysis analysis = constructAnalysis();
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -402,31 +369,11 @@ public class DistributionPlannerTest {
public void testTimeJoinAggregationMultiPerRegion2() throws IllegalPathException {
QueryId queryId = new QueryId("test_query_time_join_aggregation");
TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
- String d1s1Path = "root.sg.d333.s1";
- List<AggregationDescriptor> d1s1Descriptors = new ArrayList<>();
- d1s1Descriptors.add(
- new AggregationDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path)))));
- timeJoinNode.addChild(
- new SeriesAggregationScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath(d1s1Path, TSDataType.INT32),
- d1s1Descriptors));
+ String d3s1Path = "root.sg.d333.s1";
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT));
String d4s1Path = "root.sg.d4444.s1";
- List<AggregationDescriptor> d4s1Descriptors = new ArrayList<>();
- d4s1Descriptors.add(
- new AggregationDescriptor(
- AggregationType.COUNT,
- AggregationStep.FINAL,
- Collections.singletonList(new TimeSeriesOperand(new PartialPath(d4s1Path)))));
- timeJoinNode.addChild(
- new SeriesAggregationScanNode(
- queryId.genPlanNodeId(),
- new MeasurementPath(d4s1Path, TSDataType.INT32),
- d4s1Descriptors));
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d4s1Path, AggregationType.COUNT));
Analysis analysis = constructAnalysis();
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -435,12 +382,94 @@ public class DistributionPlannerTest {
DistributedQueryPlan plan = planner.planFragments();
assertEquals(2, plan.getInstances().size());
Map<String, AggregationStep> expectedStep = new HashMap<>();
- expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
List<FragmentInstance> fragmentInstances = plan.getInstances();
fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
}
+ @Test
+ public void testGroupByLevelWithTwoChildren() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_group_by_level_two_children");
+ String d1s1Path = "root.sg.d1.s1";
+ String d2s1Path = "root.sg.d22.s1";
+ List<String> outputColumns = Collections.singletonList("root.sg.*.s1");
+
+ GroupByLevelNode groupByLevelNode =
+ new GroupByLevelNode(
+ new PlanNodeId("TestGroupByLevelNode"),
+ Arrays.asList(genAggregationSourceNode(queryId, d1s1Path, AggregationType.COUNT),
+ genAggregationSourceNode(queryId, d2s1Path, AggregationType.COUNT)),
+ Collections.singletonList(
+ new AggregationDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Arrays.asList(
+ new TimeSeriesOperand(new PartialPath(d1s1Path)),
+ new TimeSeriesOperand(new PartialPath(d2s1Path))))),
+ outputColumns);
+ Analysis analysis = constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d2s1Path, AggregationStep.FINAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+ }
+
+  @Test
+  public void testAggregationWithMultiGroupByLevelNode() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_aggregation_multi_group_by_level");
+    String d3s1Path = "root.sg.d333.s1";
+    String d4s1Path = "root.sg.d4444.s1";
+    List<String> outputColumns = Collections.singletonList("root.sg.*.s1");
+
+    GroupByLevelNode groupByLevelNode =
+        new GroupByLevelNode(
+            new PlanNodeId("TestGroupByLevelNode"),
+            Arrays.asList(genAggregationSourceNode(queryId, d3s1Path, AggregationType.COUNT),
+                genAggregationSourceNode(queryId, d4s1Path, AggregationType.COUNT)),
+            Collections.singletonList(
+                new AggregationDescriptor(
+                    AggregationType.COUNT,
+                    AggregationStep.FINAL,
+                    Arrays.asList(
+                        new TimeSeriesOperand(new PartialPath(d3s1Path)),
+                        new TimeSeriesOperand(new PartialPath(d4s1Path))))),
+            outputColumns);
+    Analysis analysis = constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, groupByLevelNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+    expectedStep.put(d4s1Path, AggregationStep.FINAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep, f.getFragment().getRoot()));
+  }
+
+ private SeriesAggregationSourceNode genAggregationSourceNode(QueryId queryId, String path, AggregationType type) throws IllegalPathException {
+ List<AggregationDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(
+ new AggregationDescriptor(
+ type,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(path)))));
+
+ return new SeriesAggregationScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath(path, TSDataType.INT32),
+ descriptors);
+ }
+
@Test
public void testParallelPlanWithAlignedSeries() throws IllegalPathException {
QueryId queryId = new QueryId("test_query_aligned");
@@ -498,16 +527,16 @@ public class DistributionPlannerTest {
queryId.genPlanNodeId(),
new PartialPath("root.sg.d1"),
false,
- new String[] {
- "s1",
+ new String[]{
+ "s1",
},
- new TSDataType[] {TSDataType.INT32},
+ new TSDataType[]{TSDataType.INT32},
1L,
- new Object[] {10},
+ new Object[]{10},
false);
insertRowNode.setMeasurementSchemas(
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema[]{
+ new MeasurementSchema("s1", TSDataType.INT32),
});
Analysis analysis = constructAnalysis();
@@ -529,14 +558,14 @@ public class DistributionPlannerTest {
queryId.genPlanNodeId(),
new PartialPath("root.sg.d1"),
false,
- new String[] {"s1"},
- new TSDataType[] {TSDataType.INT32},
+ new String[]{"s1"},
+ new TSDataType[]{TSDataType.INT32},
1L,
- new Object[] {10},
+ new Object[]{10},
false);
insertRowNode1.setMeasurementSchemas(
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema[]{
+ new MeasurementSchema("s1", TSDataType.INT32),
});
InsertRowNode insertRowNode2 =
@@ -544,14 +573,14 @@ public class DistributionPlannerTest {
queryId.genPlanNodeId(),
new PartialPath("root.sg.d1"),
false,
- new String[] {"s1"},
- new TSDataType[] {TSDataType.INT32},
+ new String[]{"s1"},
+ new TSDataType[]{TSDataType.INT32},
100000L,
- new Object[] {10},
+ new Object[]{10},
false);
insertRowNode2.setMeasurementSchemas(
- new MeasurementSchema[] {
- new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema[]{
+ new MeasurementSchema("s1", TSDataType.INT32),
});
InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
@@ -569,7 +598,7 @@ public class DistributionPlannerTest {
assertEquals(1, plan.getInstances().size());
}
- private Analysis constructAnalysis() {
+ private Analysis constructAnalysis() throws IllegalPathException {
SeriesPartitionExecutor executor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
@@ -689,6 +718,24 @@ public class DistributionPlannerTest {
analysis.setDataPartitionInfo(dataPartition);
+ // construct AggregationExpression for GroupByLevel
+ Map<String, Set<Expression>> aggregationExpression = new HashMap<>();
+ Set<Expression> s1Expression = new HashSet<>();
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")));
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s1")));
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s1")));
+ s1Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s1")));
+
+ Set<Expression> s2Expression = new HashSet<>();
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d1.s2")));
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d22.s2")));
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d333.s2")));
+ s2Expression.add(new TimeSeriesOperand(new PartialPath("root.sg.d4444.s2")));
+
+ aggregationExpression.put("root.sg.*.s1", s1Expression);
+ aggregationExpression.put("root.sg.*.s2", s2Expression);
+ analysis.setAggregationExpressions(aggregationExpression);
+
// construct schema partition
SchemaPartition schemaPartition =
new SchemaPartition(