You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/18 17:40:47 UTC
[iotdb] 03/03: add test for timejoin aggregation
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/agg_distribution_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6d86108f1655ca728f9cb05d04b69ec043ec2e83
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 19 01:40:30 2022 +0800
add test for timejoin aggregation
---
.../db/mpp/plan/planner/DistributionPlanner.java | 21 +--
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 4 -
.../planner/plan/node/process/AggregationNode.java | 4 +
.../node/source/SeriesAggregationScanNode.java | 6 +
.../node/source/SeriesAggregationSourceNode.java | 5 +-
.../plan/parameter/AggregationDescriptor.java | 3 +-
.../db/mpp/plan/plan/DistributionPlannerTest.java | 146 +++++++++++++++++++++
7 files changed, 174 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
index ff664d93fc..26ac5e456e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
@@ -55,7 +55,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
-import org.checkerframework.checker.units.qual.A;
import java.util.ArrayList;
import java.util.Collections;
@@ -412,8 +411,7 @@ public class DistributionPlanner {
}
private PlanNode planAggregationWithTimeJoin(
- TimeJoinNode root,
- DistributionPlanContext context) {
+ TimeJoinNode root, DistributionPlanContext context) {
// Step 1: construct AggregationDescriptor for AggregationNode
List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
@@ -421,7 +419,8 @@ public class DistributionPlanner {
Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
for (PlanNode child : root.getChildren()) {
SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
- handle.getAggregationDescriptorList()
+ handle
+ .getAggregationDescriptorList()
.forEach(
descriptor -> {
rootAggDescriptorList.add(
@@ -437,7 +436,10 @@ public class DistributionPlanner {
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setRegionReplicaSet(dataRegion);
// Let each split reference different object of AggregationDescriptorList
- split.setAggregationDescriptorList(handle.getAggregationDescriptorList().stream().map(AggregationDescriptor::deepClone).collect(Collectors.toList()));
+ split.setAggregationDescriptorList(
+ handle.getAggregationDescriptorList().stream()
+ .map(AggregationDescriptor::deepClone)
+ .collect(Collectors.toList()));
sources.add(split);
}
regionCountPerSeries.put(handle.getPartitionPath(), dataDistribution.size());
@@ -446,13 +448,16 @@ public class DistributionPlanner {
// Step 2: change the step for each SeriesAggregationSourceNode according to its split count
for (SeriesAggregationSourceNode node : sources) {
boolean isFinal = regionCountPerSeries.get(node.getPartitionPath()) == 1;
- node.getAggregationDescriptorList().forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL));
+ node.getAggregationDescriptorList()
+ .forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL));
}
Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
- AggregationNode aggregationNode = new AggregationNode(context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
+ AggregationNode aggregationNode =
+ new AggregationNode(
+ context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
final boolean[] addParent = {false};
sourceGroup.forEach(
@@ -605,7 +610,7 @@ public class DistributionPlanner {
return processMultiChildNode(node, context);
}
- public PlanNode visitAggregation(AggregationNode node, NodeGroupContext context) {
+ public PlanNode visitRowBasedSeriesAggregate(AggregationNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 73df5ae27c..4d552df27b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -74,10 +74,6 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
- public R visitAggregation(AggregationNode node, C context) {
- return visitPlan(node, context);
- }
-
public R visitAlignedSeriesScan(AlignedSeriesScanNode node, C context) {
return visitPlan(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index e51045fb97..8e9cb67948 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -225,4 +225,8 @@ public class AggregationNode extends MultiChildNode {
}
return deduplicatedDescriptors;
}
+
+ @Override public String toString() { // renders as AggregationNode-<planNodeId>
+ return String.format("AggregationNode-%s", getPlanNodeId());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index f30caee4c7..d9fff4f0ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -286,4 +286,10 @@ public class SeriesAggregationScanNode extends SeriesAggregationSourceNode {
public Filter getPartitionTimeFilter() {
return timeFilter;
}
+
+ @Override public String toString() { // plan node id, series path and data region
+ return String.format(
+ "SeriesAggregationScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
+ this.getPlanNodeId(), this.getSeriesPath(), this.getRegionReplicaSet());
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
index c7701848bc..f9e1200ea6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
@@ -24,13 +24,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
import java.util.List;
-public abstract class SeriesAggregationSourceNode extends SeriesSourceNode{
+public abstract class SeriesAggregationSourceNode extends SeriesSourceNode {
// The list of aggregate functions, each AggregateDescriptor will be output as one column in
// result TsBlock
protected List<AggregationDescriptor> aggregationDescriptorList;
- public SeriesAggregationSourceNode(PlanNodeId id, List<AggregationDescriptor> aggregationDescriptorList) {
+ public SeriesAggregationSourceNode(
+ PlanNodeId id, List<AggregationDescriptor> aggregationDescriptorList) {
super(id);
this.aggregationDescriptorList = aggregationDescriptorList;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
index cc10983355..c6933fd19d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -123,7 +123,8 @@ public class AggregationDescriptor {
}
public AggregationDescriptor deepClone() {
- return new AggregationDescriptor(this.getAggregationType(), this.step, this.getInputExpressions());
+ return new AggregationDescriptor(
+ this.getAggregationType(), this.step, this.getInputExpressions());
}
public void serialize(ByteBuffer byteBuffer) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
index 8283a2d80e..b465c8e6bd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
@@ -50,10 +50,15 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -61,6 +66,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -292,6 +298,120 @@ public class DistributionPlannerTest {
assertEquals(3, plan.getInstances().size());
}
+ @Test
+ public void testTimeJoinAggregationSinglePerRegion() throws IllegalPathException {
+ QueryId id = new QueryId("test_query_time_join_aggregation");
+ TimeJoinNode root = new TimeJoinNode(id.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+ // Child 1: COUNT over root.sg.d1.s1
+ String firstSeries = "root.sg.d1.s1";
+ List<AggregationDescriptor> firstAggs = new ArrayList<>();
+ firstAggs.add(
+ new AggregationDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(firstSeries)))));
+ SeriesAggregationScanNode firstScan =
+ new SeriesAggregationScanNode(
+ id.genPlanNodeId(), new MeasurementPath(firstSeries, TSDataType.INT32), firstAggs);
+ root.addChild(firstScan);
+
+ // Child 2: COUNT over root.sg.d22.s1
+ String secondSeries = "root.sg.d22.s1";
+ List<AggregationDescriptor> secondAggs = new ArrayList<>();
+ secondAggs.add(
+ new AggregationDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(secondSeries)))));
+ SeriesAggregationScanNode secondScan =
+ new SeriesAggregationScanNode(
+ id.genPlanNodeId(), new MeasurementPath(secondSeries, TSDataType.INT32), secondAggs);
+ root.addChild(secondScan);
+ Analysis analysis = constructAnalysis();
+ MPPQueryContext queryContext =
+ new MPPQueryContext("", id, null, new TEndPoint(), new TEndPoint());
+ DistributedQueryPlan distributedPlan =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(queryContext, root))
+ .planFragments();
+ assertEquals(3, distributedPlan.getInstances().size());
+ }
+
+ @Test
+ public void testTimeJoinAggregationMultiPerRegion() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query_time_join_aggregation");
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+ String d1s1Path = "root.sg.d1.s1";
+ List<AggregationDescriptor> d1s1Descriptors = new ArrayList<>();
+ d1s1Descriptors.add(
+ new AggregationDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path)))));
+ timeJoinNode.addChild(
+ new SeriesAggregationScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath(d1s1Path, TSDataType.INT32),
+ d1s1Descriptors));
+ // Second child: COUNT over root.sg.d333.s1 with its own descriptor list.
+ String d333s1Path = "root.sg.d333.s1";
+ List<AggregationDescriptor> d333s1Descriptors = new ArrayList<>();
+ d333s1Descriptors.add(
+ new AggregationDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(d333s1Path)))));
+ timeJoinNode.addChild(
+ new SeriesAggregationScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath(d333s1Path, TSDataType.INT32),
+ d333s1Descriptors));
+ Analysis analysis = constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ }
+
+ @Test
+ public void testTimeJoinAggregationMultiPerRegion2() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_query_time_join_aggregation");
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+ String d333s1Path = "root.sg.d333.s1";
+ List<AggregationDescriptor> d333s1Descriptors = new ArrayList<>();
+ d333s1Descriptors.add(
+ new AggregationDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(d333s1Path)))));
+ timeJoinNode.addChild(
+ new SeriesAggregationScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath(d333s1Path, TSDataType.INT32),
+ d333s1Descriptors));
+ // Second child: COUNT over root.sg.d4444.s1 with its own descriptor list.
+ String d4444s1Path = "root.sg.d4444.s1";
+ List<AggregationDescriptor> d4444s1Descriptors = new ArrayList<>();
+ d4444s1Descriptors.add(
+ new AggregationDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new PartialPath(d4444s1Path)))));
+ timeJoinNode.addChild(
+ new SeriesAggregationScanNode(
+ queryId.genPlanNodeId(),
+ new MeasurementPath(d4444s1Path, TSDataType.INT32),
+ d4444s1Descriptors));
+ Analysis analysis = constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+ }
+
@Test
public void testParallelPlanWithAlignedSeries() throws IllegalPathException {
QueryId queryId = new QueryId("test_query_aligned");
@@ -433,6 +553,7 @@ public class DistributionPlannerTest {
String device1 = "root.sg.d1";
String device2 = "root.sg.d22";
String device3 = "root.sg.d333";
+ String device4 = "root.sg.d4444";
DataPartition dataPartition =
new DataPartition(
@@ -506,9 +627,34 @@ public class DistributionPlannerTest {
Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
+ List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
+ d4DataRegions.add(
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
+ Arrays.asList(
+ new TDataNodeLocation()
+ .setDataNodeId(11)
+ .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
+ new TDataNodeLocation()
+ .setDataNodeId(12)
+ .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000)))));
+ d4DataRegions.add(
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
+ Arrays.asList(
+ new TDataNodeLocation()
+ .setDataNodeId(41)
+ .setExternalEndPoint(new TEndPoint("192.0.4.1", 9000)),
+ new TDataNodeLocation()
+ .setDataNodeId(42)
+ .setExternalEndPoint(new TEndPoint("192.0.4.2", 9000)))));
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>();
+ d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
+
sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap);
sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap);
sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap);
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap);
dataPartitionMap.put("root.sg", sgPartitionMap);