You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/27 04:04:02 UTC
[iotdb] 01/01: add groupbytime parameter in GroupByLevelNode
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/agg_groupbytime
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 04c4f0cd9c5bd2bf0c714b88d933a9a9dca6db32
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Fri May 27 12:03:47 2022 +0800
add groupbytime parameter in GroupByLevelNode
---
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 35 ++++++++++---
.../iotdb/db/mpp/plan/planner/LogicalPlanner.java | 6 ++-
.../plan/node/process/GroupByLevelNode.java | 61 ++++++++++++++++------
.../db/mpp/plan/plan/QueryLogicalPlanUtil.java | 8 ++-
.../distribution/AggregationDistributionTest.java | 16 ++++--
.../node/process/GroupByLevelNodeSerdeTest.java | 4 +-
6 files changed, 101 insertions(+), 29 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index a22371f711..b6b94da6b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -245,12 +245,22 @@ public class LogicalPlanBuilder {
curStep = AggregationStep.FINAL;
this.root =
createGroupByTLevelNode(
- Collections.singletonList(this.getRoot()), groupByLevelExpressions, curStep);
+ Collections.singletonList(this.getRoot()),
+ groupByLevelExpressions,
+ curStep,
+ groupByTimeParameter,
+ scanOrder);
}
} else {
if (groupByLevelExpressions != null) {
curStep = AggregationStep.FINAL;
- this.root = createGroupByTLevelNode(sourceNodeList, groupByLevelExpressions, curStep);
+ this.root =
+ createGroupByTLevelNode(
+ sourceNodeList,
+ groupByLevelExpressions,
+ curStep,
+ groupByTimeParameter,
+ scanOrder);
}
}
} else {
@@ -305,14 +315,21 @@ public class LogicalPlanBuilder {
}
public LogicalPlanBuilder planGroupByLevel(
- Map<Expression, Set<Expression>> groupByLevelExpressions, AggregationStep curStep) {
+ Map<Expression, Set<Expression>> groupByLevelExpressions,
+ AggregationStep curStep,
+ GroupByTimeParameter groupByTimeParameter,
+ OrderBy scanOrder) {
if (groupByLevelExpressions == null) {
return this;
}
this.root =
createGroupByTLevelNode(
- Collections.singletonList(this.getRoot()), groupByLevelExpressions, curStep);
+ Collections.singletonList(this.getRoot()),
+ groupByLevelExpressions,
+ curStep,
+ groupByTimeParameter,
+ scanOrder);
return this;
}
@@ -378,7 +395,9 @@ public class LogicalPlanBuilder {
private PlanNode createGroupByTLevelNode(
List<PlanNode> children,
Map<Expression, Set<Expression>> groupByLevelExpressions,
- AggregationStep curStep) {
+ AggregationStep curStep,
+ GroupByTimeParameter groupByTimeParameter,
+ OrderBy scanOrder) {
List<GroupByLevelDescriptor> groupByLevelDescriptors = new ArrayList<>();
for (Expression groupedExpression : groupByLevelExpressions.keySet()) {
AggregationType aggregationFunction =
@@ -395,7 +414,11 @@ public class LogicalPlanBuilder {
groupedExpression.getExpressions().get(0)));
}
return new GroupByLevelNode(
- context.getQueryId().genPlanNodeId(), children, groupByLevelDescriptors);
+ context.getQueryId().genPlanNodeId(),
+ children,
+ groupByLevelDescriptors,
+ groupByTimeParameter,
+ scanOrder);
}
private PlanNode createAggregationScanNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
index ad1bccb1ee..e247423ca5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanner.java
@@ -242,7 +242,11 @@ public class LogicalPlanner {
if (queryStatement.isGroupByLevel()) {
curStep = AggregationStep.FINAL;
planBuilder =
- planBuilder.planGroupByLevel(analysis.getGroupByLevelExpressions(), curStep);
+ planBuilder.planGroupByLevel(
+ analysis.getGroupByLevelExpressions(),
+ curStep,
+ analysis.getGroupByTimeParameter(),
+ queryStatement.getResultOrder());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index cecfa4cdee..7210bf449e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -24,8 +24,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import javax.annotation.Nullable;
+
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -52,17 +56,33 @@ public class GroupByLevelNode extends MultiChildNode {
// each GroupByLevelDescriptor will be output as one or two columns of the result TsBlock
protected List<GroupByLevelDescriptor> groupByLevelDescriptors;
+ // The parameter of `group by time`.
+ // Its value will be null if there is no `group by time` clause.
+ @Nullable protected GroupByTimeParameter groupByTimeParameter;
+
+ protected OrderBy scanOrder;
+
public GroupByLevelNode(
PlanNodeId id,
List<PlanNode> children,
- List<GroupByLevelDescriptor> groupByLevelDescriptors) {
+ List<GroupByLevelDescriptor> groupByLevelDescriptors,
+ GroupByTimeParameter groupByTimeParameter,
+ OrderBy scanOrder) {
super(id, children);
this.groupByLevelDescriptors = groupByLevelDescriptors;
+ this.groupByTimeParameter = groupByTimeParameter;
+ this.scanOrder = scanOrder;
}
- public GroupByLevelNode(PlanNodeId id, List<GroupByLevelDescriptor> groupByLevelDescriptors) {
+ public GroupByLevelNode(
+ PlanNodeId id,
+ List<GroupByLevelDescriptor> groupByLevelDescriptors,
+ GroupByTimeParameter groupByTimeParameter,
+ OrderBy scanOrder) {
super(id);
this.groupByLevelDescriptors = groupByLevelDescriptors;
+ this.groupByTimeParameter = groupByTimeParameter;
+ this.scanOrder = scanOrder;
}
@Override
@@ -82,7 +102,8 @@ public class GroupByLevelNode extends MultiChildNode {
@Override
public PlanNode clone() {
- return new GroupByLevelNode(getPlanNodeId(), getGroupByLevelDescriptors());
+ return new GroupByLevelNode(
+ getPlanNodeId(), getGroupByLevelDescriptors(), this.groupByTimeParameter, this.scanOrder);
}
public List<GroupByLevelDescriptor> getGroupByLevelDescriptors() {
@@ -113,6 +134,13 @@ public class GroupByLevelNode extends MultiChildNode {
for (GroupByLevelDescriptor groupByLevelDescriptor : groupByLevelDescriptors) {
groupByLevelDescriptor.serialize(byteBuffer);
}
+ if (groupByTimeParameter == null) {
+ ReadWriteIOUtils.write((byte) 0, byteBuffer);
+ } else {
+ ReadWriteIOUtils.write((byte) 1, byteBuffer);
+ groupByTimeParameter.serialize(byteBuffer);
+ }
+ ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
}
public static GroupByLevelNode deserialize(ByteBuffer byteBuffer) {
@@ -122,28 +150,31 @@ public class GroupByLevelNode extends MultiChildNode {
groupByLevelDescriptors.add(GroupByLevelDescriptor.deserialize(byteBuffer));
descriptorSize--;
}
+ byte isNull = ReadWriteIOUtils.readByte(byteBuffer);
+ GroupByTimeParameter groupByTimeParameter = null;
+ if (isNull == 1) {
+ groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
+ }
+ OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new GroupByLevelNode(planNodeId, groupByLevelDescriptors);
+ return new GroupByLevelNode(
+ planNodeId, groupByLevelDescriptors, groupByTimeParameter, scanOrder);
}
@Override
public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
GroupByLevelNode that = (GroupByLevelNode) o;
- return Objects.equals(groupByLevelDescriptors, that.groupByLevelDescriptors);
+ return Objects.equals(groupByLevelDescriptors, that.groupByLevelDescriptors)
+ && Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
+ && scanOrder == that.scanOrder;
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), groupByLevelDescriptors);
+ return Objects.hash(super.hashCode(), groupByLevelDescriptors, groupByTimeParameter, scanOrder);
}
public String toString() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index 46f7b07683..f4482ca102 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -626,7 +626,9 @@ public class QueryLogicalPlanUtil {
AggregationStep.FINAL,
Collections.singletonList(
new TimeSeriesOperand(schemaMap.get("root.sg.d2.a.s1"))),
- new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1")))));
+ new TimeSeriesOperand(schemaMap.get("root.sg.*.*.s1")))),
+ null,
+ OrderBy.TIMESTAMP_ASC);
OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), groupByLevelNode, 100);
LimitNode limitNode = new LimitNode(queryId.genPlanNodeId(), offsetNode, 100);
@@ -857,7 +859,9 @@ public class QueryLogicalPlanUtil {
Arrays.asList(
new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1"))),
- new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))));
+ new TimeSeriesOperand(schemaMap.get("root.sg.*.s1")))),
+ null,
+ OrderBy.TIMESTAMP_ASC);
OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(), groupByLevelNode, 100);
LimitNode limitNode = new LimitNode(queryId.genPlanNodeId(), offsetNode, 100);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index befbeb9660..cfa79db920 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -168,7 +168,9 @@ public class AggregationDistributionTest {
Arrays.asList(
new TimeSeriesOperand(new PartialPath(d1s1Path)),
new TimeSeriesOperand(new PartialPath(d2s1Path))),
- new TimeSeriesOperand(new PartialPath(groupedPath)))));
+ new TimeSeriesOperand(new PartialPath(groupedPath)))),
+ null,
+ OrderBy.TIMESTAMP_ASC);
Analysis analysis = Util.constructAnalysis();
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -203,7 +205,9 @@ public class AggregationDistributionTest {
Arrays.asList(
new TimeSeriesOperand(new PartialPath(d3s1Path)),
new TimeSeriesOperand(new PartialPath(d4s1Path))),
- new TimeSeriesOperand(new PartialPath(groupedPath)))));
+ new TimeSeriesOperand(new PartialPath(groupedPath)))),
+ null,
+ OrderBy.TIMESTAMP_ASC);
Analysis analysis = Util.constructAnalysis();
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -254,7 +258,9 @@ public class AggregationDistributionTest {
AggregationType.COUNT,
AggregationStep.FINAL,
Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
- new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
+ new TimeSeriesOperand(new PartialPath(groupedPathS2)))),
+ null,
+ OrderBy.TIMESTAMP_ASC);
Analysis analysis = Util.constructAnalysis();
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
@@ -311,7 +317,9 @@ public class AggregationDistributionTest {
AggregationType.COUNT,
AggregationStep.FINAL,
Collections.singletonList(new TimeSeriesOperand(new PartialPath(d1s2Path))),
- new TimeSeriesOperand(new PartialPath(groupedPathS2)))));
+ new TimeSeriesOperand(new PartialPath(groupedPathS2)))),
+ null,
+ OrderBy.TIMESTAMP_ASC);
Analysis analysis = Util.constructAnalysis();
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
index ec6330eb85..862e8a9796 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/GroupByLevelNodeSerdeTest.java
@@ -88,7 +88,9 @@ public class GroupByLevelNodeSerdeTest {
Arrays.asList(
new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")),
new TimeSeriesOperand(new PartialPath("root.sg.d2.s1"))),
- new TimeSeriesOperand(new PartialPath("root.sg.*.s1")))));
+ new TimeSeriesOperand(new PartialPath("root.sg.*.s1")))),
+ groupByTimeParameter,
+ OrderBy.TIMESTAMP_ASC);
ByteBuffer byteBuffer = ByteBuffer.allocate(2048);
groupByLevelNode.serialize(byteBuffer);