You are viewing a plain text version of this content. (The canonical link was not preserved in this plain-text copy.)
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/11/14 07:05:18 UTC
[iotdb] 05/05: modify WindowSplitOperator
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch ml/windowSet
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cf1cffa7f9bad2b738a27d902a07e4a4b32558d3
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Mon Nov 14 15:04:54 2022 +0800
modify WindowSplitOperator
---
.../main/java/org/apache/iotdb/SessionExample.java | 4 +--
.../operator/process/WindowConcatOperator.java | 14 ++++-----
.../operator/process/WindowSplitOperator.java | 33 ++++++++++++++++------
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 17 +++++++++--
4 files changed, 49 insertions(+), 19 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 8760400c91..9baf3a8ad2 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -73,9 +73,9 @@ public class SessionExample {
session.setFetchSize(10000);
List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1");
- List<Integer> indexes = Arrays.asList(1, 3, 5, 7);
+ List<Integer> indexes = Arrays.asList(0, 1, 2, 3);
List<SessionDataSet> windowBatch =
- session.fetchWindowBatch(queryPaths, null, 1, 40, 2, 2, indexes);
+ session.fetchWindowBatch(queryPaths, null, 0, 32, 4, 3, indexes);
for (SessionDataSet window : windowBatch) {
System.out.println(window.getColumnNames());
while (window.hasNext()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
index 1f40016f9a..269b579feb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java
@@ -55,36 +55,36 @@ public class WindowConcatOperator implements ProcessOperator {
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
public TsBlock next() {
- return null;
+ return child.next();
}
@Override
public boolean hasNext() {
- return false;
+ return child.hasNext();
}
@Override
public boolean isFinished() {
- return false;
+ return child.isFinished();
}
@Override
public long calculateMaxPeekMemory() {
- return 0;
+ return child.calculateMaxPeekMemory();
}
@Override
public long calculateMaxReturnSize() {
- return 0;
+ return child.calculateMaxReturnSize();
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
- return 0;
+ return child.calculateRetainedSizeAfterCallingNext();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
index f2ee804f75..6b9544b1a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java
@@ -42,18 +42,23 @@ public class WindowSplitOperator implements ProcessOperator {
protected TsBlock inputTsBlock;
protected boolean canCallNext;
- private final ITimeRangeIterator sampleTimeRangeSliceIterator;
+ private final ITimeRangeIterator sampleTimeRangeIterator;
private TimeRange curTimeRange;
+ private final ITimeRangeIterator sampleTimeRangeSliceIterator;
+ private TimeRange curTimeRangeSlice;
+
private final TsBlockBuilder resultTsBlockBuilder;
public WindowSplitOperator(
OperatorContext operatorContext,
Operator child,
+ ITimeRangeIterator sampleTimeRangeIterator,
ITimeRangeIterator sampleTimeRangeSliceIterator,
List<TSDataType> outputDataTypes) {
this.operatorContext = operatorContext;
this.child = child;
+ this.sampleTimeRangeIterator = sampleTimeRangeIterator;
this.sampleTimeRangeSliceIterator = sampleTimeRangeSliceIterator;
this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
}
@@ -73,15 +78,27 @@ public class WindowSplitOperator implements ProcessOperator {
// reset operator state
canCallNext = true;
- if (curTimeRange == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) {
- // move to next time window
- curTimeRange = sampleTimeRangeSliceIterator.nextTimeRange();
+ if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) {
+ curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+ }
+
+ while (curTimeRangeSlice == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) {
+ curTimeRangeSlice = sampleTimeRangeSliceIterator.nextTimeRange();
+ if (curTimeRangeSlice.getMin() > curTimeRange.getMax()) {
+ if (sampleTimeRangeIterator.hasNextTimeRange()) {
+ curTimeRange = sampleTimeRangeIterator.nextTimeRange();
+ }
+ if (curTimeRangeSlice.getMin() > curTimeRange.getMax()
+ || curTimeRangeSlice.getMax() < curTimeRange.getMin()) {
+ curTimeRangeSlice = null;
+ }
+ }
}
if (!fetchData()) {
return null;
} else {
- curTimeRange = null;
+ curTimeRangeSlice = null;
TsBlock resultTsBlock = resultTsBlockBuilder.build();
resultTsBlockBuilder.reset();
return resultTsBlock;
@@ -106,14 +123,14 @@ public class WindowSplitOperator implements ProcessOperator {
return false;
}
- inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, true);
+ inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRangeSlice, true);
if (inputTsBlock == null) {
return false;
}
for (int readIndex = 0; readIndex < inputTsBlock.getPositionCount(); readIndex++) {
long time = inputTsBlock.getTimeByIndex(readIndex);
- if (curTimeRange.contains(time)) {
+ if (curTimeRangeSlice.contains(time)) {
writeData(readIndex);
} else {
inputTsBlock = inputTsBlock.subTsBlock(readIndex);
@@ -135,7 +152,7 @@ public class WindowSplitOperator implements ProcessOperator {
@Override
public boolean hasNext() {
- return curTimeRange != null || sampleTimeRangeSliceIterator.hasNextTimeRange();
+ return curTimeRangeSlice != null || sampleTimeRangeSliceIterator.hasNextTimeRange();
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 888c3f94a7..d839451544 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -1592,7 +1592,15 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
WindowSplitOperator.class.getSimpleName());
GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
- ITimeRangeIterator timeRangeIterator =
+ ITimeRangeIterator sampleTimeRangeIterator =
+ TimeRangeIteratorFactory.getSampleTimeRangeIterator(
+ groupByTimeParameter.getStartTime(),
+ groupByTimeParameter.getEndTime(),
+ groupByTimeParameter.getInterval(),
+ groupByTimeParameter.getSlidingStep(),
+ node.getSamplingIndexes(),
+ false);
+ ITimeRangeIterator sampleTimeRangeSliceIterator =
TimeRangeIteratorFactory.getSampleTimeRangeIterator(
groupByTimeParameter.getStartTime(),
groupByTimeParameter.getEndTime(),
@@ -1604,7 +1612,12 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
- return new WindowSplitOperator(operatorContext, child, timeRangeIterator, outputDataTypes);
+ return new WindowSplitOperator(
+ operatorContext,
+ child,
+ sampleTimeRangeIterator,
+ sampleTimeRangeSliceIterator,
+ outputDataTypes);
}
@Override