You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/01 12:31:32 UTC
[iotdb] 03/07: implement planner
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit dc53d22b5c07a13eb7c32952fe77b8855c1dd573
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 1 10:53:50 2022 +0800
implement planner
---
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 15 +++
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 16 +++
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 18 +++
.../mpp/plan/planner/plan/node/PlanNodeType.java | 3 +-
.../planner/plan/node/process/WindowSplitNode.java | 134 +++++++++++++++++++++
5 files changed, 185 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 1f29d600c7..6b5fc36659 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1233,6 +1233,21 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
measurementPaths.stream().map(TimeSeriesOperand::new).collect(Collectors.toSet());
analysis.setSourceExpressions(sourceExpressions);
+ // set transform
+ if (fetchWindowSetStatement.getFunctionName() != null) {
+ String functionName = fetchWindowSetStatement.getFunctionName();
+ Set<Expression> sourceTransformExpressions =
+ sourceExpressions.stream()
+ .map(
+ expression ->
+ new FunctionExpression(
+ functionName,
+ new LinkedHashMap<>(),
+ Collections.singletonList(expression)))
+ .collect(Collectors.toSet());
+ analysis.setSourceTransformExpressions(sourceTransformExpressions);
+ }
+
// set output
List<ColumnHeader> columnHeaders =
measurementPaths.stream()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 9b0c800c27..a212cb6326 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
@@ -772,6 +773,10 @@ public class LogicalPlanBuilder {
public LogicalPlanBuilder planTransform(
Set<Expression> selectExpressions, boolean isGroupByTime, ZoneId zoneId, Ordering scanOrder) {
+ if (selectExpressions == null) {
+ return this;
+ }
+
boolean needTransform = false;
for (Expression expression : selectExpressions) {
if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
@@ -884,6 +889,17 @@ public class LogicalPlanBuilder {
return this;
}
+ public LogicalPlanBuilder planWindowSplit(
+ GroupByTimeParameter groupByTimeParameter, List<Integer> samplingIndexes) {
+ this.root =
+ new WindowSplitNode(
+ context.getQueryId().genPlanNodeId(),
+ this.getRoot(),
+ groupByTimeParameter,
+ samplingIndexes);
+ return this;
+ }
+
/** Meta Query* */
public LogicalPlanBuilder planTimeSeriesSchemaSource(
PartialPath pathPattern,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index cbbc9d4131..ef1d9b8502 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -41,7 +41,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -66,6 +68,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -287,6 +290,21 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
return false;
}
+ @Override
+ public PlanNode visitFetchWindowSet(
+ FetchWindowSetStatement fetchWindowSetStatement, MPPQueryContext context) {
+ LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+ planBuilder
+ .planRawDataSource(analysis.getSourceExpressions(), Ordering.ASC, null)
+ .planTransform(
+ analysis.getSourceTransformExpressions(), true, ZoneId.systemDefault(), Ordering.ASC)
+ .planWindowSplit(
+ fetchWindowSetStatement.getGroupByTimeParameter(),
+ fetchWindowSetStatement.getSamplingIndexes());
+
+ return planBuilder.getRoot();
+ }
+
@Override
public PlanNode visitCreateTimeseries(
CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index bad8725978..800459849b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -150,7 +150,8 @@ public enum PlanNodeType {
ROLLBACK_PRE_DEACTIVATE_TEMPLATE_NODE((short) 60),
DEACTIVATE_TEMPLATE_NODE((short) 61),
INTO((short) 62),
- DEVICE_VIEW_INTO((short) 63);
+ DEVICE_VIEW_INTO((short) 63),
+ WINDOW_SPLIT((short) 64);
public static final int BYTES = Short.BYTES;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java
new file mode 100644
index 0000000000..8ed9d404ef
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class WindowSplitNode extends SingleChildProcessNode {
+
+ private final GroupByTimeParameter groupByTimeParameter;
+ private final List<Integer> samplingIndexes;
+
+ public WindowSplitNode(
+ PlanNodeId id,
+ PlanNode child,
+ GroupByTimeParameter groupByTimeParameter,
+ List<Integer> samplingIndexes) {
+ super(id, child);
+ this.groupByTimeParameter = groupByTimeParameter;
+ this.samplingIndexes = samplingIndexes;
+ }
+
+ public WindowSplitNode(
+ PlanNodeId id, GroupByTimeParameter groupByTimeParameter, List<Integer> samplingIndexes) {
+ super(id);
+ this.groupByTimeParameter = groupByTimeParameter;
+ this.samplingIndexes = samplingIndexes;
+ }
+
+ public GroupByTimeParameter getGroupByTimeParameter() {
+ return groupByTimeParameter;
+ }
+
+ public List<Integer> getSamplingIndexes() {
+ return samplingIndexes;
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new WindowSplitNode(getPlanNodeId(), groupByTimeParameter, samplingIndexes);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return child.getOutputColumnNames();
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.WINDOW_SPLIT.serialize(byteBuffer);
+ groupByTimeParameter.serialize(byteBuffer);
+ ReadWriteIOUtils.write(samplingIndexes.size(), byteBuffer);
+ for (Integer index : samplingIndexes) {
+ ReadWriteIOUtils.write(index, byteBuffer);
+ }
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws IOException {
+ PlanNodeType.WINDOW_SPLIT.serialize(stream);
+ groupByTimeParameter.serialize(stream);
+ ReadWriteIOUtils.write(samplingIndexes.size(), stream);
+ for (Integer index : samplingIndexes) {
+ ReadWriteIOUtils.write(index, stream);
+ }
+ }
+
+ public static WindowSplitNode deserialize(ByteBuffer byteBuffer) {
+ GroupByTimeParameter groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
+
+ int listSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<Integer> samplingIndexes = new ArrayList<>(listSize);
+ while (listSize > 0) {
+ samplingIndexes.add(ReadWriteIOUtils.readInt(byteBuffer));
+ listSize--;
+ }
+
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new WindowSplitNode(planNodeId, groupByTimeParameter, samplingIndexes);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ WindowSplitNode that = (WindowSplitNode) o;
+ return groupByTimeParameter.equals(that.groupByTimeParameter)
+ && samplingIndexes.equals(that.samplingIndexes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), groupByTimeParameter, samplingIndexes);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("WindowSplitNode-%s", getPlanNodeId());
+ }
+}