You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/01 12:31:33 UTC
[iotdb] 04/07: implement operator
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e6c69abadc25230ada942ccec940dee9497451a2
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 1 16:21:48 2022 +0800
implement operator
---
.../timerangeiterator/SampleWindowIterator.java | 87 +++++++++++
.../TimeRangeIteratorFactory.java | 11 ++
.../operator/process/WindowSplitOperator.java | 160 +++++++++++++++++++++
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 29 ++++
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 5 +
.../statement/crud/FetchWindowSetStatement.java | 7 +-
6 files changed, 298 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
new file mode 100644
index 0000000000..396d53d96b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/SampleWindowIterator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.aggregation.timerangeiterator;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+
+import java.util.List;
+
+/**
+ * Iterates over a sampled subset of the time windows produced by an underlying iterator.
+ *
+ * <p>The underlying iterator enumerates every window; only the windows whose zero-based position
+ * is listed in {@code samplingIndexes} are returned. The indexes are looked up in list order
+ * during a single forward scan of the underlying iterator, so they must be sorted in strictly
+ * ascending order; an index that cannot be reached ends the iteration.
+ */
+public class SampleWindowIterator implements ITimeRangeIterator {
+
+  /** Produces all candidate windows, sampled or not. */
+  private final ITimeRangeIterator allTimeRangeIterator;
+
+  /** Zero-based positions (within the underlying iterator) of the windows to return. */
+  private final List<Integer> samplingIndexes;
+
+  /** Position in {@code samplingIndexes} of the next index to look for. */
+  private int sampleIndex = 0;
+
+  /** Position of the window the underlying iterator will return next. */
+  private int timeRangeIndex = 0;
+
+  /** Window found by look-ahead but not handed out yet; {@code null} if nothing is buffered. */
+  private TimeRange bufferedTimeRange;
+
+  /**
+   * @param startTime start of the overall time span, forwarded to the underlying iterator
+   * @param endTime end of the overall time span, forwarded to the underlying iterator
+   * @param interval window length, forwarded to the underlying iterator
+   * @param slidingStep distance between window starts, forwarded to the underlying iterator
+   * @param samplingIndexes ascending zero-based positions of the windows to return
+   */
+  public SampleWindowIterator(
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep,
+      List<Integer> samplingIndexes) {
+    this.samplingIndexes = samplingIndexes;
+    // The boolean flags are passed through unchanged; see
+    // TimeRangeIteratorFactory#getTimeRangeIterator for their meaning.
+    this.allTimeRangeIterator =
+        TimeRangeIteratorFactory.getTimeRangeIterator(
+            startTime, endTime, interval, slidingStep, true, false, false, true, false);
+  }
+
+  @Override
+  public TimeRange getFirstTimeRange() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Looks ahead for the next sampled window so that the answer stays correct even when the
+   * underlying iterator ends before all sampling indexes have been reached.
+   */
+  @Override
+  public boolean hasNextTimeRange() {
+    if (bufferedTimeRange == null) {
+      bufferedTimeRange = findNextSampledTimeRange();
+    }
+    return bufferedTimeRange != null;
+  }
+
+  /**
+   * Returns the next sampled window.
+   *
+   * @throws java.util.NoSuchElementException if no sampled window is left
+   */
+  @Override
+  public TimeRange nextTimeRange() {
+    if (!hasNextTimeRange()) {
+      throw new java.util.NoSuchElementException("no sampled time range left");
+    }
+    TimeRange result = bufferedTimeRange;
+    bufferedTimeRange = null;
+    return result;
+  }
+
+  /**
+   * Advances the underlying iterator up to the next sampled position.
+   *
+   * @return the window at that position, or {@code null} if every sampling index has been served
+   *     or the underlying iterator ran out first
+   */
+  private TimeRange findNextSampledTimeRange() {
+    if (sampleIndex >= samplingIndexes.size()) {
+      return null;
+    }
+    int targetIndex = samplingIndexes.get(sampleIndex);
+    while (allTimeRangeIterator.hasNextTimeRange()) {
+      TimeRange candidate = allTimeRangeIterator.nextTimeRange();
+      boolean sampled = timeRangeIndex == targetIndex;
+      timeRangeIndex++;
+      if (sampled) {
+        sampleIndex++;
+        return candidate;
+      }
+    }
+    // The underlying iterator is exhausted, so the remaining indexes can never be reached.
+    // Mark the iteration as finished instead of reporting windows that do not exist.
+    sampleIndex = samplingIndexes.size();
+    return null;
+  }
+
+  @Override
+  public boolean isAscending() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long currentOutputTime() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getTotalIntervalNum() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
index a86a513936..cd0ac8a08e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.mpp.aggregation.timerangeiterator;
+import java.util.List;
+
import static org.apache.iotdb.db.qp.utils.DateTimeUtils.MS_TO_MONTH;
public class TimeRangeIteratorFactory {
@@ -72,4 +74,13 @@ public class TimeRangeIteratorFactory {
leftCRightO);
}
}
+
+ /**
+  * Creates a {@link SampleWindowIterator} for the given time span and sampling positions.
+  *
+  * <p>All arguments are handed to the {@link SampleWindowIterator} constructor unchanged.
+  */
+ public static ITimeRangeIterator getSampleTimeRangeIterator(
+     long startTime,
+     long endTime,
+     long interval,
+     long slidingStep,
+     List<Integer> samplingIndexes) {
+   ITimeRangeIterator sampledWindows =
+       new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes);
+   return sampledWindows;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
new file mode 100644
index 0000000000..d8d4ebc1f7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+/**
+ * Splits the rows of its child operator into the time windows supplied by an {@link
+ * ITimeRangeIterator} and returns one {@link TsBlock} per window.
+ *
+ * <p>Rows that fall before the current window are skipped; rows that fall after it are kept for
+ * the following windows. {@link #next()} returns {@code null} while the current window is still
+ * waiting for more input from the child.
+ */
+public class WindowSplitOperator implements ProcessOperator {
+
+  protected final OperatorContext operatorContext;
+
+  protected final Operator child;
+
+  /** Input rows fetched from the child that have not been fully consumed yet. */
+  protected TsBlock inputTsBlock;
+
+  /** Guards the rule that {@code child.next()} is invoked at most once per {@link #next()}. */
+  protected boolean canCallNext;
+
+  private final ITimeRangeIterator sampleTimeRangeIterator;
+
+  /** Window currently being filled; {@code null} between windows. */
+  private TimeRange curTimeRange;
+
+  private final TsBlockBuilder resultTsBlockBuilder;
+
+  /**
+   * @param operatorContext context returned by {@link #getOperatorContext()}
+   * @param child operator supplying the input rows
+   * @param sampleTimeRangeIterator supplies the windows to cut the input into
+   * @param outputDataTypes data types of the value columns of the produced blocks
+   */
+  public WindowSplitOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      ITimeRangeIterator sampleTimeRangeIterator,
+      List<TSDataType> outputDataTypes) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.sampleTimeRangeIterator = sampleTimeRangeIterator;
+    this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * Returns the rows of the current window once it is complete.
+   *
+   * @return the block for the finished window (possibly empty), or {@code null} if the window
+   *     still needs more input or no window is left
+   */
+  @Override
+  public TsBlock next() {
+    // reset operator state
+    canCallNext = true;
+
+    if (curTimeRange == null) {
+      if (!sampleTimeRangeIterator.hasNextTimeRange()) {
+        // nothing left to produce; avoid processing input against a missing window
+        return null;
+      }
+      // move to next time window
+      curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+    }
+
+    if (!fetchData()) {
+      return null;
+    }
+
+    curTimeRange = null;
+    TsBlock resultTsBlock = resultTsBlockBuilder.build();
+    resultTsBlockBuilder.reset();
+    return resultTsBlock;
+  }
+
+  /**
+   * Feeds input into the current window.
+   *
+   * @return {@code true} if the window is complete (a later row was seen or the child has no more
+   *     data), {@code false} if more input is needed but cannot be fetched during this call
+   */
+  private boolean fetchData() {
+    while (!consumeInput()) {
+      if (!child.hasNext()) {
+        // No further rows will ever arrive, so the window is as complete as it can get. Without
+        // this the buffered rows would never be returned and hasNext() would stay true forever.
+        return true;
+      }
+      // NOTE: child.next() can only be invoked once
+      if (!canCallNext) {
+        return false;
+      }
+      inputTsBlock = child.next();
+      canCallNext = false;
+    }
+    return true;
+  }
+
+  /**
+   * Copies the rows of {@code inputTsBlock} that belong to the current window into the result.
+   *
+   * @return {@code true} if a row beyond the current window was found (the window is complete and
+   *     the remaining rows are kept in {@code inputTsBlock}), {@code false} if the input was used
+   *     up without reaching the end of the window
+   */
+  private boolean consumeInput() {
+    if (inputTsBlock == null) {
+      return false;
+    }
+
+    inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, true);
+    if (inputTsBlock == null) {
+      return false;
+    }
+
+    for (int readIndex = 0; readIndex < inputTsBlock.getPositionCount(); readIndex++) {
+      long time = inputTsBlock.getTimeByIndex(readIndex);
+      if (curTimeRange.contains(time)) {
+        writeData(readIndex);
+      } else {
+        inputTsBlock = inputTsBlock.subTsBlock(readIndex);
+        return true;
+      }
+    }
+
+    // Every row has been copied. Drop the block so that a later call does not copy the same rows
+    // into the result a second time.
+    inputTsBlock = null;
+    return false;
+  }
+
+  /** Appends the row at {@code readIndex} of {@code inputTsBlock} to the result builder. */
+  private void writeData(int readIndex) {
+    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(inputTsBlock.getTimeByIndex(readIndex));
+    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+    for (int columnIndex = 0; columnIndex < columnBuilders.length; columnIndex++) {
+      columnBuilders[columnIndex].write(inputTsBlock.getColumn(columnIndex), readIndex);
+    }
+    resultTsBlockBuilder.declarePosition();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !this.hasNext();
+  }
+
+  // NOTE(review): the three memory estimates below return 0, which looks like a placeholder.
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index a02cbd773a..4c69cd5720 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.NodeRef;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriverContext;
@@ -53,6 +54,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper
import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.WindowSplitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill;
@@ -147,6 +149,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -1575,6 +1578,32 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return child;
}
+ /**
+  * Builds the {@link WindowSplitOperator} for a {@link WindowSplitNode}: the child plan is
+  * translated first, then the operator is wired to a sampling time range iterator created from
+  * the node's group-by-time parameter and sampling indexes.
+  */
+ @Override
+ public Operator visitWindowSplit(WindowSplitNode node, LocalExecutionPlanContext context) {
+   Operator childOperator = node.getChild().accept(this, context);
+
+   String operatorType = WindowSplitOperator.class.getSimpleName();
+   OperatorContext splitContext =
+       context
+           .getInstanceContext()
+           .addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), operatorType);
+
+   GroupByTimeParameter windowParameter = node.getGroupByTimeParameter();
+   ITimeRangeIterator sampledWindows =
+       TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+           windowParameter.getStartTime(),
+           windowParameter.getEndTime(),
+           windowParameter.getInterval(),
+           windowParameter.getSlidingStep(),
+           node.getSamplingIndexes());
+
+   List<TSDataType> columnTypes = getOutputColumnTypes(node, context.getTypeProvider());
+
+   context.getTimeSliceAllocator().recordExecutionWeight(splitContext, 1);
+   return new WindowSplitOperator(splitContext, childOperator, sampledWindows, columnTypes);
+ }
+
@Override
public Operator visitSchemaFetchMerge(
SchemaFetchMergeNode node, LocalExecutionPlanContext context) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 788649ffc8..6a064cd90d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
@@ -326,4 +327,8 @@ public abstract class PlanVisitor<R, C> {
public R visitDeviceViewInto(DeviceViewIntoNode node, C context) {
return visitPlan(node, context);
}
+
+ /** Visits a {@link WindowSplitNode}; by default it is handled like any other plan node. */
+ public R visitWindowSplit(WindowSplitNode node, C context) {
+   return visitPlan(node, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java
index fd9dc9a1e0..fa1b815cda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/FetchWindowSetStatement.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
@@ -81,5 +82,9 @@ public class FetchWindowSetStatement extends Statement {
return visitor.visitFetchWindowSet(this, context);
}
- public void semanticCheck() {}
+ /**
+  * Validates this statement before planning.
+  *
+  * @throws SemanticException if the group-by-time parameter describes overlapping windows
+  */
+ public void semanticCheck() {
+   if (groupByTimeParameter.hasOverlap()) {
+     // An empty message gave the user no hint about what was rejected.
+     throw new SemanticException(
+         "Overlapping time windows are not allowed when fetching a window set.");
+   }
+ }
}