You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/27 04:04:02 UTC

[iotdb] 01/01: add groupbytime parameter in GroupByLevelNode

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/agg_groupbytime
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 04c4f0cd9c5bd2bf0c714b88d933a9a9dca6db32
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri May 27 12:03:47 2022 +0800

    add groupbytime parameter in GroupByLevelNode
---
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 35 ++++++++++---
 .../iotdb/db/mpp/plan/planner/LogicalPlanner.java  |  6 ++-
 .../plan/node/process/GroupByLevelNode.java        | 61 ++++++++++++++++------
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     |  8 ++-
 .../distribution/AggregationDistributionTest.java  | 16 ++++--
 .../node/process/GroupByLevelNodeSerdeTest.java    |  4 +-
 6 files changed, 101 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index a22371f711..b6b94da6b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -245,12 +245,22 @@ public class LogicalPlanBuilder {
           curStep = AggregationStep.FINAL;
           this.root =
               createGroupByTLevelNode(
-                  Collections.singletonList(this.getRoot()), groupByLevelExpressions, curStep);
+                  Collections.singletonList(this.getRoot()),
+                  groupByLevelExpressions,
+                  curStep,
+                  groupByTimeParameter,
+                  scanOrder);
         }
       } else {
         if (groupByLevelExpressions != null) {
           curStep = AggregationStep.FINAL;
-          this.root = createGroupByTLevelNode(sourceNodeList, groupByLevelExpressions, curStep);
+          this.root =
+              createGroupByTLevelNode(
+                  sourceNodeList,
+                  groupByLevelExpressions,
+                  curStep,
+                  groupByTimeParameter,
+                  scanOrder);
         }
       }
     } else {
@@ -305,14 +315,21 @@ public class LogicalPlanBuilder {
   }
 
   public LogicalPlanBuilder planGroupByLevel(
-      Map<Expression, Set<Expression>> groupByLevelExpressions, AggregationStep curStep) {
+      Map<Expression, Set<Expression>> groupByLevelExpressions,
+      AggregationStep curStep,
+      GroupByTimeParameter groupByTimeParameter,
+      OrderBy scanOrder) {
     if (groupByLevelExpressions == null) {
       return this;
     }
 
     this.root =
         createGroupByTLevelNode(
-            Collections.singletonList(this.getRoot()), groupByLevelExpressions, curStep);
+            Collections.singletonList(this.getRoot()),
+            groupByLevelExpressions,
+            curStep,
+            groupByTimeParameter,
+            scanOrder);
     return this;
   }
 
@@ -378,7 +395,9 @@ public class LogicalPlanBuilder {
   private PlanNode createGroupByTLevelNode(
       List<PlanNode> children,
       Map<Expression, Set<Expression>> groupByLevelExpressions,
-      AggregationStep curStep) {
+      AggregationStep curStep,
+      GroupByTimeParameter groupByTimeParameter,
+      OrderBy scanOrder) {
     List<GroupByLevelDescriptor> groupByLevelDescriptors = new ArrayList<>();
     for (Expression groupedExpression : groupByLevelExpressions.keySet()) {
       AggregationType aggregationFunction =
@@ -395,7 +414,11 @@ public class LogicalPlanBuilder {
               groupedExpression.getExpressions().get(0)));
     }
     return new GroupByLevelNode(
-        context.getQueryId().genPlanNodeId(), children, groupByLevelDescriptors);
+        context.getQueryId().genPlanNodeId(),
+        children,
+        groupByLevelDescriptors,
+        groupByTimeParameter,
+        scanOrder);
   }
 
   private PlanNode createAggregationScanNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index ad1bccb1ee..e247423ca5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -242,7 +242,11 @@ public class LogicalPlanner {
             if (queryStatement.isGroupByLevel()) {
               curStep = AggregationStep.FINAL;
               planBuilder =
-                  planBuilder.planGroupByLevel(analysis.getGroupByLevelExpressions(), curStep);
+                  planBuilder.planGroupByLevel(
+                      analysis.getGroupByLevelExpressions(),
+                      curStep,
+                      analysis.getGroupByTimeParameter(),
+                      queryStatement.getResultOrder());
             }
           }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index cecfa4cdee..7210bf449e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -24,8 +24,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import javax.annotation.Nullable;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -52,17 +56,33 @@ public class GroupByLevelNode extends MultiChildNode {
   // each GroupByLevelDescriptor will be output as one or two column of result TsBlock
   protected List<GroupByLevelDescriptor> groupByLevelDescriptors;
 
+  // The parameter of `group by time`.
+  // Its value will be null if there is no `group by time` clause.
+  @Nullable protected GroupByTimeParameter groupByTimeParameter;
+
+  protected OrderBy scanOrder;
+
   public GroupByLevelNode(
       PlanNodeId id,
       List<PlanNode> children,
-      List<GroupByLevelDescriptor> groupByLevelDescriptors) {
+      List<GroupByLevelDescriptor> groupByLevelDescriptors,
+      GroupByTimeParameter groupByTimeParameter,
+      OrderBy scanOrder) {
     super(id, children);
     this.groupByLevelDescriptors = groupByLevelDescriptors;
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.scanOrder = scanOrder;
   }
 
-  public GroupByLevelNode(PlanNodeId id, List<GroupByLevelDescriptor> groupByLevelDescriptors) {
+  public GroupByLevelNode(
+      PlanNodeId id,
+      List<GroupByLevelDescriptor> groupByLevelDescriptors,
+      GroupByTimeParameter groupByTimeParameter,
+      OrderBy scanOrder) {
     super(id);
     this.groupByLevelDescriptors = groupByLevelDescriptors;
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.scanOrder = scanOrder;
   }
 
   @Override
@@ -82,7 +102,8 @@ public class GroupByLevelNode extends MultiChildNode {
 
   @Override
   public PlanNode clone() {
-    return new GroupByLevelNode(getPlanNodeId(), getGroupByLevelDescriptors());
+    return new GroupByLevelNode(
+        getPlanNodeId(), getGroupByLevelDescriptors(), this.groupByTimeParameter, this.scanOrder);
   }
 
   public List<GroupByLevelDescriptor> getGroupByLevelDescriptors() {
@@ -113,6 +134,13 @@ public class GroupByLevelNode extends MultiChildNode {
     for (GroupByLevelDescriptor groupByLevelDescriptor : groupByLevelDescriptors) {
       groupByLevelDescriptor.serialize(byteBuffer);
     }
+    if (groupByTimeParameter == null) {
+      ReadWriteIOUtils.write((byte) 0, byteBuffer);
+    } else {
+      ReadWriteIOUtils.write((byte) 1, byteBuffer);
+      groupByTimeParameter.serialize(byteBuffer);
+    }
+    ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
   }
 
   public static GroupByLevelNode deserialize(ByteBuffer byteBuffer) {
@@ -122,28 +150,31 @@ public class GroupByLevelNode extends MultiChildNode {
       groupByLevelDescriptors.add(GroupByLevelDescriptor.deserialize(byteBuffer));
       descriptorSize--;
     }
+    byte isNull = ReadWriteIOUtils.readByte(byteBuffer);
+    GroupByTimeParameter groupByTimeParameter = null;
+    if (isNull == 1) {
+      groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
+    }
+    OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new GroupByLevelNode(planNodeId, groupByLevelDescriptors);
+    return new GroupByLevelNode(
+        planNodeId, groupByLevelDescriptors, groupByTimeParameter, scanOrder);
   }
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    if (!super.equals(o)) return false;
     GroupByLevelNode that = (GroupByLevelNode) o;
-    return Objects.equals(groupByLevelDescriptors, that.groupByLevelDescriptors);
+    return Objects.equals(groupByLevelDescriptors, that.groupByLevelDescriptors)
+        && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
+        && scanOrder == that.scanOrder;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), groupByLevelDescriptors);
+    return Objects.hash(super.hashCode(), groupByLevelDescriptors, groupByTimeParameter, scanOrder);
   }
 
   public String toString() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index 46f7b07683..f4482ca102 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -626,7 +626,9 @@ public class QueryLogicalPlanUtil {
                     AggregationStep.FINAL,
                     Collections.singletonList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))),
