You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/01 12:31:33 UTC

[iotdb] 04/07: implement operator

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e6c69abadc25230ada942ccec940dee9497451a2
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 1 16:21:48 2022 +0800

    implement operator
---
 .../timerangeiterator/SampleWindowIterator.java    |  87 +++++++++++
 .../TimeRangeIteratorFactory.java                  |  11 ++
 .../operator/process/WindowSplitOperator.java      | 160 +++++++++++++++++++++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  29 ++++
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   5 +
 .../statement/crud/FetchWindowSetStatement.java    |   7 +-
 6 files changed, 298 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
new file mode 100644
index 0000000000..396d53d96b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.timerangeiterator;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterates over a sampled subset of the time windows produced by an underlying {@link
+ * ITimeRangeIterator}.
+ *
+ * <p>The underlying windows are numbered from 0 in the order they are produced, and only the
+ * windows whose number is listed in {@code samplingIndexes} are returned. The underlying iterator
+ * is traversed once and never rewound, so the indexes must be in strictly ascending order; an
+ * index that cannot be reached ends the iteration.
+ */
+public class SampleWindowIterator implements ITimeRangeIterator {
+
+  private final ITimeRangeIterator allTimeRangeIterator;
+  private final List<Integer> samplingIndexes;
+
+  // position in samplingIndexes of the next window number to look for
+  private int sampleIndex = 0;
+  // number of windows already pulled from allTimeRangeIterator
+  private int timeRangeIndex = 0;
+
+  // sampled window located by hasNextTimeRange() but not yet handed out, or null
+  private TimeRange curTimeRange;
+
+  /**
+   * Creates the iterator. {@code startTime}, {@code endTime}, {@code interval} and {@code
+   * slidingStep} are forwarded unchanged to {@link TimeRangeIteratorFactory} to build the
+   * underlying iterator over all windows.
+   *
+   * @param samplingIndexes zero-based numbers of the windows to return, in ascending order
+   */
+  public SampleWindowIterator(
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep,
+      List<Integer> samplingIndexes) {
+    this.samplingIndexes = samplingIndexes;
+    // NOTE(review): the boolean flags are positional; their meaning is defined by
+    // TimeRangeIteratorFactory.getTimeRangeIterator, which is not shown here -- confirm there.
+    this.allTimeRangeIterator =
+        TimeRangeIteratorFactory.getTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, true, false, false, true, false);
+  }
+
+  @Override
+  public TimeRange getFirstTimeRange() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Looks ahead for the next sampled window, pulling windows from the underlying iterator until
+   * the next sampling index is reached.
+   *
+   * @return {@code true} if {@link #nextTimeRange()} can return a window; {@code false} once all
+   *     sampling indexes are consumed or the underlying iterator has no window left for them
+   */
+  @Override
+  public boolean hasNextTimeRange() {
+    if (curTimeRange != null) {
+      return true;
+    }
+    if (sampleIndex >= samplingIndexes.size()) {
+      return false;
+    }
+    int targetIndex = samplingIndexes.get(sampleIndex);
+    while (allTimeRangeIterator.hasNextTimeRange()) {
+      TimeRange timeRange = allTimeRangeIterator.nextTimeRange();
+      if (timeRangeIndex++ == targetIndex) {
+        curTimeRange = timeRange;
+        sampleIndex++;
+        return true;
+      }
+    }
+    // The underlying iterator is exhausted, so none of the remaining indexes can be matched.
+    // Mark them as consumed; otherwise callers polling this method would never terminate.
+    sampleIndex = samplingIndexes.size();
+    return false;
+  }
+
+  /**
+   * Returns the next sampled window.
+   *
+   * @throws NoSuchElementException if {@link #hasNextTimeRange()} is {@code false}
+   */
+  @Override
+  public TimeRange nextTimeRange() {
+    if (!hasNextTimeRange()) {
+      throw new NoSuchElementException("no more sampled time ranges");
+    }
+    TimeRange nextTimeRange = curTimeRange;
+    curTimeRange = null;
+    return nextTimeRange;
+  }
+
+  @Override
+  public boolean isAscending() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long currentOutputTime() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getTotalIntervalNum() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
index a86a513936..cd0ac8a08e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.aggregation.timerangeiterator;
 
+import java.util.List;
+
 import static org.apache.iotdb.db.qp.utils.DateTimeUtils.MS_TO_MONTH;
 
 public class TimeRangeIteratorFactory {
@@ -72,4 +74,18 @@ public class TimeRangeIteratorFactory {
           leftCRightO);
     }
   }
