You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/14 07:05:16 UTC
[iotdb] 03/05: add WindowConcatOperator
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 64b03425133cc6b758e93762b92994818638d7fe
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 10:27:00 2022 +0800
add WindowConcatOperator
---
.../TimeRangeIteratorFactory.java | 3 +-
...plitOperator.java => WindowConcatOperator.java} | 82 ++--------------------
.../operator/process/WindowSplitOperator.java | 12 ++--
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 32 ++++++++-
4 files changed, 45 insertions(+), 84 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
index cd0ac8a08e..552cca4bb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java
@@ -80,7 +80,8 @@ public class TimeRangeIteratorFactory {
long endTime,
long interval,
long slidingStep,
- List<Integer> samplingIndexes) {
+ List<Integer> samplingIndexes,
+ boolean outputPartialTimeWindow) {
return new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
index d8d4ebc1f7..1f40016f9a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
@@ -26,15 +26,10 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
-
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
-public class WindowSplitOperator implements ProcessOperator {
+public class WindowConcatOperator implements ProcessOperator {
protected final OperatorContext operatorContext;
@@ -47,7 +42,7 @@ public class WindowSplitOperator implements ProcessOperator {
private final TsBlockBuilder resultTsBlockBuilder;
- public WindowSplitOperator(
+ public WindowConcatOperator(
OperatorContext operatorContext,
Operator child,
ITimeRangeIterator sampleTimeRangeIterator,
@@ -60,87 +55,22 @@ public class WindowSplitOperator implements ProcessOperator {
@Override
public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public ListenableFuture<?> isBlocked() {
- return child.isBlocked();
+ return null;
}
@Override
public TsBlock next() {
- // reset operator state
- canCallNext = true;
-
- if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) {
- // move to next time window
- curTimeRange = sampleTimeRangeIterator.nextTimeRange();
- }
-
- if (!fetchData()) {
- return null;
- } else {
- curTimeRange = null;
- TsBlock resultTsBlock = resultTsBlockBuilder.build();
- resultTsBlockBuilder.reset();
- return resultTsBlock;
- }
- }
-
- private boolean fetchData() {
- while (!consumeInput()) {
- // NOTE: child.next() can only be invoked once
- if (child.hasNext() && canCallNext) {
- inputTsBlock = child.next();
- canCallNext = false;
- } else {
- return false;
- }
- }
- return true;
- }
-
- private boolean consumeInput() {
- if (inputTsBlock == null) {
- return false;
- }
-
- inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, true);
- if (inputTsBlock == null) {
- return false;
- }
-
- for (int readIndex = 0; readIndex < inputTsBlock.getPositionCount(); readIndex++) {
- long time = inputTsBlock.getTimeByIndex(readIndex);
- if (curTimeRange.contains(time)) {
- writeData(readIndex);
- } else {
- inputTsBlock = inputTsBlock.subTsBlock(readIndex);
- return true;
- }
- }
- return false;
- }
-
- private void writeData(int readIndex) {
- TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
- timeColumnBuilder.writeLong(inputTsBlock.getTimeByIndex(readIndex));
- ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
- for (int columnIndex = 0; columnIndex < columnBuilders.length; columnIndex++) {
- columnBuilders[columnIndex].write(inputTsBlock.getColumn(columnIndex), readIndex);
- }
- resultTsBlockBuilder.declarePosition();
+ return null;
}
@Override
public boolean hasNext() {
- return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
+ return false;
}
@Override
public boolean isFinished() {
- return !this.hasNext();
+ return false;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
index d8d4ebc1f7..f2ee804f75 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
@@ -42,7 +42,7 @@ public class WindowSplitOperator implements ProcessOperator {
protected TsBlock inputTsBlock;
protected boolean canCallNext;
- private final ITimeRangeIterator sampleTimeRangeIterator;
+ private final ITimeRangeIterator sampleTimeRangeSliceIterator;
private TimeRange curTimeRange;
private final TsBlockBuilder resultTsBlockBuilder;
@@ -50,11 +50,11 @@ public class WindowSplitOperator implements ProcessOperator {
public WindowSplitOperator(
OperatorContext operatorContext,
Operator child,
- ITimeRangeIterator sampleTimeRangeIterator,
+ ITimeRangeIterator sampleTimeRangeSliceIterator,
List<TSDataType> outputDataTypes) {
this.operatorContext = operatorContext;
this.child = child;
- this.sampleTimeRangeIterator = sampleTimeRangeIterator;
+ this.sampleTimeRangeSliceIterator = sampleTimeRangeSliceIterator;
this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
}
@@ -73,9 +73,9 @@ public class WindowSplitOperator implements ProcessOperator {
// reset operator state
canCallNext = true;
- if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) {
+ if (curTimeRange == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) {
// move to next time window
- curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+ curTimeRange = sampleTimeRangeSliceIterator.nextTimeRange();
}
if (!fetchData()) {
@@ -135,7 +135,7 @@ public class WindowSplitOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange();
+ return curTimeRange != null || sampleTimeRangeSliceIterator.hasNextTimeRange();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 4c69cd5720..888c3f94a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper
import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.WindowConcatOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.WindowSplitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill;
import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill;
@@ -149,6 +150,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowConcatNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
@@ -1596,7 +1598,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
groupByTimeParameter.getEndTime(),
groupByTimeParameter.getInterval(),
groupByTimeParameter.getSlidingStep(),
- node.getSamplingIndexes());
+ node.getSamplingIndexes(),
+ true);
List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
@@ -1604,6 +1607,33 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return new WindowSplitOperator(operatorContext, child, timeRangeIterator, outputDataTypes);
}
+ @Override
+ public Operator visitWindowConcat(WindowConcatNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ OperatorContext operatorContext =
+ context
+ .getInstanceContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ WindowConcatOperator.class.getSimpleName());
+
+ GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
+ ITimeRangeIterator timeRangeIterator =
+ TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+ groupByTimeParameter.getStartTime(),
+ groupByTimeParameter.getEndTime(),
+ groupByTimeParameter.getInterval(),
+ groupByTimeParameter.getSlidingStep(),
+ node.getSamplingIndexes(),
+ false);
+
+ List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+
+ context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+ return new WindowConcatOperator(operatorContext, child, timeRangeIterator, outputDataTypes);
+ }
+
@Override
public Operator visitSchemaFetchMerge(
SchemaFetchMergeNode node, LocalExecutionPlanContext context) {