-                    new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1")))));
+                    new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1")))),
+            null,
+            OrderBy.TIMESTAMP_ASC);
 
     OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), groupByLevelNode, 100);
     LimitNode limitNode = new LimitNode(queryId.genPlanNodeId(), offsetNode, 100);
@@ -857,7 +859,9 @@ public class QueryLogicalPlanUtil {
                     Arrays.asList(
                         new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
                         new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))),
-                    new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))));
+                    new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))),
+            null,
+            OrderBy.TIMESTAMP_ASC);
 
     OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), groupByLevelNode, 100);
     LimitNode limitNode = new LimitNode(queryId.genPlanNodeId(), offsetNode, 100);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index befbeb9660..cfa79db920 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -168,7 +168,9 @@ public class AggregationDistributionTest {
                     Arrays.asList(
                         new TimeSeriesOperand(new PartialPath(d1s1Path)),
                         new TimeSeriesOperand(new PartialPath(d2s1Path))),
-                    new TimeSeriesOperand(new PartialPath(groupedPath)))));
+                    new TimeSeriesOperand(new PartialPath(groupedPath)))),
+            null,
+            OrderBy.TIMESTAMP_ASC);
     Analysis analysis = Util.constructAnalysis();
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -203,7 +205,9 @@ public class AggregationDistributionTest {
                     Arrays.asList(
                         new TimeSeriesOperand(new PartialPath(d3s1Path)),
                         new TimeSeriesOperand(new PartialPath(d4s1Path))),
-                    new TimeSeriesOperand(new PartialPath(groupedPath)))));
+                    new TimeSeriesOperand(new PartialPath(groupedPath)))),
+            null,
+            OrderBy.TIMESTAMP_ASC);
     Analysis analysis = Util.constructAnalysis();
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -254,7 +258,9 @@ public class AggregationDistributionTest {
                     AggregationType.COUNT,
                     AggregationStep.FINAL,
                     Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
+                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))),
+            null,
+            OrderBy.TIMESTAMP_ASC);
     Analysis analysis = Util.constructAnalysis();
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -311,7 +317,9 @@ public class AggregationDistributionTest {
                     AggregationType.COUNT,
                     AggregationStep.FINAL,
                     Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
-                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
+                    new TimeSeriesOperand(new PartialPath(groupedPathS2)))),
+            null,
+            OrderBy.TIMESTAMP_ASC);
     Analysis analysis = Util.constructAnalysis();
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
index ec6330eb85..862e8a9796 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
@@ -88,7 +88,9 @@ public class GroupByLevelNodeSerdeTest {
                     Arrays.asList(
                         new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")),
                         new TimeSeriesOperand(new PartialPath("root.sg.d2.s1"))),
-                    new TimeSeriesOperand(new PartialPath("root.sg.*.s1")))));
+                    new TimeSeriesOperand(new PartialPath("root.sg.*.s1")))),
+            groupByTimeParameter,
+            OrderBy.TIMESTAMP_ASC);
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
     groupByLevelNode.serialize(byteBuffer);