+
+  /**
+   * Creates an iterator that returns only the windows selected by {@code samplingIndexes}.
+   *
+   * <p>All arguments are passed unchanged to the {@link SampleWindowIterator} constructor.
+   */
+  public static ITimeRangeIterator getSampleTimeRangeIterator(
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep,
+      List<Integer> samplingIndexes) {
+    return new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
new file mode 100644
index 0000000000..d8d4ebc1f7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+/**
+ * Splits the child's output into one {@link TsBlock} per sampled time window.
+ *
+ * <p>Each successful {@link #next()} call returns the rows of the child whose timestamps fall
+ * into the next window of {@code sampleTimeRangeIterator}. The implementation reads the child's
+ * rows and the windows once, front to back, so it presumably expects both in ascending time
+ * order -- TODO confirm against the planner.
+ */
+public class WindowSplitOperator implements ProcessOperator {
+
+  protected final OperatorContext operatorContext;
+
+  protected final Operator child;
+  // rows received from the child that have not been assigned to a window yet, or null
+  protected TsBlock inputTsBlock;
+  // whether child.next() may still be invoked during the current next() call
+  protected boolean canCallNext;
+
+  private final ITimeRangeIterator sampleTimeRangeIterator;
+  // window currently being filled; null when the next window has not been fetched yet
+  private TimeRange curTimeRange;
+
+  private final TsBlockBuilder resultTsBlockBuilder;
+
+  public WindowSplitOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      ITimeRangeIterator sampleTimeRangeIterator,
+      List<TSDataType> outputDataTypes) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.sampleTimeRangeIterator = sampleTimeRangeIterator;
+    this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * Produces the rows of the current window.
+   *
+   * @return the completed block of the current window, or {@code null} if no window is left or
+   *     the window could not be completed during this call
+   */
+  @Override
+  public TsBlock next() {
+    // reset operator state
+    canCallNext = true;
+
+    if (curTimeRange == null) {
+      if (!sampleTimeRangeIterator.hasNextTimeRange()) {
+        // every sampled window has already been emitted
+        return null;
+      }
+      // move to next time window
+      curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+    }
+
+    if (!fetchData()) {
+      return null;
+    }
+    curTimeRange = null;
+    TsBlock resultTsBlock = resultTsBlockBuilder.build();
+    resultTsBlockBuilder.reset();
+    return resultTsBlock;
+  }
+
+  /**
+   * Fills the result builder with the rows of the current window.
+   *
+   * @return {@code true} if the window is complete, {@code false} if it must be continued in a
+   *     later {@link #next()} call
+   */
+  private boolean fetchData() {
+    while (!consumeInput()) {
+      if (!child.hasNext()) {
+        // The child is exhausted, so no further row can join the current window. Report it as
+        // complete; otherwise the last window would never be emitted and hasNext() would stay
+        // true forever.
+        return true;
+      }
+      // NOTE: child.next() can only be invoked once
+      if (!canCallNext) {
+        return false;
+      }
+      inputTsBlock = child.next();
+      canCallNext = false;
+    }
+    return true;
+  }
+
+  /**
+   * Copies the rows of {@code inputTsBlock} that belong to the current window into the result
+   * builder.
+   *
+   * @return {@code true} if a row beyond the current window was reached, i.e. the window is
+   *     complete; {@code false} if the input was used up and more input is needed
+   */
+  private boolean consumeInput() {
+    if (inputTsBlock == null) {
+      return false;
+    }
+
+    inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, true);
+    if (inputTsBlock == null) {
+      return false;
+    }
+
+    for (int readIndex = 0; readIndex < inputTsBlock.getPositionCount(); readIndex++) {
+      long time = inputTsBlock.getTimeByIndex(readIndex);
+      if (curTimeRange.contains(time)) {
+        writeData(readIndex);
+      } else {
+        // keep the rows from here on for the following windows
+        inputTsBlock = inputTsBlock.subTsBlock(readIndex);
+        return true;
+      }
+    }
+    // Every row of the block has been copied. Drop the block so that it is not copied a second
+    // time when the window is continued in the next call.
+    inputTsBlock = null;
+    return false;
+  }
+
+  /** Appends row {@code readIndex} of {@code inputTsBlock} to the result builder. */
+  private void writeData(int readIndex) {
+    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(inputTsBlock.getTimeByIndex(readIndex));
+    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+    for (int columnIndex = 0; columnIndex < columnBuilders.length; columnIndex++) {
+      columnBuilders[columnIndex].write(inputTsBlock.getColumn(columnIndex), readIndex);
+    }
+    resultTsBlockBuilder.declarePosition();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !this.hasNext();
+  }
+
+  // NOTE(review): the three memory estimates below return 0, which looks like a placeholder --
+  // confirm whether real estimates are required by the scheduler.
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index a02cbd773a..4c69cd5720 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.common.NodeRef;
 import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
