You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/01 12:31:32 UTC

[iotdb] 03/07: implement planner

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit dc53d22b5c07a13eb7c32952fe77b8855c1dd573
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 1 10:53:50 2022 +0800

    implement planner
---
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  15 +++
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  16 +++
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  18 +++
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   3 +-
 .../planner/plan/node/process/WindowSplitNode.java | 134 +++++++++++++++++++++
 5 files changed, 185 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 1f29d600c7..6b5fc36659 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1233,6 +1233,21 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
         measurementPaths.stream().map(TimeSeriesOperand::new).collect(Collectors.toSet());
     analysis.setSourceExpressions(sourceExpressions);
 
+    // set transform
+    if (fetchWindowSetStatement.getFunctionName() != null) {
+      String functionName = fetchWindowSetStatement.getFunctionName();
+      Set<Expression> sourceTransformExpressions =
+          sourceExpressions.stream()
+              .map(
+                  expression ->
+                      new FunctionExpression(
+                          functionName,
+                          new LinkedHashMap<>(),
+                          Collections.singletonList(expression)))
+              .collect(Collectors.toSet());
+      analysis.setSourceTransformExpressions(sourceTransformExpressions);
+    }
+
     // set output
     List<ColumnHeader> columnHeaders =
         measurementPaths.stream()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index 9b0c800c27..a212cb6326 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
@@ -772,6 +773,10 @@ public class LogicalPlanBuilder {
 
   public LogicalPlanBuilder planTransform(
       Set<Expression> selectExpressions, boolean isGroupByTime, ZoneId zoneId, Ordering scanOrder) {
+    if (selectExpressions == null) {
+      return this;
+    }
+
     boolean needTransform = false;
     for (Expression expression : selectExpressions) {
       if (ExpressionAnalyzer.checkIsNeedTransform(expression)) {
@@ -884,6 +889,17 @@ public class LogicalPlanBuilder {
     return this;
   }
 
+  public LogicalPlanBuilder planWindowSplit(
+      GroupByTimeParameter groupByTimeParameter, List<Integer> samplingIndexes) {
+    this.root =
+        new WindowSplitNode(
+            context.getQueryId().genPlanNodeId(),
+            this.getRoot(),
+            groupByTimeParameter,
+            samplingIndexes);
+    return this;
+  }
+
   /** Meta Query* */
   public LogicalPlanBuilder planTimeSeriesSchemaSource(
       PartialPath pathPattern,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index cbbc9d4131..ef1d9b8502 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -41,7 +41,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.FetchWindowSetStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -66,6 +68,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
 
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -287,6 +290,21 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
     return false;
   }
 
+  @Override
+  public PlanNode visitFetchWindowSet(
+      FetchWindowSetStatement fetchWindowSetStatement, MPPQueryContext context) {
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+    planBuilder
+        .planRawDataSource(analysis.getSourceExpressions(), Ordering.ASC, null)
+        .planTransform(
+            analysis.getSourceTransformExpressions(), true, ZoneId.systemDefault(), Ordering.ASC)
+        .planWindowSplit(
+            fetchWindowSetStatement.getGroupByTimeParameter(),
+            fetchWindowSetStatement.getSamplingIndexes());
+
+    return planBuilder.getRoot();
+  }
+
   @Override
   public PlanNode visitCreateTimeseries(
       CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index bad8725978..800459849b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -150,7 +150,8 @@ public enum PlanNodeType {
   ROLLBACK_PRE_DEACTIVATE_TEMPLATE_NODE((short) 60),
   DEACTIVATE_TEMPLATE_NODE((short) 61),
   INTO((short) 62),
-  DEVICE_VIEW_INTO((short) 63);
+  DEVICE_VIEW_INTO((short) 63),
+  WINDOW_SPLIT((short) 64);
 
   public static final int BYTES = Short.BYTES;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java
new file mode 100644
index 0000000000..8ed9d404ef
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/WindowSplitNode.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class WindowSplitNode extends SingleChildProcessNode {
+
+  private final GroupByTimeParameter groupByTimeParameter;
+  private final List<Integer> samplingIndexes;
+
+  public WindowSplitNode(
+      PlanNodeId id,
+      PlanNode child,
+      GroupByTimeParameter groupByTimeParameter,
+      List<Integer> samplingIndexes) {
+    super(id, child);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.samplingIndexes = samplingIndexes;
+  }
+
+  public WindowSplitNode(
+      PlanNodeId id, GroupByTimeParameter groupByTimeParameter, List<Integer> samplingIndexes) {
+    super(id);
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.samplingIndexes = samplingIndexes;
+  }
+
+  public GroupByTimeParameter getGroupByTimeParameter() {
+    return groupByTimeParameter;
+  }
+
+  public List<Integer> getSamplingIndexes() {
+    return samplingIndexes;
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new WindowSplitNode(getPlanNodeId(), groupByTimeParameter, samplingIndexes);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return child.getOutputColumnNames();
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.WINDOW_SPLIT.serialize(byteBuffer);
+    groupByTimeParameter.serialize(byteBuffer);
+    ReadWriteIOUtils.write(samplingIndexes.size(), byteBuffer);
+    for (Integer index : samplingIndexes) {
+      ReadWriteIOUtils.write(index, byteBuffer);
+    }
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.WINDOW_SPLIT.serialize(stream);
+    groupByTimeParameter.serialize(stream);
+    ReadWriteIOUtils.write(samplingIndexes.size(), stream);
+    for (Integer index : samplingIndexes) {
+      ReadWriteIOUtils.write(index, stream);
+    }
+  }
+
+  public static WindowSplitNode deserialize(ByteBuffer byteBuffer) {
+    GroupByTimeParameter groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
+
+    int listSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Integer> samplingIndexes = new ArrayList<>(listSize);
+    while (listSize > 0) {
+      samplingIndexes.add(ReadWriteIOUtils.readInt(byteBuffer));
+      listSize--;
+    }
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new WindowSplitNode(planNodeId, groupByTimeParameter, samplingIndexes);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    WindowSplitNode that = (WindowSplitNode) o;
+    return groupByTimeParameter.equals(that.groupByTimeParameter)
+        && samplingIndexes.equals(that.samplingIndexes);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), groupByTimeParameter, samplingIndexes);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("WindowSplitNode-%s", getPlanNodeId());
+  }
+}