You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/18 17:40:47 UTC

[iotdb] 03/03: add test for timejoin aggregation

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/agg_distribution_plan
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6d86108f1655ca728f9cb05d04b69ec043ec2e83
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Thu May 19 01:40:30 2022 +0800

    add test for timejoin aggregation
---
 .../db/mpp/plan/planner/DistributionPlanner.java   |  21 +--
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   4 -
 .../planner/plan/node/process/AggregationNode.java |   4 +
 .../node/source/SeriesAggregationScanNode.java     |   6 +
 .../node/source/SeriesAggregationSourceNode.java   |   5 +-
 .../plan/parameter/AggregationDescriptor.java      |   3 +-
 .../db/mpp/plan/plan/DistributionPlannerTest.java  | 146 +++++++++++++++++++++
 7 files changed, 174 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
index ff664d93fc..26ac5e456e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/DistributionPlanner.java
@@ -55,7 +55,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
-import org.checkerframework.checker.units.qual.A;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -412,8 +411,7 @@ public class DistributionPlanner {
     }
 
     private PlanNode planAggregationWithTimeJoin(
-        TimeJoinNode root,
-        DistributionPlanContext context) {
+        TimeJoinNode root, DistributionPlanContext context) {
 
       // Step 1: construct AggregationDescriptor for AggregationNode
       List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
@@ -421,7 +419,8 @@ public class DistributionPlanner {
       Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
       for (PlanNode child : root.getChildren()) {
         SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
-        handle.getAggregationDescriptorList()
+        handle
+            .getAggregationDescriptorList()
             .forEach(
                 descriptor -> {
                   rootAggDescriptorList.add(
@@ -437,7 +436,10 @@ public class DistributionPlanner {
           split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
           split.setRegionReplicaSet(dataRegion);
           // Let each split reference different object of AggregationDescriptorList
-          split.setAggregationDescriptorList(handle.getAggregationDescriptorList().stream().map(AggregationDescriptor::deepClone).collect(Collectors.toList()));
+          split.setAggregationDescriptorList(
+              handle.getAggregationDescriptorList().stream()
+                  .map(AggregationDescriptor::deepClone)
+                  .collect(Collectors.toList()));
           sources.add(split);
         }
         regionCountPerSeries.put(handle.getPartitionPath(), dataDistribution.size());
@@ -446,13 +448,16 @@ public class DistributionPlanner {
       // Step 2: change the step for each SeriesAggregationSourceNode according to its split count
       for (SeriesAggregationSourceNode node : sources) {
         boolean isFinal = regionCountPerSeries.get(node.getPartitionPath()) == 1;
-        node.getAggregationDescriptorList().forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL));
+        node.getAggregationDescriptorList()
+            .forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL));
       }
 
       Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
           sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
 
-      AggregationNode aggregationNode = new AggregationNode(context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
+      AggregationNode aggregationNode =
+          new AggregationNode(
+              context.queryContext.getQueryId().genPlanNodeId(), rootAggDescriptorList);
 
       final boolean[] addParent = {false};
       sourceGroup.forEach(
@@ -605,7 +610,7 @@ public class DistributionPlanner {
       return processMultiChildNode(node, context);
     }
 
-    public PlanNode visitAggregation(AggregationNode node, NodeGroupContext context) {
+    public PlanNode visitRowBasedSeriesAggregate(AggregationNode node, NodeGroupContext context) {
       return processMultiChildNode(node, context);
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 73df5ae27c..4d552df27b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -74,10 +74,6 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
-  public R visitAggregation(AggregationNode node, C context) {
-    return visitPlan(node, context);
-  }
-
   public R visitAlignedSeriesScan(AlignedSeriesScanNode node, C context) {
     return visitPlan(node, context);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index e51045fb97..8e9cb67948 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -225,4 +225,8 @@ public class AggregationNode extends MultiChildNode {
     }
     return deduplicatedDescriptors;
   }
+
+  public String toString() { // Debug label: "AggregationNode-" followed by the plan node id.
+    return String.format("AggregationNode-%s", getPlanNodeId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
index f30caee4c7..d9fff4f0ec 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationScanNode.java
@@ -286,4 +286,10 @@ public class SeriesAggregationScanNode extends SeriesAggregationSourceNode {
   public Filter getPartitionTimeFilter() {
     return timeFilter;
   }
+
+  public String toString() { // Debug label: node id, series path and region replica set.
+    return String.format(
+        "SeriesAggregationScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
+        this.getPlanNodeId(), this.getSeriesPath(), this.getRegionReplicaSet());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
index c7701848bc..f9e1200ea6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/SeriesAggregationSourceNode.java
@@ -24,13 +24,14 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
 
 import java.util.List;
 
-public abstract class SeriesAggregationSourceNode extends SeriesSourceNode{
+public abstract class SeriesAggregationSourceNode extends SeriesSourceNode {
 
   // The list of aggregate functions, each AggregateDescriptor will be output as one column in
   // result TsBlock
   protected List<AggregationDescriptor> aggregationDescriptorList;
 
-  public SeriesAggregationSourceNode(PlanNodeId id, List<AggregationDescriptor> aggregationDescriptorList) {
+  public SeriesAggregationSourceNode(
+      PlanNodeId id, List<AggregationDescriptor> aggregationDescriptorList) {
     super(id);
     this.aggregationDescriptorList = aggregationDescriptorList;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
index cc10983355..c6933fd19d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -123,7 +123,8 @@ public class AggregationDescriptor {
   }
 
   public AggregationDescriptor deepClone() {
-    return new AggregationDescriptor(this.getAggregationType(), this.step, this.getInputExpressions());
+    return new AggregationDescriptor(
+        this.getAggregationType(), this.step, this.getInputExpressions());
   }
 
   public void serialize(ByteBuffer byteBuffer) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
index 8283a2d80e..b465c8e6bd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/DistributionPlannerTest.java
@@ -50,10 +50,15 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -61,6 +66,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -292,6 +298,120 @@ public class DistributionPlannerTest {
     assertEquals(3, plan.getInstances().size());
   }
 
+  @Test
+  public void testTimeJoinAggregationSinglePerRegion() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query_time_join_aggregation");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+    String d1s1Path = "root.sg.d1.s1";
+    List<AggregationDescriptor> d1s1Descriptors = new ArrayList<>();
+    d1s1Descriptors.add(
+        new AggregationDescriptor(
+            AggregationType.COUNT,
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path)))));
+    timeJoinNode.addChild(
+        new SeriesAggregationScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath(d1s1Path, TSDataType.INT32),
+            d1s1Descriptors));
+    // Second child: COUNT over root.sg.d22.s1, carried in its own descriptor list.
+    String d22s1Path = "root.sg.d22.s1";
+    List<AggregationDescriptor> d22s1Descriptors = new ArrayList<>();
+    d22s1Descriptors.add(
+        new AggregationDescriptor(
+            AggregationType.COUNT,
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d22s1Path)))));
+    timeJoinNode.addChild(
+        new SeriesAggregationScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath(d22s1Path, TSDataType.INT32),
+            d22s1Descriptors));
+    Analysis analysis = constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size()); // planner is expected to emit three instances
+  }
+
+  @Test
+  public void testTimeJoinAggregationMultiPerRegion() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query_time_join_aggregation");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+    String d1s1Path = "root.sg.d1.s1";
+    List<AggregationDescriptor> d1s1Descriptors = new ArrayList<>();
+    d1s1Descriptors.add(
+        new AggregationDescriptor(
+            AggregationType.COUNT,
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s1Path)))));
+    timeJoinNode.addChild(
+        new SeriesAggregationScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath(d1s1Path, TSDataType.INT32),
+            d1s1Descriptors));
+    // Second child: COUNT over root.sg.d333.s1; it must own its descriptor, not share d1's list.
+    String d333s1Path = "root.sg.d333.s1";
+    List<AggregationDescriptor> d333s1Descriptors = new ArrayList<>();
+    d333s1Descriptors.add(
+        new AggregationDescriptor(
+            AggregationType.COUNT,
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d333s1Path)))));
+    timeJoinNode.addChild(
+        new SeriesAggregationScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath(d333s1Path, TSDataType.INT32),
+            d333s1Descriptors));
+    Analysis analysis = constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size()); // planner is expected to emit three instances
+  }
+
+  @Test
+  public void testTimeJoinAggregationMultiPerRegion2() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_query_time_join_aggregation");
+    TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(), OrderBy.TIMESTAMP_ASC);
+    String d333s1Path = "root.sg.d333.s1";
+    List<AggregationDescriptor> d333s1Descriptors = new ArrayList<>();
+    d333s1Descriptors.add(
+        new AggregationDescriptor(
+            AggregationType.COUNT,
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d333s1Path)))));
+    timeJoinNode.addChild(
+        new SeriesAggregationScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath(d333s1Path, TSDataType.INT32),
+            d333s1Descriptors));
+    // Second child: COUNT over root.sg.d4444.s1; it must own its descriptor, not share the first.
+    String d4444s1Path = "root.sg.d4444.s1";
+    List<AggregationDescriptor> d4444s1Descriptors = new ArrayList<>();
+    d4444s1Descriptors.add(
+        new AggregationDescriptor(
+            AggregationType.COUNT,
+            AggregationStep.FINAL,
+            Collections.singletonList(new TimeSeriesOperand(new PartialPath(d4444s1Path)))));
+    timeJoinNode.addChild(
+        new SeriesAggregationScanNode(
+            queryId.genPlanNodeId(),
+            new MeasurementPath(d4444s1Path, TSDataType.INT32),
+            d4444s1Descriptors));
+    Analysis analysis = constructAnalysis();
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, timeJoinNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size()); // planner is expected to emit two instances
+  }
+
   @Test
   public void testParallelPlanWithAlignedSeries() throws IllegalPathException {
     QueryId queryId = new QueryId("test_query_aligned");
@@ -433,6 +553,7 @@ public class DistributionPlannerTest {
     String device1 = "root.sg.d1";
     String device2 = "root.sg.d22";
     String device3 = "root.sg.d333";
+    String device4 = "root.sg.d4444";
 
     DataPartition dataPartition =
         new DataPartition(
@@ -506,9 +627,34 @@ public class DistributionPlannerTest {
     Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new HashMap<>();
     d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
 
+    List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
+    d4DataRegions.add(
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
+            Arrays.asList(
+                new TDataNodeLocation()
+                    .setDataNodeId(11)
+                    .setExternalEndPoint(new TEndPoint("192.0.1.1", 9000)),
+                new TDataNodeLocation()
+                    .setDataNodeId(12)
+                    .setExternalEndPoint(new TEndPoint("192.0.1.2", 9000)))));
+    d4DataRegions.add(
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
+            Arrays.asList(
+                new TDataNodeLocation()
+                    .setDataNodeId(41)
+                    .setExternalEndPoint(new TEndPoint("192.0.4.1", 9000)),
+                new TDataNodeLocation()
+                    .setDataNodeId(42)
+                    .setExternalEndPoint(new TEndPoint("192.0.4.2", 9000)))));
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new HashMap<>();
+    d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
+
     sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), d1DataRegionMap);
     sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), d2DataRegionMap);
     sgPartitionMap.put(executor.getSeriesPartitionSlot(device3), d3DataRegionMap);
+    sgPartitionMap.put(executor.getSeriesPartitionSlot(device4), d4DataRegionMap);
 
     dataPartitionMap.put("root.sg", sgPartitionMap);