@@ -53,6 +54,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper
 import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.WindowSplitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
 import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
@@ -147,6 +149,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -1575,6 +1578,35 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     return child;
   }
 
+  /**
+   * Generates a {@link WindowSplitOperator} for the given node, fed by the operator generated for
+   * the node's child and driven by the sampled time windows described by the node.
+   */
+  @Override
+  public Operator visitWindowSplit(WindowSplitNode node, LocalExecutionPlanContext context) {
+    // the child subtree is generated before this operator's own context is registered
+    Operator childOperator = node.getChild().accept(this, context);
+
+    String operatorType = WindowSplitOperator.class.getSimpleName();
+    OperatorContext splitContext =
+        context
+            .getInstanceContext()
+            .addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), operatorType);
+
+    GroupByTimeParameter windowParameter = node.getGroupByTimeParameter();
+    ITimeRangeIterator sampledWindows =
+        TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+            windowParameter.getStartTime(),
+            windowParameter.getEndTime(),
+            windowParameter.getInterval(),
+            windowParameter.getSlidingStep(),
+            node.getSamplingIndexes());
+    List<TSDataType> columnTypes = getOutputColumnTypes(node, context.getTypeProvider());
+
+    context.getTimeSliceAllocator().recordExecutionWeight(splitContext, 1);
+    return new WindowSplitOperator(splitContext, childOperator, sampledWindows, columnTypes);
+  }
+
   @Override
   public Operator visitSchemaFetchMerge(
       SchemaFetchMergeNode node, LocalExecutionPlanContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 788649ffc8..6a064cd90d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -326,4 +327,9 @@ public abstract class PlanVisitor<R, C> {
   public R visitDeviceViewInto(DeviceViewIntoNode node, C context) {
     return visitPlan(node, context);
   }
+
+  /** Visits a {@link WindowSplitNode}; by default this delegates to {@link #visitPlan}. */
+  public R visitWindowSplit(WindowSplitNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java
index fd9dc9a1e0..fa1b815cda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
@@ -81,5 +82,15 @@ public class FetchWindowSetStatement extends Statement {
     return visitor.visitFetchWindowSet(this, context);
   }
 
-  public void semanticCheck() {}
+  /**
+   * Checks that this statement is semantically valid.
+   *
+   * @throws SemanticException if {@code groupByTimeParameter.hasOverlap()} reports an overlap
+   */
+  public void semanticCheck() {
+    if (groupByTimeParameter.hasOverlap()) {
+      throw new SemanticException(
+          "The time windows of a FetchWindowSet statement must not overlap");
+    }
+  }
 }