You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/03 02:41:49 UTC
[iotdb] branch master updated: [IOTDB-5563] Extract and decouple the logic of window segmentation in Aggregator (#9141)
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0a4a84fca3 [IOTDB-5563] Extract and decouple the logic of window segmentation in Aggregator (#9141)
0a4a84fca3 is described below
commit 0a4a84fca3fc785af585b4589c66f0b84046d175
Author: YangCaiyin <yc...@gmail.com>
AuthorDate: Fri Mar 3 10:41:41 2023 +0800
[IOTDB-5563] Extract and decouple the logic of window segmentation in Aggregator (#9141)
---
.../db/it/groupby/IoTDBGroupByConditionIT.java | 2 +-
.../iotdb/db/it/groupby/IoTDBGroupBySessionIT.java | 18 +--
...eventWindow.ftl => abstractVariationWindow.ftl} | 30 ++---
.../main/codegen/templates/eventWindowManager.ftl | 51 -------
.../{evEventWindow.ftl => variationWindow.ftl} | 16 +--
...indowManager.ftl => variationWindowManager.ftl} | 36 ++---
.../iotdb/db/mpp/aggregation/Accumulator.java | 16 +--
.../iotdb/db/mpp/aggregation/Aggregator.java | 35 +----
.../iotdb/db/mpp/aggregation/AvgAccumulator.java | 90 +++++--------
.../iotdb/db/mpp/aggregation/CountAccumulator.java | 23 ++--
.../db/mpp/aggregation/CountIfAccumulator.java | 25 ++--
.../db/mpp/aggregation/ExtremeAccumulator.java | 90 +++++--------
.../db/mpp/aggregation/FirstValueAccumulator.java | 150 +++++++--------------
.../mpp/aggregation/FirstValueDescAccumulator.java | 110 +++++----------
.../db/mpp/aggregation/LastValueAccumulator.java | 132 +++++++-----------
.../mpp/aggregation/LastValueDescAccumulator.java | 122 +++++------------
.../db/mpp/aggregation/MaxTimeAccumulator.java | 22 +--
.../db/mpp/aggregation/MaxTimeDescAccumulator.java | 25 ++--
.../db/mpp/aggregation/MaxValueAccumulator.java | 94 +++++--------
.../db/mpp/aggregation/MinTimeAccumulator.java | 25 ++--
.../db/mpp/aggregation/MinTimeDescAccumulator.java | 20 +--
.../db/mpp/aggregation/MinValueAccumulator.java | 93 +++++--------
.../iotdb/db/mpp/aggregation/SumAccumulator.java | 92 +++++--------
.../slidingwindow/SlidingWindowAggregator.java | 5 +-
.../db/mpp/execution/operator/AggregationUtil.java | 18 ++-
.../operator/process/AggregationOperator.java | 2 +-
.../process/RawDataAggregationOperator.java | 48 +++++--
.../process/SlidingWindowAggregationOperator.java | 5 +-
.../AbstractSeriesAggregationScanOperator.java | 2 +-
...entWindow.java => AbstractVariationWindow.java} | 40 +++---
.../{SeriesWindow.java => ConditionWindow.java} | 11 +-
...dowManager.java => ConditionWindowManager.java} | 63 +++------
...arameter.java => ConditionWindowParameter.java} | 6 +-
.../db/mpp/execution/operator/window/IWindow.java | 2 +-
.../execution/operator/window/IWindowManager.java | 52 +++++--
.../execution/operator/window/SessionWindow.java | 5 +-
.../operator/window/SessionWindowManager.java | 33 +----
.../operator/window/TimeWindowManager.java | 24 +---
...dowManager.java => VariationWindowManager.java} | 48 ++-----
...arameter.java => VariationWindowParameter.java} | 6 +-
.../operator/window/WindowManagerFactory.java | 40 +++---
.../mpp/execution/operator/window/WindowType.java | 4 +-
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 28 ++--
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 20 +--
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 34 +++--
...rameter.java => GroupByConditionParameter.java} | 14 +-
.../planner/plan/parameter/GroupByParameter.java | 6 +-
.../plan/parameter/GroupByVariationParameter.java | 2 +-
...mponent.java => GroupByConditionComponent.java} | 6 +-
.../component/GroupByVariationComponent.java | 2 +-
.../db/mpp/plan/statement/crud/QueryStatement.java | 10 +-
.../iotdb/db/mpp/aggregation/AccumulatorTest.java | 55 ++++----
.../operator/RawDataAggregationOperatorTest.java | 15 ++-
53 files changed, 727 insertions(+), 1196 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByConditionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByConditionIT.java
index 8c9eeb555e..fdbea94e8c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByConditionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByConditionIT.java
@@ -210,7 +210,7 @@ public class IoTDBGroupByConditionIT {
{"6", "2500000000", "2499999994", "13", "100.0"},
};
String sql =
- "select __endTime,max_time(charging_status) - min_time(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by condition(soc>=24.0,KEEP<=20)";
+ "select __endTime,max_time(charging_status) - min_time(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by condition(soc>=24.0,KEEP<=15)";
normalTestWithEndTime(res, sql);
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupBySessionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupBySessionIT.java
index c9f4f2fe17..7e4a8e3b07 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupBySessionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupBySessionIT.java
@@ -369,11 +369,7 @@ public class IoTDBGroupBySessionIT {
};
String sql =
"select __endTime,count(status), avg(temperature), sum(hardware), first_value(hardware) from root.ln.** group by session(50s) having count(status)>5 align by device";
- normalTestAlignByDevice(
- res,
- sql,
- 1,
- "Time,Device,__endTime,count(status),avg(temperature),sum(hardware),first_value(hardware)");
+ normalTestAlignByDevice(res, sql, 1);
}
@Test
@@ -386,11 +382,7 @@ public class IoTDBGroupBySessionIT {
};
String sql =
"select __endTime,count(status), avg(temperature), sum(hardware), first_value(hardware) from root.ln.** group by session(1d) align by device";
- normalTestAlignByDevice(
- res,
- sql,
- 2,
- "Time,Device,__endTime,count(status),avg(temperature),sum(hardware),first_value(hardware)");
+ normalTestAlignByDevice(res, sql, 2);
}
@Test
@@ -430,13 +422,15 @@ public class IoTDBGroupBySessionIT {
normalTestAlignByDevice2(res, sql, 2, "Time,Device,__endTime,count(hardware)");
}
- private void normalTestAlignByDevice(String[][] res, String sql, int split, String title) {
+ private void normalTestAlignByDevice(String[][] res, String sql, int split) {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
try (ResultSet resultSet = statement.executeQuery(sql)) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- checkHeader(resultSetMetaData, title);
+ checkHeader(
+ resultSetMetaData,
+ "Time,Device,__endTime,count(status),avg(temperature),sum(hardware),first_value(hardware)");
int count = 0;
String device = "root.ln.wf02.wt01";
while (resultSet.next()) {
diff --git a/server/src/main/codegen/templates/eventWindow.ftl b/server/src/main/codegen/templates/abstractVariationWindow.ftl
similarity index 73%
rename from server/src/main/codegen/templates/eventWindow.ftl
rename to server/src/main/codegen/templates/abstractVariationWindow.ftl
index 04b461c3e7..aa8083cfaa 100644
--- a/server/src/main/codegen/templates/eventWindow.ftl
+++ b/server/src/main/codegen/templates/abstractVariationWindow.ftl
@@ -20,7 +20,7 @@
<#list allDataTypes.types as type>
- <#assign className = "Event${type.dataType?cap_first}Window">
+ <#assign className = "AbstractVariation${type.dataType?cap_first}Window">
<@pp.changeOutputFile name="/org/apache/iotdb/db/mpp/execution/operator/window/${className}.java" />
package org.apache.iotdb.db.mpp.execution.operator.window;
@@ -32,19 +32,19 @@ import org.apache.iotdb.tsfile.utils.Binary;
/*
* This class is generated using freemarker and the ${.template_name} template.
*/
-public abstract class ${className} extends EventWindow {
+public abstract class ${className} extends AbstractVariationWindow {
- protected ${type.dataType} eventValue;
+ protected ${type.dataType} headValue;
- private ${type.dataType} previousEventValue;
+ private ${type.dataType} previousValue;
- public ${className}(EventWindowParameter eventWindowParameter) {
- super(eventWindowParameter);
+ public ${className}(VariationWindowParameter variationWindowParameter) {
+ super(variationWindowParameter);
}
@Override
- public void updatePreviousEventValue() {
- previousEventValue = eventValue;
+ public void updatePreviousValue() {
+ previousValue = headValue;
}
@Override
@@ -59,25 +59,25 @@ public abstract class ${className} extends EventWindow {
endTime = currentTime;
}
// judge whether we need initialize eventValue
- if (!initializedEventValue) {
+ if (!initializedHeadValue) {
startTime = currentTime;
endTime = currentTime;
if(controlTimeAndValueColumn[0].isNull(index)){
valueIsNull = true;
}else{
valueIsNull = false;
- eventValue = controlTimeAndValueColumn[0].get${type.dataType?cap_first}(index);
+ headValue = controlTimeAndValueColumn[0].get${type.dataType?cap_first}(index);
}
- initializedEventValue = true;
+ initializedHeadValue = true;
}
}
- public ${type.dataType} getEventValue() {
- return eventValue;
+ public ${type.dataType} getHeadValue() {
+ return headValue;
}
- public ${type.dataType} getPreviousEventValue() {
- return previousEventValue;
+ public ${type.dataType} getPreviousHeadValue() {
+ return previousValue;
}
}
diff --git a/server/src/main/codegen/templates/eventWindowManager.ftl b/server/src/main/codegen/templates/eventWindowManager.ftl
deleted file mode 100644
index 4934268e46..0000000000
--- a/server/src/main/codegen/templates/eventWindowManager.ftl
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-<@pp.dropOutputFile />
-
-<#list allDataTypes.types as type>
-
- <#assign className = "Event${type.dataType?cap_first}WindowManager">
- <@pp.changeOutputFile name="/org/apache/iotdb/db/mpp/execution/operator/window/${className}.java" />
-package org.apache.iotdb.db.mpp.execution.operator.window;
-
-import org.apache.iotdb.db.mpp.aggregation.Aggregator;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-
-import java.util.List;
-
-/*
-* This class is generated using freemarker and the ${.template_name} template.
-*/
-public abstract class ${className} extends EventWindowManager {
-
- public ${className}(EventWindowParameter eventWindowParameter, boolean ascending) {
- super(eventWindowParameter, ascending);
- }
-
- @Override
- public void appendAggregationResult(
- TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
- // Append aggregation results to valueColumnBuilders.
- ColumnBuilder[] columnBuilders =
- appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
- }
-}
-
-</#list>
\ No newline at end of file
diff --git a/server/src/main/codegen/templates/evEventWindow.ftl b/server/src/main/codegen/templates/variationWindow.ftl
similarity index 83%
rename from server/src/main/codegen/templates/evEventWindow.ftl
rename to server/src/main/codegen/templates/variationWindow.ftl
index 03b62cb586..5930fd0077 100644
--- a/server/src/main/codegen/templates/evEventWindow.ftl
+++ b/server/src/main/codegen/templates/variationWindow.ftl
@@ -30,22 +30,22 @@
</#if>
</#if>
- <#assign className = "${compareType.compareType?cap_first}Event${dataType.dataType?cap_first}Window">
+ <#assign className = "${compareType.compareType?cap_first}${dataType.dataType?cap_first}Window">
<@pp.changeOutputFile name="/org/apache/iotdb/db/mpp/execution/operator/window/${className}.java" />
package org.apache.iotdb.db.mpp.execution.operator.window;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class ${className} extends Event${dataType.dataType?cap_first}Window {
+public class ${className} extends AbstractVariation${dataType.dataType?cap_first}Window {
- public ${className}(EventWindowParameter eventWindowParameter) {
- super(eventWindowParameter);
+ public ${className}(VariationWindowParameter variationWindowParameter) {
+ super(variationWindowParameter);
}
@Override
public boolean satisfy(Column column, int index) {
- if (!initializedEventValue) {
+ if (!initializedHeadValue) {
return true;
}
if(column.isNull(index)){
@@ -53,12 +53,12 @@ public class ${className} extends Event${dataType.dataType?cap_first}Window {
}
<#if compareType.compareType == "equal">
<#if dataType.dataType == "Binary">
- return !valueIsNull&&column.get${dataType.dataType?cap_first}(index).equals(eventValue);
+ return !valueIsNull&&column.get${dataType.dataType?cap_first}(index).equals(headValue);
<#else>
- return !valueIsNull&&column.get${dataType.dataType?cap_first}(index) == eventValue;
+ return !valueIsNull&&column.get${dataType.dataType?cap_first}(index) == headValue;
</#if>
<#else>
- return !valueIsNull&&Math.abs(column.get${dataType.dataType?cap_first}(index) - eventValue) <= eventWindowParameter.getDelta();
+ return !valueIsNull&&Math.abs(column.get${dataType.dataType?cap_first}(index) - headValue) <= getDelta();
</#if>
}
}
diff --git a/server/src/main/codegen/templates/evEventWindowManager.ftl b/server/src/main/codegen/templates/variationWindowManager.ftl
similarity index 74%
rename from server/src/main/codegen/templates/evEventWindowManager.ftl
rename to server/src/main/codegen/templates/variationWindowManager.ftl
index 1064e94295..5409884e86 100644
--- a/server/src/main/codegen/templates/evEventWindowManager.ftl
+++ b/server/src/main/codegen/templates/variationWindowManager.ftl
@@ -30,8 +30,8 @@
</#if>
</#if>
- <#assign className = "${compareType.compareType?cap_first}Event${dataType.dataType?cap_first}WindowManager">
- <#assign windowName = "${compareType.compareType?cap_first}Event${dataType.dataType?cap_first}Window">
+ <#assign className = "${compareType.compareType?cap_first}${dataType.dataType?cap_first}WindowManager">
+ <#assign windowName = "${compareType.compareType?cap_first}${dataType.dataType?cap_first}Window">
<@pp.changeOutputFile name="/org/apache/iotdb/db/mpp/execution/operator/window/${className}.java" />
package org.apache.iotdb.db.mpp.execution.operator.window;
@@ -46,12 +46,12 @@ import org.apache.iotdb.tsfile.utils.Binary;
/*
* This class is generated using freemarker and the ${.template_name} template.
*/
-public class ${className} extends Event${dataType.dataType?cap_first}WindowManager {
+public class ${className} extends VariationWindowManager {
public ${className}(
- EventWindowParameter eventWindowParameter, boolean ascending) {
- super(eventWindowParameter, ascending);
- eventWindow = new ${windowName}(eventWindowParameter);
+ VariationWindowParameter variationWindowParameter, boolean ascending) {
+ super(ascending);
+ variationWindow = new ${windowName}(variationWindowParameter);
}
@Override
@@ -64,24 +64,24 @@ public class ${className} extends Event${dataType.dataType?cap_first}WindowManag
return inputTsBlock;
}
- Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ Column controlColumn = variationWindow.getControlColumn(inputTsBlock);
TimeColumn timeColumn = inputTsBlock.getTimeColumn();
int i = 0, size = inputTsBlock.getPositionCount();
- ${dataType.dataType} previousEventValue = ((${windowName}) eventWindow).getPreviousEventValue();
- boolean previousEventValueIsNull = ((${windowName}) eventWindow).valueIsNull();
+ ${dataType.dataType} previousValue = ((${windowName}) variationWindow).getPreviousHeadValue();
+ boolean previousValueIsNull = ((${windowName}) variationWindow).valueIsNull();
for (; i < size; i++) {
// condition must be initialized when isNull is false
boolean condition = false;
boolean isNull = controlColumn.isNull(i);
<#if compareType.compareType == "equal">
<#if dataType.dataType == "Binary">
- if(!isNull) condition = !controlColumn.get${dataType.dataType?cap_first}(i).equals(previousEventValue);
+ if(!isNull) condition = !controlColumn.get${dataType.dataType?cap_first}(i).equals(previousValue);
<#else>
- if(!isNull) condition = controlColumn.get${dataType.dataType?cap_first}(i) != previousEventValue;
+ if(!isNull) condition = controlColumn.get${dataType.dataType?cap_first}(i) != previousValue;
</#if>
<#else>
- if(!isNull) condition = Math.abs(controlColumn.get${dataType.dataType?cap_first}(i) - previousEventValue)
- > eventWindowParameter.getDelta();
+ if(!isNull) condition = Math.abs(controlColumn.get${dataType.dataType?cap_first}(i) - previousValue)
+ > variationWindow.getDelta();
</#if>
if(isIgnoringNull()){
if (!isNull && condition) {
@@ -90,19 +90,19 @@ public class ${className} extends Event${dataType.dataType?cap_first}WindowManag
continue;
}
}else{
- if((isNull&&!previousEventValueIsNull)||!isNull&&previousEventValueIsNull||(!isNull&&condition)){
+ if((isNull&&!previousValueIsNull)||!isNull&&previousValueIsNull||(!isNull&&condition)){
break;
}
}
long currentTime = timeColumn.getLong(i);
// judge whether we need update startTime
- if (eventWindow.getStartTime() > currentTime) {
- eventWindow.setStartTime(currentTime);
+ if (variationWindow.getStartTime() > currentTime) {
+ variationWindow.setStartTime(currentTime);
}
// judge whether we need update endTime
- if (eventWindow.getEndTime() < currentTime) {
- eventWindow.setEndTime(currentTime);
+ if (variationWindow.getEndTime() < currentTime) {
+ variationWindow.setEndTime(currentTime);
}
}
// we can create a new window beginning at index i of inputTsBlock
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java
index bbf571d39b..a268c96fb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java
@@ -18,26 +18,26 @@
*/
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
public interface Accumulator {
/**
- * Column should be like: | ControlColumn | Time | Value |
+ * Column should be like: | Time | Value |
*
* <p>IgnoringNull is required when considering the row where the value of controlColumn is null
*
- * <p>Return the last read row index of current timeColumn
+ * <p>bitMap is required for the group-by framework. When needed (e.g. controlColumn is null),
+ * bitMap can guide the accumulator to skip some rows
+ *
+ * <p>lastIndex is required for the group-by framework: it is the last row index (inclusive) to
+ * read, so accumulation can stop early (e.g. the next row doesn't satisfy the window)
*/
- default int addInput(Column[] column, IWindow window) {
- return addInput(column, window, true);
- }
-
- int addInput(Column[] column, IWindow window, boolean ignoringNull);
+ void addInput(Column[] column, BitMap bitMap, int lastIndex);
/**
* For aggregation function like COUNT, SUM, partialResult should be single; But for AVG,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index 2295636d0a..e359ce884a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -19,17 +19,15 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
-import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindow;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
import java.util.Collections;
import java.util.List;
@@ -45,8 +43,6 @@ public class Aggregator {
protected List<InputLocation[]> inputLocationList;
protected final AggregationStep step;
- protected IWindow curWindow;
-
protected final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
// Used for SeriesAggregateScanOperator
@@ -66,27 +62,21 @@ public class Aggregator {
}
// Used for SeriesAggregateScanOperator and RawDataAggregateOperator
- public int processTsBlock(TsBlock tsBlock, boolean ignoringNull) {
+ public void processTsBlock(TsBlock tsBlock, BitMap bitMap, int lastIndex) {
long startTime = System.nanoTime();
try {
checkArgument(
step.isInputRaw(),
"Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
- int lastReadReadIndex = 0;
for (InputLocation[] inputLocations : inputLocationList) {
checkArgument(
inputLocations[0].getTsBlockIndex() == 0,
"RawDataAggregateOperator can only process one tsBlock input.");
- Column[] controlTimeAndValueColumn = new Column[3];
- controlTimeAndValueColumn[0] = curWindow.getControlColumn(tsBlock);
- controlTimeAndValueColumn[1] = tsBlock.getTimeColumn();
- controlTimeAndValueColumn[2] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
- lastReadReadIndex =
- Math.max(
- lastReadReadIndex,
- accumulator.addInput(controlTimeAndValueColumn, curWindow, ignoringNull));
+ Column[] timeAndValueColumn = new Column[2];
+ timeAndValueColumn[0] = tsBlock.getTimeColumn();
+ timeAndValueColumn[1] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
+ accumulator.addInput(timeAndValueColumn, bitMap, lastIndex);
}
- return lastReadReadIndex;
} finally {
QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
}
@@ -149,21 +139,10 @@ public class Aggregator {
}
public void reset() {
- curWindow = null;
accumulator.reset();
}
public boolean hasFinalResult() {
- return curWindow.hasFinalResult(accumulator);
- }
-
- public void updateTimeRange(TimeRange curTimeRange) {
- reset();
- this.curWindow = new TimeWindow(curTimeRange);
- }
-
- public void updateWindow(IWindow curWindow) {
- reset();
- this.curWindow = curWindow;
+ return accumulator.hasFinalResult();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
index 89b946207e..eb3ba80523 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
@@ -19,19 +19,19 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
import static com.google.common.base.Preconditions.checkArgument;
public class AvgAccumulator implements Accumulator {
- private TSDataType seriesDataType;
+ private final TSDataType seriesDataType;
private long countValue;
private double sumValue;
private boolean initResult = false;
@@ -41,16 +41,20 @@ public class AvgAccumulator implements Accumulator {
}
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
switch (seriesDataType) {
case INT32:
- return addIntInput(column, curWindow, ignoringNull);
+ addIntInput(column, bitMap, lastIndex);
+ return;
case INT64:
- return addLongInput(column, curWindow, ignoringNull);
+ addLongInput(column, bitMap, lastIndex);
+ return;
case FLOAT:
- return addFloatInput(column, curWindow, ignoringNull);
+ addFloatInput(column, bitMap, lastIndex);
+ return;
case DOUBLE:
- return addDoubleInput(column, curWindow, ignoringNull);
+ addDoubleInput(column, bitMap, lastIndex);
+ return;
case TEXT:
case BOOLEAN:
default:
@@ -146,87 +150,55 @@ public class AvgAccumulator implements Accumulator {
return TSDataType.DOUBLE;
}
- private int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
+ if (!column[1].isNull(i)) {
initResult = true;
countValue++;
- sumValue += column[2].getInt(i);
+ sumValue += column[1].getInt(i);
}
}
- return curPositionCount;
}
- private int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
+ if (!column[1].isNull(i)) {
initResult = true;
countValue++;
- sumValue += column[2].getLong(i);
+ sumValue += column[1].getLong(i);
}
}
- return curPositionCount;
}
- private int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
+ if (!column[1].isNull(i)) {
initResult = true;
countValue++;
- sumValue += column[2].getFloat(i);
+ sumValue += column[1].getFloat(i);
}
}
- return curPositionCount;
}
- private int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
+ if (!column[1].isNull(i)) {
initResult = true;
countValue++;
- sumValue += column[2].getDouble(i);
+ sumValue += column[1].getDouble(i);
}
}
- return curPositionCount;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
index 6f7f19c1fa..538c21d0fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
import static com.google.common.base.Preconditions.checkArgument;
@@ -33,31 +33,26 @@ public class CountAccumulator implements Accumulator {
public CountAccumulator() {}
- // Column should be like: | ControlColumn | Time | Value |
+ // Column should be like: | Time | Value |
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
int curPositionCount = column[0].getPositionCount();
- if (!column[2].mayHaveNull() && curWindow.contains(column[0])) {
+ if (!column[1].mayHaveNull()
+ && lastIndex == curPositionCount - 1
+ && ((bitMap == null) || bitMap.isAllMarked())) {
countValue += curPositionCount;
} else {
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
+ if (!column[1].isNull(i)) {
countValue++;
}
}
}
-
- return curPositionCount;
}
// partialResult should be like: | partialCountValue1 |
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountIfAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountIfAccumulator.java
index 8d3e2e03e8..cb33e44f50 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountIfAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountIfAccumulator.java
@@ -20,11 +20,11 @@
package org.apache.iotdb.db.mpp.aggregation;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory.KeepEvaluator;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
import static com.google.common.base.Preconditions.checkArgument;
@@ -47,23 +47,16 @@ public class CountIfAccumulator implements Accumulator {
this.ignoreNull = ignoreNull;
}
- // Column should be like: | ControlColumn | Time | Value |
+ // Column should be like: | Time | Value |
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
- for (int i = 0; i < curPositionCount; i++) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
// skip null value in control column
- // the input parameter 'ignoringNull' effects on ControlColumn
- if (ignoringNull && column[0].isNull(i)) {
+ // the input parameters 'bitMap' and 'lastIndex' apply to the ControlColumn
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
-
- if (column[2].isNull(i)) {
- // the member variable 'ignoreNull' effects on calculation of ValueColumn
+ if (column[1].isNull(i)) {
if (!this.ignoreNull) {
// data point segment was over, judge whether to count
if (lastPointIsSatisfy && keepEvaluator.apply(keep)) {
@@ -73,7 +66,7 @@ public class CountIfAccumulator implements Accumulator {
lastPointIsSatisfy = false;
}
} else {
- if (column[2].getBoolean(i)) {
+ if (column[1].getBoolean(i)) {
keep++;
lastPointIsSatisfy = true;
} else {
@@ -86,8 +79,6 @@ public class CountIfAccumulator implements Accumulator {
}
}
}
-
- return curPositionCount;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
index 44a2c90283..87643d97bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
@@ -19,12 +19,12 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import static com.google.common.base.Preconditions.checkArgument;
@@ -32,7 +32,7 @@ import static com.google.common.base.Preconditions.checkArgument;
public class ExtremeAccumulator implements Accumulator {
private final TSDataType seriesDataType;
- private TsPrimitiveType extremeResult;
+ private final TsPrimitiveType extremeResult;
private boolean initResult;
public ExtremeAccumulator(TSDataType seriesDataType) {
@@ -41,16 +41,20 @@ public class ExtremeAccumulator implements Accumulator {
}
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
switch (seriesDataType) {
case INT32:
- return addIntInput(column, curWindow, ignoringNull);
+ addIntInput(column, bitMap, lastIndex);
+ return;
case INT64:
- return addLongInput(column, curWindow, ignoringNull);
+ addLongInput(column, bitMap, lastIndex);
+ return;
case FLOAT:
- return addFloatInput(column, curWindow, ignoringNull);
+ addFloatInput(column, bitMap, lastIndex);
+ return;
case DOUBLE:
- return addDoubleInput(column, curWindow, ignoringNull);
+ addDoubleInput(column, bitMap, lastIndex);
+ return;
case TEXT:
case BOOLEAN:
default:
@@ -221,23 +225,15 @@ public class ExtremeAccumulator implements Accumulator {
return extremeResult.getDataType();
}
- private int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateIntResult(column[2].getInt(i));
+ if (!column[1].isNull(i)) {
+ updateIntResult(column[1].getInt(i));
}
}
- return curPositionCount;
}
private void updateIntResult(int extVal) {
@@ -253,23 +249,15 @@ public class ExtremeAccumulator implements Accumulator {
}
}
- private int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateLongResult(column[2].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateLongResult(column[1].getLong(i));
}
}
- return curPositionCount;
}
private void updateLongResult(long extVal) {
@@ -285,23 +273,15 @@ public class ExtremeAccumulator implements Accumulator {
}
}
- private int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateFloatResult(column[2].getFloat(i));
+ if (!column[1].isNull(i)) {
+ updateFloatResult(column[1].getFloat(i));
}
}
- return curPositionCount;
}
private void updateFloatResult(float extVal) {
@@ -317,23 +297,15 @@ public class ExtremeAccumulator implements Accumulator {
}
}
- private int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateDoubleResult(column[2].getDouble(i));
+ if (!column[1].isNull(i)) {
+ updateDoubleResult(column[1].getDouble(i));
}
}
- return curPositionCount;
}
private void updateDoubleResult(double extVal) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
index 86d2b34f5e..5aff5e4e7d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import static com.google.common.base.Preconditions.checkArgument;
@@ -42,22 +42,28 @@ public class FirstValueAccumulator implements Accumulator {
firstValue = TsPrimitiveType.getByType(seriesDataType);
}
- // Column should be like: | ControlColumn | Time | Value |
+ // Column should be like: | Time | Value |
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
switch (seriesDataType) {
case INT32:
- return addIntInput(column, curWindow, ignoringNull);
+ addIntInput(column, bitMap, lastIndex);
+ return;
case INT64:
- return addLongInput(column, curWindow, ignoringNull);
+ addLongInput(column, bitMap, lastIndex);
+ return;
case FLOAT:
- return addFloatInput(column, curWindow, ignoringNull);
+ addFloatInput(column, bitMap, lastIndex);
+ return;
case DOUBLE:
- return addDoubleInput(column, curWindow, ignoringNull);
+ addDoubleInput(column, bitMap, lastIndex);
+ return;
case TEXT:
- return addBinaryInput(column, curWindow, ignoringNull);
+ addBinaryInput(column, bitMap, lastIndex);
+ return;
case BOOLEAN:
- return addBooleanInput(column, curWindow, ignoringNull);
+ addBooleanInput(column, bitMap, lastIndex);
+ return;
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in FirstValue: %s", seriesDataType));
@@ -246,25 +252,16 @@ public class FirstValueAccumulator implements Accumulator {
return firstValue.getDataType();
}
- protected int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateIntFirstValue(column[2].getInt(i), column[1].getLong(i));
- return i + 1;
+ if (!column[1].isNull(i)) {
+ updateIntFirstValue(column[1].getInt(i), column[0].getLong(i));
+ return;
}
}
-
- return curPositionCount;
}
protected void updateIntFirstValue(int value, long curTime) {
@@ -275,25 +272,16 @@ public class FirstValueAccumulator implements Accumulator {
}
}
- protected int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateLongFirstValue(column[2].getLong(i), column[1].getLong(i));
- return i + 1;
+ if (!column[1].isNull(i)) {
+ updateLongFirstValue(column[1].getLong(i), column[0].getLong(i));
+ return;
}
}
-
- return curPositionCount;
}
protected void updateLongFirstValue(long value, long curTime) {
@@ -304,25 +292,16 @@ public class FirstValueAccumulator implements Accumulator {
}
}
- protected int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateFloatFirstValue(column[2].getFloat(i), column[1].getLong(i));
- return i + 1;
+ if (!column[1].isNull(i)) {
+ updateFloatFirstValue(column[1].getFloat(i), column[0].getLong(i));
+ return;
}
}
-
- return curPositionCount;
}
protected void updateFloatFirstValue(float value, long curTime) {
@@ -333,25 +312,16 @@ public class FirstValueAccumulator implements Accumulator {
}
}
- protected int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateDoubleFirstValue(column[2].getDouble(i), column[1].getLong(i));
- return i + 1;
+ if (!column[1].isNull(i)) {
+ updateDoubleFirstValue(column[1].getDouble(i), column[0].getLong(i));
+ return;
}
}
-
- return curPositionCount;
}
protected void updateDoubleFirstValue(double value, long curTime) {
@@ -362,25 +332,16 @@ public class FirstValueAccumulator implements Accumulator {
}
}
- protected int addBooleanInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addBooleanInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateBooleanFirstValue(column[2].getBoolean(i), column[1].getLong(i));
- return i + 1;
+ if (!column[1].isNull(i)) {
+ updateBooleanFirstValue(column[1].getBoolean(i), column[0].getLong(i));
+ return;
}
}
-
- return curPositionCount;
}
protected void updateBooleanFirstValue(boolean value, long curTime) {
@@ -391,25 +352,16 @@ public class FirstValueAccumulator implements Accumulator {
}
}
- protected int addBinaryInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addBinaryInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateBinaryFirstValue(column[2].getBinary(i), column[1].getLong(i));
- return i + 1;
+ if (!column[1].isNull(i)) {
+ updateBinaryFirstValue(column[1].getBinary(i), column[0].getLong(i));
+ return;
}
}
-
- return curPositionCount;
}
protected void updateBinaryFirstValue(Binary value, long curTime) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
index 23c40d8de3..f885a3e7fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.BitMap;
public class FirstValueDescAccumulator extends FirstValueAccumulator {
@@ -36,122 +36,74 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
// Don't break in advance
@Override
- protected int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateIntFirstValue(column[2].getInt(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateIntFirstValue(column[1].getInt(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
@Override
- protected int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateLongFirstValue(column[2].getLong(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateLongFirstValue(column[1].getLong(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
@Override
- protected int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateFloatFirstValue(column[2].getFloat(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateFloatFirstValue(column[1].getFloat(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
@Override
- protected int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateDoubleFirstValue(column[2].getDouble(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateDoubleFirstValue(column[1].getDouble(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
@Override
- protected int addBooleanInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addBooleanInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateBooleanFirstValue(column[2].getBoolean(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateBooleanFirstValue(column[1].getBoolean(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
@Override
- protected int addBinaryInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addBinaryInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateBinaryFirstValue(column[2].getBinary(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateBinaryFirstValue(column[1].getBinary(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
index 8441ed197f..d2f71a370a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import static com.google.common.base.Preconditions.checkArgument;
@@ -42,22 +42,28 @@ public class LastValueAccumulator implements Accumulator {
lastValue = TsPrimitiveType.getByType(seriesDataType);
}
- // Column should be like: | ControlColumn | Time | Value |
+ // Column should be like: | Time | Value |
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
switch (seriesDataType) {
case INT32:
- return addIntInput(column, curWindow, ignoringNull);
+ addIntInput(column, bitMap, lastIndex);
+ return;
case INT64:
- return addLongInput(column, curWindow, ignoringNull);
+ addLongInput(column, bitMap, lastIndex);
+ return;
case FLOAT:
- return addFloatInput(column, curWindow, ignoringNull);
+ addFloatInput(column, bitMap, lastIndex);
+ return;
case DOUBLE:
- return addDoubleInput(column, curWindow, ignoringNull);
+ addDoubleInput(column, bitMap, lastIndex);
+ return;
case TEXT:
- return addBinaryInput(column, curWindow, ignoringNull);
+ addBinaryInput(column, bitMap, lastIndex);
+ return;
case BOOLEAN:
- return addBooleanInput(column, curWindow, ignoringNull);
+ addBooleanInput(column, bitMap, lastIndex);
+ return;
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in LastValue: %s", seriesDataType));
@@ -246,23 +252,15 @@ public class LastValueAccumulator implements Accumulator {
return lastValue.getDataType();
}
- protected int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateIntLastValue(column[2].getInt(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateIntLastValue(column[1].getInt(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
protected void updateIntLastValue(int value, long curTime) {
@@ -273,23 +271,15 @@ public class LastValueAccumulator implements Accumulator {
}
}
- protected int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateLongLastValue(column[2].getLong(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateLongLastValue(column[1].getLong(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
protected void updateLongLastValue(long value, long curTime) {
@@ -300,23 +290,15 @@ public class LastValueAccumulator implements Accumulator {
}
}
- protected int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateFloatLastValue(column[2].getFloat(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateFloatLastValue(column[1].getFloat(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
protected void updateFloatLastValue(float value, long curTime) {
@@ -327,23 +309,15 @@ public class LastValueAccumulator implements Accumulator {
}
}
- protected int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateDoubleLastValue(column[2].getDouble(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateDoubleLastValue(column[1].getDouble(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
protected void updateDoubleLastValue(double value, long curTime) {
@@ -354,23 +328,15 @@ public class LastValueAccumulator implements Accumulator {
}
}
- protected int addBooleanInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addBooleanInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateBooleanLastValue(column[2].getBoolean(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateBooleanLastValue(column[1].getBoolean(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
protected void updateBooleanLastValue(boolean value, long curTime) {
@@ -381,23 +347,15 @@ public class LastValueAccumulator implements Accumulator {
}
}
- protected int addBinaryInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ protected void addBinaryInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateBinaryLastValue(column[2].getBinary(i), column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateBinaryLastValue(column[1].getBinary(i), column[0].getLong(i));
}
}
- return curPositionCount;
}
protected void updateBinaryLastValue(Binary value, long curTime) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
index 92c984950c..15607bba15 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.BitMap;
public class LastValueDescAccumulator extends LastValueAccumulator {
@@ -40,134 +40,86 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
}
@Override
- protected int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
+ protected void addIntInput(Column[] column, BitMap needSkip, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
// skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ if (needSkip != null && needSkip.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateIntLastValue(column[2].getInt(i), column[1].getLong(i));
- return i;
+ if (!column[1].isNull(i)) {
+ updateIntLastValue(column[1].getInt(i), column[0].getLong(i));
+ return;
}
}
-
- return curPositionCount;
}
@Override
- protected int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
+ protected void addLongInput(Column[] column, BitMap needSkip, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
// skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ if (needSkip != null && needSkip.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateLongLastValue(column[2].getLong(i), column[1].getLong(i));
- return i;
+ if (!column[1].isNull(i)) {
+ updateLongLastValue(column[1].getLong(i), column[0].getLong(i));
+ return;
}
}
-
- return curPositionCount;
}
@Override
- protected int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
+ protected void addFloatInput(Column[] column, BitMap needSkip, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
// skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ if (needSkip != null && needSkip.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateFloatLastValue(column[2].getFloat(i), column[1].getLong(i));
- return i;
+ if (!column[1].isNull(i)) {
+ updateFloatLastValue(column[1].getFloat(i), column[0].getLong(i));
+ return;
}
}
-
- return curPositionCount;
}
+ // Column should be like: | Time | Value |
+ // bitMap marks the rows to be processed; null means every row in [0, lastIndex] is processed.
@Override
- protected int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
+ protected void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ // skip rows that are not marked for processing
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateDoubleLastValue(column[2].getDouble(i), column[1].getLong(i));
- return i;
+ if (!column[1].isNull(i)) {
+ updateDoubleLastValue(column[1].getDouble(i), column[0].getLong(i));
+ // only the first processed non-null row is consumed
+ return;
}
}
-
- return curPositionCount;
}
+ // Column should be like: | Time | Value |
+ // bitMap marks the rows to be processed; null means every row in [0, lastIndex] is processed.
@Override
- protected int addBooleanInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
+ protected void addBooleanInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ // skip rows that are not marked for processing
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateBooleanLastValue(column[2].getBoolean(i), column[1].getLong(i));
- return i;
+ if (!column[1].isNull(i)) {
+ updateBooleanLastValue(column[1].getBoolean(i), column[0].getLong(i));
+ // only the first processed non-null row is consumed
+ return;
}
}
-
- return curPositionCount;
}
+ // Column should be like: | Time | Value |
+ // bitMap marks the rows to be processed; null means every row in [0, lastIndex] is processed.
@Override
- protected int addBinaryInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
+ protected void addBinaryInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ // skip rows that are not marked for processing
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateBinaryLastValue(column[2].getBinary(i), column[1].getLong(i));
- return i;
+ if (!column[1].isNull(i)) {
+ updateBinaryLastValue(column[1].getBinary(i), column[0].getLong(i));
+ // only the first processed non-null row is consumed
+ return;
}
}
-
- return curPositionCount;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
index 63e5347759..c715285163 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
import static com.google.common.base.Preconditions.checkArgument;
@@ -34,26 +34,18 @@ public class MaxTimeAccumulator implements Accumulator {
public MaxTimeAccumulator() {}
- // Column should be like: | ControlColumn | Time | Value |
+ // Column should be like: | Time | Value |
// Value is used to judge isNull()
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateMaxTime(column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateMaxTime(column[0].getLong(i));
}
}
- return curPositionCount;
}
// partialResult should be like: | partialMaxTimeValue |
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
index 050be6461f..65c3707c27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
@@ -19,33 +19,24 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.BitMap;
public class MaxTimeDescAccumulator extends MaxTimeAccumulator {
- // Column should be like: | ControlColumn | Time | Value |
+ // Column should be like: | Time | Value |
// Value is used to judge isNull()
+ // Rows in [0, lastIndex] are visited; when bitMap is non-null, unmarked rows are skipped.
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateMaxTime(column[1].getLong(i));
- return i;
+ if (!column[1].isNull(i)) {
+ updateMaxTime(column[0].getLong(i));
+ // stops at the first non-null row; presumably input arrives in descending time order,
+ // so that row already carries the max time -- TODO confirm against the caller
+ return;
}
}
-
- return curPositionCount;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
index 45a21adbe8..5c960f3b30 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
@@ -19,20 +19,20 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import static com.google.common.base.Preconditions.checkArgument;
public class MaxValueAccumulator implements Accumulator {
- private TSDataType seriesDataType;
- private TsPrimitiveType maxResult;
+ private final TSDataType seriesDataType;
+ private final TsPrimitiveType maxResult;
private boolean initResult;
public MaxValueAccumulator(TSDataType seriesDataType) {
@@ -40,18 +40,22 @@ public class MaxValueAccumulator implements Accumulator {
this.maxResult = TsPrimitiveType.getByType(seriesDataType);
}
- // Column should be like: | ControlColumn | Time | Value |
+ // Column should be like: | Time | Value |
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
switch (seriesDataType) {
case INT32:
- return addIntInput(column, curWindow, ignoringNull);
+ addIntInput(column, bitMap, lastIndex);
+ return;
case INT64:
- return addLongInput(column, curWindow, ignoringNull);
+ addLongInput(column, bitMap, lastIndex);
+ return;
case FLOAT:
- return addFloatInput(column, curWindow, ignoringNull);
+ addFloatInput(column, bitMap, lastIndex);
+ return;
case DOUBLE:
- return addDoubleInput(column, curWindow, ignoringNull);
+ addDoubleInput(column, bitMap, lastIndex);
+ return;
case TEXT:
case BOOLEAN:
default:
@@ -219,23 +223,15 @@ public class MaxValueAccumulator implements Accumulator {
return maxResult.getDataType();
}
- private int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateIntResult(column[2].getInt(i));
+ if (!column[1].isNull(i)) {
+ updateIntResult(column[1].getInt(i));
}
}
- return curPositionCount;
}
private void updateIntResult(int maxVal) {
@@ -245,23 +241,15 @@ public class MaxValueAccumulator implements Accumulator {
}
}
- private int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateLongResult(column[2].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateLongResult(column[1].getLong(i));
}
}
- return curPositionCount;
}
private void updateLongResult(long maxVal) {
@@ -271,23 +259,15 @@ public class MaxValueAccumulator implements Accumulator {
}
}
- private int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateFloatResult(column[2].getFloat(i));
+ if (!column[1].isNull(i)) {
+ updateFloatResult(column[1].getFloat(i));
}
}
- return curPositionCount;
}
private void updateFloatResult(float maxVal) {
@@ -297,23 +277,15 @@ public class MaxValueAccumulator implements Accumulator {
}
}
- private int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateDoubleResult(column[2].getDouble(i));
+ if (!column[1].isNull(i)) {
+ updateDoubleResult(column[1].getDouble(i));
}
}
- return curPositionCount;
}
private void updateDoubleResult(double maxVal) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
index 9ab1c3d20b..4725265ec6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
import static com.google.common.base.Preconditions.checkArgument;
@@ -34,28 +34,19 @@ public class MinTimeAccumulator implements Accumulator {
public MinTimeAccumulator() {}
- // Column should be like: | ControlColumn | Time | Value |
+ // Column should be like: | Time | Value |
// Value is used to judge isNull()
+ // Rows in [0, lastIndex] are visited; when bitMap is non-null, unmarked rows are skipped.
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- return i;
+ if (!column[1].isNull(i)) {
+ updateMinTime(column[0].getLong(i));
+ // stops at the first non-null row; presumably input arrives in ascending time order,
+ // so that row already carries the min time -- TODO confirm against the caller
+ return;
}
}
-
- return curPositionCount;
}
// partialResult should be like: | partialMinTimeValue |
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
index 9640d9332b..dab7c847b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
@@ -19,29 +19,21 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.BitMap;
public class MinTimeDescAccumulator extends MinTimeAccumulator {
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateMinTime(column[0].getLong(i));
}
}
- return curPositionCount;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
index 493fe64ab7..fdc96e063c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
@@ -19,20 +19,20 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import static com.google.common.base.Preconditions.checkArgument;
public class MinValueAccumulator implements Accumulator {
- private TSDataType seriesDataType;
- private TsPrimitiveType minResult;
+ private final TSDataType seriesDataType;
+ private final TsPrimitiveType minResult;
private boolean initResult = false;
public MinValueAccumulator(TSDataType seriesDataType) {
@@ -40,18 +40,22 @@ public class MinValueAccumulator implements Accumulator {
this.minResult = TsPrimitiveType.getByType(seriesDataType);
}
- // Column should be like: | ControlColumn | Time | Value |
+ // Column should be like: | Time | Value |
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
switch (seriesDataType) {
case INT32:
- return addIntInput(column, curWindow, ignoringNull);
+ addIntInput(column, bitMap, lastIndex);
+ return;
case INT64:
- return addLongInput(column, curWindow, ignoringNull);
+ addLongInput(column, bitMap, lastIndex);
+ return;
case FLOAT:
- return addFloatInput(column, curWindow, ignoringNull);
+ addFloatInput(column, bitMap, lastIndex);
+ return;
case DOUBLE:
- return addDoubleInput(column, curWindow, ignoringNull);
+ addDoubleInput(column, bitMap, lastIndex);
+ return;
case TEXT:
case BOOLEAN:
default:
@@ -219,23 +223,15 @@ public class MinValueAccumulator implements Accumulator {
return minResult.getDataType();
}
- private int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateIntResult(column[2].getInt(i));
+ if (!column[1].isNull(i)) {
+ updateIntResult(column[1].getInt(i));
}
}
- return curPositionCount;
}
private void updateIntResult(int minVal) {
@@ -245,23 +241,16 @@ public class MinValueAccumulator implements Accumulator {
}
}
- private int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
+ private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ // skip rows that are not marked in bitMap (a null bitMap means no row is skipped)
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateLongResult(column[2].getLong(i));
+ if (!column[1].isNull(i)) {
+ updateLongResult(column[1].getLong(i));
}
}
- return curPositionCount;
}
private void updateLongResult(long minVal) {
@@ -271,23 +260,15 @@ public class MinValueAccumulator implements Accumulator {
}
}
- private int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateFloatResult(column[2].getFloat(i));
+ if (!column[1].isNull(i)) {
+ updateFloatResult(column[1].getFloat(i));
}
}
- return curPositionCount;
}
private void updateFloatResult(float minVal) {
@@ -297,23 +278,15 @@ public class MinValueAccumulator implements Accumulator {
}
}
- private int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
- updateDoubleResult(column[2].getDouble(i));
+ if (!column[1].isNull(i)) {
+ updateDoubleResult(column[1].getDouble(i));
}
}
- return curPositionCount;
}
private void updateDoubleResult(double minVal) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
index 060cb8b4c9..3c0b75a22d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
@@ -19,19 +19,19 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
import static com.google.common.base.Preconditions.checkArgument;
public class SumAccumulator implements Accumulator {
- private TSDataType seriesDataType;
+ private final TSDataType seriesDataType;
private double sumValue = 0;
private boolean initResult = false;
@@ -39,18 +39,22 @@ public class SumAccumulator implements Accumulator {
this.seriesDataType = seriesDataType;
}
- // Column should be like: | ControlColumn | Time | Value |
+ // Column should be like: | Time | Value |
@Override
- public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+ public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
switch (seriesDataType) {
case INT32:
- return addIntInput(column, curWindow, ignoringNull);
+ addIntInput(column, bitMap, lastIndex);
+ return;
case INT64:
- return addLongInput(column, curWindow, ignoringNull);
+ addLongInput(column, bitMap, lastIndex);
+ return;
case FLOAT:
- return addFloatInput(column, curWindow, ignoringNull);
+ addFloatInput(column, bitMap, lastIndex);
+ return;
case DOUBLE:
- return addDoubleInput(column, curWindow, ignoringNull);
+ addDoubleInput(column, bitMap, lastIndex);
+ return;
case TEXT:
case BOOLEAN:
default:
@@ -135,83 +139,51 @@ public class SumAccumulator implements Accumulator {
return TSDataType.DOUBLE;
}
- private int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
+ if (!column[1].isNull(i)) {
initResult = true;
- sumValue += column[2].getInt(i);
+ sumValue += column[1].getInt(i);
}
}
- return curPositionCount;
}
- private int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
+ if (!column[1].isNull(i)) {
initResult = true;
- sumValue += column[2].getLong(i);
+ sumValue += column[1].getLong(i);
}
}
- return curPositionCount;
}
- private int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
+ if (!column[1].isNull(i)) {
initResult = true;
- sumValue += column[2].getFloat(i);
+ sumValue += column[1].getFloat(i);
}
}
- return curPositionCount;
}
- private int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (ignoringNull && column[0].isNull(i)) {
+ private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+ for (int i = 0; i <= lastIndex; i++) {
+ if (bitMap != null && !bitMap.isMarked(i)) {
continue;
}
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint(column, i);
- if (!column[2].isNull(i)) {
+ if (!column[1].isNull(i)) {
initResult = true;
- sumValue += column[2].getDouble(i);
+ sumValue += column[1].getDouble(i);
}
}
- return curPositionCount;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
index 6eb6012b2f..9ce9f803c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
@@ -53,8 +53,7 @@ public abstract class SlidingWindowAggregator extends Aggregator {
this.deque = new LinkedList<>();
}
- @Override
- public int processTsBlock(TsBlock tsBlock, boolean ignoringNull) {
+ public void processTsBlock(TsBlock tsBlock) {
checkArgument(
step.isInputPartial(),
"Step in SlidingWindowAggregationOperator can only process partial result");
@@ -68,10 +67,8 @@ public abstract class SlidingWindowAggregator extends Aggregator {
valueColumn[i] = tsBlock.getColumn(inputLocation.getValueColumnIndex());
}
processPartialResult(new PartialAggregationResult(timeColumn, valueColumn));
- return 1;
}
- @Override
public void updateTimeRange(TimeRange curTimeRange) {
this.curTimeRange = curTimeRange;
evictingExpiredValue();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index 5458e92482..586ff48be3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.SingleTimeWindowIterator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
+import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindow;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -104,16 +106,26 @@ public class AggregationUtil {
inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, ascending);
}
- int lastReadRowIndex = 0;
+ // Get the row which need to be processed by aggregator
+ IWindow curWindow = new TimeWindow(curTimeRange);
+ TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+ int lastIndexToProcess = 0;
+ for (int i = 0; i < inputTsBlock.getPositionCount(); i++) {
+ if (!curWindow.satisfy(timeColumn, i)) {
+ break;
+ }
+ lastIndexToProcess = i;
+ }
+
for (Aggregator aggregator : aggregators) {
// current agg method has been calculated
if (aggregator.hasFinalResult()) {
continue;
}
- lastReadRowIndex =
- Math.max(lastReadRowIndex, aggregator.processTsBlock(inputTsBlock, true));
+ aggregator.processTsBlock(inputTsBlock, null, lastIndexToProcess);
}
+ int lastReadRowIndex = lastIndexToProcess + 1;
if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
inputTsBlock = null;
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 62b5e7a53c..364086c4fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -117,7 +117,7 @@ public class AggregationOperator extends AbstractConsumeAllOperator {
// clear previous aggregation result
for (Aggregator aggregator : aggregators) {
- aggregator.updateTimeRange(curTimeRange);
+ aggregator.reset();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 33b9c8dd99..97e6f6d93c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.db.mpp.execution.operator.window.IWindowManager;
import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.BitMap;
import java.util.List;
@@ -102,7 +104,7 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
} else {
// If there are no points belong to last time window, the last time window will not
// initialize window and aggregators. Specially for time window.
- if (windowManager.notInitedLastTimeWindow()) {
+ if (windowManager.notInitializedLastTimeWindow()) {
initWindowAndAggregators();
}
// If the window is not initialized, it just returns to avoid invoking updateResultTsBlock()
@@ -148,25 +150,48 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
if (windowManager.satisfiedCurWindow(inputTsBlock)) {
- int lastReadRowIndex = 0;
+ // Get the indexes in tsBlock which need to be processed by the aggregators, and the index
+ // of the last row that needs to be processed.
+ int tsBlockSize = inputTsBlock.getPositionCount();
+ IWindow curWindow = windowManager.getCurWindow();
+
+ Column[] controlAndTimeColumn = new Column[2];
+ controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
+ controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
+
+ BitMap needProcess = new BitMap(tsBlockSize);
+ int lastIndexToProcess = -1;
+ boolean hasSkip = false;
+
+ for (int i = 0; i < tsBlockSize; i++) {
+ if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) {
+ lastIndexToProcess = i;
+ hasSkip = true;
+ continue;
+ }
+ if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
+ break;
+ }
+ needProcess.mark(i);
+ curWindow.mergeOnePoint(controlAndTimeColumn, i);
+ lastIndexToProcess = i;
+ }
+
+ // If no row needs to be skipped, just pass null instead of the bitmap.
+ if (!hasSkip) needProcess = null;
+
for (Aggregator aggregator : aggregators) {
// Current agg method has been calculated
if (aggregator.hasFinalResult()) {
continue;
}
- lastReadRowIndex =
- Math.max(
- lastReadRowIndex,
- aggregator.processTsBlock(inputTsBlock, windowManager.isIgnoringNull()));
+ aggregator.processTsBlock(inputTsBlock, needProcess, lastIndexToProcess);
}
-
+ int lastReadRowIndex = lastIndexToProcess + 1;
// If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in
// aggregators.
if (lastReadRowIndex != 0) {
- // todo update the keep value in group by series, it will be removed in the future
- windowManager.setKeep(lastReadRowIndex);
- windowManager.setLastTsBlockTime();
hasCachedDataInAggregator = true;
}
if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
@@ -207,9 +232,8 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
private void initWindowAndAggregators() {
windowManager.initCurWindow();
- IWindow curWindow = windowManager.getCurWindow();
for (Aggregator aggregator : aggregators) {
- aggregator.updateWindow(curWindow);
+ aggregator.reset();
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 95df36c2e7..3fb0dca85a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
@@ -82,7 +83,7 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
// clear previous aggregation result
for (Aggregator aggregator : aggregators) {
- aggregator.updateTimeRange(curTimeRange);
+ ((SlidingWindowAggregator) aggregator).updateTimeRange(curTimeRange);
}
}
@@ -129,7 +130,7 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
}
for (Aggregator aggregator : aggregators) {
- aggregator.processTsBlock(inputTsBlock, true);
+ ((SlidingWindowAggregator) aggregator).processTsBlock(inputTsBlock);
}
inputTsBlock = inputTsBlock.skipFirst();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index ee7c0a0b64..afb2957f93 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -131,7 +131,7 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
// clear previous aggregation result
for (Aggregator aggregator : aggregators) {
- aggregator.updateTimeRange(curTimeRange);
+ aggregator.reset();
}
// calculate aggregation result on current time window
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java
similarity index 63%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java
index 63356361f4..5ce44233b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java
@@ -23,25 +23,28 @@ import org.apache.iotdb.db.mpp.aggregation.Accumulator;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public abstract class EventWindow implements IWindow {
+public abstract class AbstractVariationWindow implements IWindow {
- protected EventWindowParameter eventWindowParameter;
+ private final double delta;
+ private final int controlColumnIndex;
+ private final boolean outputEndTime;
+ private final boolean ignoreNull;
protected long startTime;
-
protected long endTime;
-
- protected boolean initializedEventValue;
-
+ protected boolean initializedHeadValue;
protected boolean valueIsNull = false;
- protected EventWindow(EventWindowParameter eventWindowParameter) {
- this.eventWindowParameter = eventWindowParameter;
+ protected AbstractVariationWindow(VariationWindowParameter variationWindowParameter) {
+ this.controlColumnIndex = variationWindowParameter.getControlColumnIndex();
+ this.ignoreNull = variationWindowParameter.isIgnoringNull();
+ this.outputEndTime = variationWindowParameter.isNeedOutputEndTime();
+ this.delta = variationWindowParameter.getDelta();
}
@Override
public Column getControlColumn(TsBlock tsBlock) {
- return tsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ return tsBlock.getColumn(controlColumnIndex);
}
@Override
@@ -49,13 +52,12 @@ public abstract class EventWindow implements IWindow {
return accumulator.hasFinalResult();
}
- // TODO
@Override
public boolean contains(Column column) {
return false;
}
- public abstract void updatePreviousEventValue();
+ public abstract void updatePreviousValue();
public long getStartTime() {
return startTime;
@@ -73,15 +75,23 @@ public abstract class EventWindow implements IWindow {
this.endTime = endTime;
}
- public void setInitializedEventValue(boolean initializedEventValue) {
- this.initializedEventValue = initializedEventValue;
+ public void setInitializedHeadValue(boolean initializedHeadValue) {
+ this.initializedHeadValue = initializedHeadValue;
+ }
+
+ public boolean ignoreNull() {
+ return ignoreNull;
}
public boolean valueIsNull() {
return valueIsNull;
}
- public boolean ignoringNull() {
- return eventWindowParameter.isIgnoringNull();
+ public boolean isOutputEndTime() {
+ return outputEndTime;
+ }
+
+ public double getDelta() {
+ return delta;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindow.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindow.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindow.java
index b32238e196..5ed29a80d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindow.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.mpp.aggregation.Accumulator;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class SeriesWindow implements IWindow {
+public class ConditionWindow implements IWindow {
private final int controlColumnIndex;
private final boolean outputEndTime;
@@ -33,10 +33,10 @@ public class SeriesWindow implements IWindow {
private long keep;
private boolean timeInitialized;
- public SeriesWindow(SeriesWindowParameter seriesWindowParameter) {
- this.ignoringNull = seriesWindowParameter.isIgnoringNull();
- this.controlColumnIndex = seriesWindowParameter.getControlColumnIndex();
- this.outputEndTime = seriesWindowParameter.isNeedOutputEndTime();
+ public ConditionWindow(ConditionWindowParameter conditionWindowParameter) {
+ this.ignoringNull = conditionWindowParameter.isIgnoringNull();
+ this.controlColumnIndex = conditionWindowParameter.getControlColumnIndex();
+ this.outputEndTime = conditionWindowParameter.isNeedOutputEndTime();
}
@Override
@@ -54,6 +54,7 @@ public class SeriesWindow implements IWindow {
@Override
public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+ keep++;
long currentTime = controlTimeAndValueColumn[1].getLong(index);
if (!timeInitialized) {
startTime = currentTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowManager.java
similarity index 68%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowManager.java
index d8c9c128aa..f1551fedda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowManager.java
@@ -26,15 +26,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import java.util.List;
-public class SeriesWindowManager implements IWindowManager {
+public class ConditionWindowManager implements IWindowManager {
- private final SeriesWindow seriesWindow;
+ private final ConditionWindow conditionWindow;
private boolean initialized;
private boolean needSkip;
@@ -47,13 +45,13 @@ public class SeriesWindowManager implements IWindowManager {
private boolean isFirstSkip;
private final KeepEvaluator keepEvaluator;
- public SeriesWindowManager(SeriesWindowParameter seriesWindowParameter) {
- this.seriesWindow = new SeriesWindow(seriesWindowParameter);
+ public ConditionWindowManager(ConditionWindowParameter conditionWindowParameter) {
+ this.conditionWindow = new ConditionWindow(conditionWindowParameter);
// In group by condition, the first data point cannot be guaranteed to be true in controlColumn,
// so there is going to be a skipPointsOutOfBounds() in the beginning.
this.needSkip = true;
this.keepEvaluator =
- AccumulatorFactory.initKeepEvaluator(seriesWindowParameter.getKeepExpression());
+ AccumulatorFactory.initKeepEvaluator(conditionWindowParameter.getKeepExpression());
}
@Override
@@ -64,8 +62,8 @@ public class SeriesWindowManager implements IWindowManager {
@Override
public void initCurWindow() {
this.initialized = true;
- this.seriesWindow.setTimeInitialized(false);
- this.seriesWindow.setKeep(0);
+ this.conditionWindow.setTimeInitialized(false);
+ this.conditionWindow.setKeep(0);
}
@Override
@@ -82,7 +80,7 @@ public class SeriesWindowManager implements IWindowManager {
@Override
public IWindow getCurWindow() {
- return seriesWindow;
+ return conditionWindow;
}
@Override
@@ -95,7 +93,7 @@ public class SeriesWindowManager implements IWindowManager {
return inputTsBlock;
}
- Column controlColumn = seriesWindow.getControlColumn(inputTsBlock);
+ Column controlColumn = conditionWindow.getControlColumn(inputTsBlock);
TimeColumn timeColumn = inputTsBlock.getTimeColumn();
int i = 0, size = inputTsBlock.getPositionCount();
int k = 0;
@@ -117,11 +115,11 @@ public class SeriesWindowManager implements IWindowManager {
if (isFirstSkip) {
k++;
long currentTime = timeColumn.getLong(i);
- if (seriesWindow.getStartTime() > currentTime) {
- seriesWindow.setStartTime(currentTime);
+ if (conditionWindow.getStartTime() > currentTime) {
+ conditionWindow.setStartTime(currentTime);
}
- if (seriesWindow.getEndTime() < currentTime) {
- seriesWindow.setEndTime(currentTime);
+ if (conditionWindow.getEndTime() < currentTime) {
+ conditionWindow.setEndTime(currentTime);
}
}
}
@@ -130,7 +128,7 @@ public class SeriesWindowManager implements IWindowManager {
// not finish.
if (isFirstSkip) {
if (i != size) isFirstSkip = false;
- seriesWindow.setKeep(seriesWindow.getKeep() + k);
+ conditionWindow.setKeep(conditionWindow.getKeep() + k);
return inputTsBlock.subTsBlock(i);
}
@@ -145,7 +143,7 @@ public class SeriesWindowManager implements IWindowManager {
public TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators) {
List<TSDataType> dataTypes = getResultDataTypes(aggregators);
// Judge whether we need output endTime column.
- if (seriesWindow.isOutputEndTime()) {
+ if (conditionWindow.isOutputEndTime()) {
dataTypes.add(0, TSDataType.INT64);
}
return new TsBlockBuilder(dataTypes);
@@ -154,29 +152,11 @@ public class SeriesWindowManager implements IWindowManager {
@Override
public void appendAggregationResult(
TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
- if (!keepEvaluator.apply(seriesWindow.getKeep())) {
- for (Aggregator aggregator : aggregators) aggregator.reset();
+ if (!keepEvaluator.apply(conditionWindow.getKeep())) {
return;
}
- // Use the start time of eventWindow as default output time.
- TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
- timeColumnBuilder.writeLong(seriesWindow.getStartTime());
-
- ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
- int columnIndex = 0;
- if (seriesWindow.isOutputEndTime()) {
- columnBuilders[0].writeLong(seriesWindow.getEndTime());
- columnIndex = 1;
- }
- for (Aggregator aggregator : aggregators) {
- ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
- columnBuilder[0] = columnBuilders[columnIndex++];
- if (columnBuilder.length > 1) {
- columnBuilder[1] = columnBuilders[columnIndex++];
- }
- aggregator.outputResult(columnBuilder);
- }
- resultTsBlockBuilder.declarePosition();
+ long endTime = conditionWindow.isOutputEndTime() ? conditionWindow.getEndTime() : -1;
+ outputAggregators(aggregators, resultTsBlockBuilder, conditionWindow.getStartTime(), endTime);
}
@Override
@@ -186,11 +166,6 @@ public class SeriesWindowManager implements IWindowManager {
@Override
public boolean isIgnoringNull() {
- return seriesWindow.ignoringNull();
- }
-
- @Override
- public void setKeep(long keep) {
- seriesWindow.setKeep(seriesWindow.getKeep() + keep);
+ return conditionWindow.ignoringNull();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowParameter.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowParameter.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowParameter.java
index cde53be91f..83a3149965 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowParameter.java
@@ -21,19 +21,19 @@ package org.apache.iotdb.db.mpp.execution.operator.window;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
-public class SeriesWindowParameter extends WindowParameter {
+public class ConditionWindowParameter extends WindowParameter {
private final boolean ignoringNull;
private final int controlColumnIndex;
private final Expression keepExpression;
- public SeriesWindowParameter(
+ public ConditionWindowParameter(
boolean needOutputEndTime,
boolean ignoringNull,
int controlColumnIndex,
Expression keepExpression) {
super(needOutputEndTime);
- this.windowType = WindowType.SERIES_WINDOW;
+ this.windowType = WindowType.CONDITION_WINDOW;
this.ignoringNull = ignoringNull;
this.controlColumnIndex = controlColumnIndex;
this.keepExpression = keepExpression;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
index f6003c0120..ee39f197c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
@@ -46,7 +46,7 @@ public interface IWindow {
* When we merge a point into window, at this time, we can use this method to change the status in
* this window
*/
- void mergeOnePoint(Column[] controlTimeAndValueColumn, int index);
+ void mergeOnePoint(Column[] timeAndValueColumn, int index);
/**
* Used to customize whether the window has final aggregation result
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
index 50c94da4fc..ef0f493a1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import java.util.ArrayList;
import java.util.Arrays;
@@ -77,7 +79,7 @@ public interface IWindowManager {
*/
default boolean satisfiedCurWindow(TsBlock inputTsBlock) {
return true;
- };
+ }
/**
* Used to determine whether there are extra points for the next window
@@ -87,12 +89,12 @@ public interface IWindowManager {
*/
default boolean isTsBlockOutOfBound(TsBlock inputTsBlock) {
return false;
- };
+ }
/**
* According to the Aggregator list, we could obtain all the aggregation result column type list.
*
- * @param aggregators
+ * @param aggregators the list of aggregators
* @return Aggregation result column type list.
*/
default List<TSDataType> getResultDataTypes(List<Aggregator> aggregators) {
@@ -109,7 +111,7 @@ public interface IWindowManager {
* <p>For the implementation, we should consider whether we need to add endTime column and event
* column in the resultSet besides the aggregation columns.
*
- * @param aggregators
+ * @param aggregators the list of aggregators
* @return TsBlockBuilder of resultSet
*/
TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators);
@@ -121,8 +123,8 @@ public interface IWindowManager {
* whether we need to add endTime column and event column in the resultSet besides the aggregation
* columns.
*
- * @param resultTsBlockBuilder
- * @param aggregators
+ * @param resultTsBlockBuilder tsBlockBuilder for resultSet
+ * @param aggregators the list of aggregators
*/
void appendAggregationResult(TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators);
@@ -132,9 +134,9 @@ public interface IWindowManager {
*
* @return whether the window is TimeWindow and the last TimeWindow has not been initialized
*/
- default boolean notInitedLastTimeWindow() {
+ default boolean notInitializedLastTimeWindow() {
return false;
- };
+ }
/**
* When endTime is required in resultSet, operator should skip the points in last window directly
@@ -151,9 +153,33 @@ public interface IWindowManager {
*/
boolean isIgnoringNull();
- // TODO: "group by series" used for keep value temporarily, it will be removed in the future.
- default void setKeep(long keep) {}
-
- // TODO: "group by session" used for keeping lastTsBlockTime, it will be removed in the future.
- default void setLastTsBlockTime() {}
+ /**
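+ * Output the results held in the aggregators to the column builders of resultTsBlockBuilder.
+ *
+ * @param endTime end time of the window, or -1 if the window doesn't need to output endTime (the endTime column is only written when this value is not -1).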
+ */
+ default void outputAggregators(
+ List<Aggregator> aggregators,
+ TsBlockBuilder resultTsBlockBuilder,
+ long startTime,
+ long endTime) {
+ TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+ timeColumnBuilder.writeLong(startTime);
+
+ ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+ int columnIndex = 0;
+ if (endTime != -1) {
+ columnBuilders[0].writeLong(endTime);
+ columnIndex = 1;
+ }
+ for (Aggregator aggregator : aggregators) {
+ ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+ columnBuilder[0] = columnBuilders[columnIndex++];
+ if (columnBuilder.length > 1) {
+ columnBuilder[1] = columnBuilders[columnIndex++];
+ }
+ aggregator.outputResult(columnBuilder);
+ }
+ resultTsBlockBuilder.declarePosition();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java
index 68c4aa30af..79b22dd858 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java
@@ -83,6 +83,7 @@ public class SessionWindow implements IWindow {
}
// update the last time of session window
timeValue = ascending ? Math.max(timeValue, currentTime) : Math.min(timeValue, currentTime);
+ setLastTsBlockTime(timeValue);
}
@Override
@@ -147,10 +148,6 @@ public class SessionWindow implements IWindow {
this.initializedTimeValue = initializedTimeValue;
}
- public long getLastTsBlockTime() {
- return lastTsBlockTime;
- }
-
public void setLastTsBlockTime(long lastTsBlockTime) {
this.lastTsBlockTime = lastTsBlockTime;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowManager.java
index d8976445b4..d76496aa92 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowManager.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import java.util.List;
@@ -138,30 +136,8 @@ public class SessionWindowManager implements IWindowManager {
@Override
public void appendAggregationResult(
TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
- // Use the start time of sessionWindow as default output time.
- TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
- timeColumnBuilder.writeLong(sessionWindow.getStartTime());
-
- ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
- int columnIndex = 0;
- if (isNeedOutputEndTime) {
- columnBuilders[0].writeLong(sessionWindow.getEndTime());
- columnIndex = 1;
- }
- for (Aggregator aggregator : aggregators) {
- ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
- columnBuilder[0] = columnBuilders[columnIndex++];
- if (columnBuilder.length > 1) {
- columnBuilder[1] = columnBuilders[columnIndex++];
- }
- aggregator.outputResult(columnBuilder);
- }
- resultTsBlockBuilder.declarePosition();
- }
-
- @Override
- public boolean notInitedLastTimeWindow() {
- return false;
+ long endTime = isNeedOutputEndTime ? sessionWindow.getEndTime() : -1;
+ outputAggregators(aggregators, resultTsBlockBuilder, sessionWindow.getStartTime(), endTime);
}
@Override
@@ -173,9 +149,4 @@ public class SessionWindowManager implements IWindowManager {
public boolean isIgnoringNull() {
return false;
}
-
- @Override
- public void setLastTsBlockTime() {
- this.sessionWindow.setLastTsBlockTime(this.sessionWindow.getTimeValue());
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
index 8a0e2f3f2a..5321e7d14b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import java.util.List;
@@ -150,28 +148,12 @@ public class TimeWindowManager implements IWindowManager {
@Override
public void appendAggregationResult(
TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
- TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
- // Use start time of current time range as time column
- timeColumnBuilder.writeLong(startTime);
- ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
- int columnIndex = 0;
- if (this.needOutputEndTime) {
- columnBuilders[0].writeLong(endTime);
- columnIndex = 1;
- }
- for (Aggregator aggregator : aggregators) {
- ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
- columnBuilder[0] = columnBuilders[columnIndex++];
- if (columnBuilder.length > 1) {
- columnBuilder[1] = columnBuilders[columnIndex++];
- }
- aggregator.outputResult(columnBuilder);
- }
- resultTsBlockBuilder.declarePosition();
+ long endTime = this.needOutputEndTime ? this.endTime : -1;
+ outputAggregators(aggregators, resultTsBlockBuilder, this.startTime, endTime);
}
@Override
- public boolean notInitedLastTimeWindow() {
+ public boolean notInitializedLastTimeWindow() {
return !this.initialized;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowManager.java
similarity index 58%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowManager.java
index ca1acf0ba7..b4d162b68a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowManager.java
@@ -22,12 +22,10 @@ package org.apache.iotdb.db.mpp.execution.operator.window;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import java.util.List;
-public abstract class EventWindowManager implements IWindowManager {
+public abstract class VariationWindowManager implements IWindowManager {
protected boolean initialized;
@@ -35,12 +33,9 @@ public abstract class EventWindowManager implements IWindowManager {
protected boolean needSkip;
- protected EventWindowParameter eventWindowParameter;
+ protected AbstractVariationWindow variationWindow;
- protected EventWindow eventWindow;
-
- protected EventWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
- this.eventWindowParameter = eventWindowParameter;
+ protected VariationWindowManager(boolean ascending) {
this.initialized = false;
this.ascending = ascending;
// At beginning, we do not need to skip inputTsBlock
@@ -48,7 +43,7 @@ public abstract class EventWindowManager implements IWindowManager {
}
public boolean isIgnoringNull() {
- return eventWindowParameter.isIgnoringNull();
+ return variationWindow.ignoreNull();
}
@Override
@@ -59,7 +54,7 @@ public abstract class EventWindowManager implements IWindowManager {
@Override
public void initCurWindow() {
this.initialized = true;
- this.eventWindow.setInitializedEventValue(false);
+ this.variationWindow.setInitializedHeadValue(false);
}
@Override
@@ -73,50 +68,33 @@ public abstract class EventWindowManager implements IWindowManager {
// belong to previous window have been consumed. If not, we need skip these points.
this.needSkip = true;
this.initialized = false;
- this.eventWindow.updatePreviousEventValue();
+ this.variationWindow.updatePreviousValue();
}
@Override
public IWindow getCurWindow() {
- return eventWindow;
+ return variationWindow;
}
@Override
public TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators) {
List<TSDataType> dataTypes = getResultDataTypes(aggregators);
// Judge whether we need output endTime column.
- if (eventWindowParameter.isNeedOutputEndTime()) {
+ if (variationWindow.isOutputEndTime()) {
dataTypes.add(0, TSDataType.INT64);
}
return new TsBlockBuilder(dataTypes);
}
- protected ColumnBuilder[] appendOriginAggregationResult(
+ public void appendAggregationResult(
TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
- // Use the start time of eventWindow as default output time.
- TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
- timeColumnBuilder.writeLong(eventWindow.getStartTime());
-
- ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
- int columnIndex = 0;
- if (eventWindowParameter.isNeedOutputEndTime()) {
- columnBuilders[0].writeLong(eventWindow.getEndTime());
- columnIndex = 1;
- }
- for (Aggregator aggregator : aggregators) {
- ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
- columnBuilder[0] = columnBuilders[columnIndex++];
- if (columnBuilder.length > 1) {
- columnBuilder[1] = columnBuilders[columnIndex++];
- }
- aggregator.outputResult(columnBuilder);
- }
- resultTsBlockBuilder.declarePosition();
- return columnBuilders;
+
+ long endTime = variationWindow.isOutputEndTime() ? variationWindow.getEndTime() : -1;
+ outputAggregators(aggregators, resultTsBlockBuilder, variationWindow.getStartTime(), endTime);
}
@Override
public boolean needSkipInAdvance() {
- return eventWindowParameter.isNeedOutputEndTime();
+ return variationWindow.isOutputEndTime();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowParameter.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowParameter.java
index 2bfcb7d4d9..7fdcb0b621 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowParameter.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.window;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-public class EventWindowParameter extends WindowParameter {
+public class VariationWindowParameter extends WindowParameter {
private final boolean ignoringNull;
private final double delta;
@@ -29,7 +29,7 @@ public class EventWindowParameter extends WindowParameter {
private final int controlColumnIndex;
- public EventWindowParameter(
+ public VariationWindowParameter(
TSDataType dataType,
int controlColumnIndex,
boolean needOutputEndTime,
@@ -40,7 +40,7 @@ public class EventWindowParameter extends WindowParameter {
this.dataType = dataType;
this.ignoringNull = ignoringNull;
this.delta = delta;
- this.windowType = WindowType.EVENT_WINDOW;
+ this.windowType = WindowType.VARIATION_WINDOW;
}
public boolean isIgnoringNull() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
index eaeba4d246..8e5df9a2d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
@@ -29,12 +29,12 @@ public class WindowManagerFactory {
switch (windowParameter.getWindowType()) {
case TIME_WINDOW:
return new TimeWindowManager(timeRangeIterator, (TimeWindowParameter) windowParameter);
- case EVENT_WINDOW:
- return ((EventWindowParameter) windowParameter).getDelta() == 0
- ? genEqualEventWindowManager((EventWindowParameter) windowParameter, ascending)
- : genVariationEventWindowManager((EventWindowParameter) windowParameter, ascending);
- case SERIES_WINDOW:
- return new SeriesWindowManager((SeriesWindowParameter) windowParameter);
+ case VARIATION_WINDOW:
+ return ((VariationWindowParameter) windowParameter).getDelta() == 0
+ ? genEqualEventWindowManager((VariationWindowParameter) windowParameter, ascending)
+ : genVariationEventWindowManager((VariationWindowParameter) windowParameter, ascending);
+ case CONDITION_WINDOW:
+ return new ConditionWindowManager((ConditionWindowParameter) windowParameter);
case SESSION_WINDOW:
return new SessionWindowManager(
windowParameter.isNeedOutputEndTime(),
@@ -47,21 +47,21 @@ public class WindowManagerFactory {
}
}
- private static EventWindowManager genEqualEventWindowManager(
- EventWindowParameter eventWindowParameter, boolean ascending) {
+ private static VariationWindowManager genEqualEventWindowManager(
+ VariationWindowParameter eventWindowParameter, boolean ascending) {
switch (eventWindowParameter.getDataType()) {
case INT32:
- return new EqualEventIntWindowManager(eventWindowParameter, ascending);
+ return new EqualIntWindowManager(eventWindowParameter, ascending);
case INT64:
- return new EqualEventLongWindowManager(eventWindowParameter, ascending);
+ return new EqualLongWindowManager(eventWindowParameter, ascending);
case FLOAT:
- return new EqualEventFloatWindowManager(eventWindowParameter, ascending);
+ return new EqualFloatWindowManager(eventWindowParameter, ascending);
case DOUBLE:
- return new EqualEventDoubleWindowManager(eventWindowParameter, ascending);
+ return new EqualDoubleWindowManager(eventWindowParameter, ascending);
case TEXT:
- return new EqualEventBinaryWindowManager(eventWindowParameter, ascending);
+ return new EqualBinaryWindowManager(eventWindowParameter, ascending);
case BOOLEAN:
- return new EqualEventBooleanWindowManager(eventWindowParameter, ascending);
+ return new EqualBooleanWindowManager(eventWindowParameter, ascending);
default:
throw new UnSupportedDataTypeException(
String.format(
@@ -70,17 +70,17 @@ public class WindowManagerFactory {
}
}
- private static EventWindowManager genVariationEventWindowManager(
- EventWindowParameter eventWindowParameter, boolean ascending) {
+ private static VariationWindowManager genVariationEventWindowManager(
+ VariationWindowParameter eventWindowParameter, boolean ascending) {
switch (eventWindowParameter.getDataType()) {
case INT32:
- return new VariationEventIntWindowManager(eventWindowParameter, ascending);
+ return new VariationIntWindowManager(eventWindowParameter, ascending);
case INT64:
- return new VariationEventLongWindowManager(eventWindowParameter, ascending);
+ return new VariationLongWindowManager(eventWindowParameter, ascending);
case FLOAT:
- return new VariationEventFloatWindowManager(eventWindowParameter, ascending);
+ return new VariationFloatWindowManager(eventWindowParameter, ascending);
case DOUBLE:
- return new VariationEventDoubleWindowManager(eventWindowParameter, ascending);
+ return new VariationDoubleWindowManager(eventWindowParameter, ascending);
default:
throw new UnSupportedDataTypeException(
String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
index e2ff0f8241..acc12791da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.mpp.execution.operator.window;
public enum WindowType {
TIME_WINDOW((byte) 0),
- EVENT_WINDOW((byte) 1),
- SERIES_WINDOW((byte) 2),
+ VARIATION_WINDOW((byte) 1),
+ CONDITION_WINDOW((byte) 2),
SESSION_WINDOW((byte) 3);
private final byte type;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index c0977cbd92..25953f8460 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -67,8 +67,8 @@ import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByConditionParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySeriesParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySessionParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByVariationParameter;
@@ -79,7 +79,7 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByComponent;
-import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySeriesComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupByConditionComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySessionComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByVariationComponent;
@@ -1189,7 +1189,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
}
}
- if (windowType == WindowType.EVENT_WINDOW) {
+ if (windowType == WindowType.VARIATION_WINDOW) {
double delta = ((GroupByVariationComponent) groupByComponent).getDelta();
for (Expression expression : deviceToGroupByExpression.values()) {
checkGroupByVariationExpressionType(analysis, expression, delta);
@@ -1198,13 +1198,14 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
new GroupByVariationParameter(groupByComponent.isIgnoringNull(), delta);
analysis.setGroupByParameter(groupByParameter);
analysis.setDeviceToGroupByExpression(deviceToGroupByExpression);
- } else if (windowType == WindowType.SERIES_WINDOW) {
- Expression keepExpression = ((GroupBySeriesComponent) groupByComponent).getKeepExpression();
+ } else if (windowType == WindowType.CONDITION_WINDOW) {
+ Expression keepExpression =
+ ((GroupByConditionComponent) groupByComponent).getKeepExpression();
for (Expression expression : deviceToGroupByExpression.values()) {
- checkGroupBySeriesExpressionType(analysis, expression, keepExpression);
+ checkGroupByConditionExpressionType(analysis, expression, keepExpression);
}
GroupByParameter groupByParameter =
- new GroupBySeriesParameter(groupByComponent.isIgnoringNull(), keepExpression);
+ new GroupByConditionParameter(groupByComponent.isIgnoringNull(), keepExpression);
analysis.setGroupByParameter(groupByParameter);
analysis.setDeviceToGroupByExpression(deviceToGroupByExpression);
} else if (windowType == WindowType.SESSION_WINDOW) {
@@ -1244,18 +1245,19 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
groupByExpression = expressions.get(0);
}
- if (windowType == WindowType.EVENT_WINDOW) {
+ if (windowType == WindowType.VARIATION_WINDOW) {
double delta = ((GroupByVariationComponent) groupByComponent).getDelta();
checkGroupByVariationExpressionType(analysis, groupByExpression, delta);
GroupByParameter groupByParameter =
new GroupByVariationParameter(groupByComponent.isIgnoringNull(), delta);
analysis.setGroupByExpression(groupByExpression);
analysis.setGroupByParameter(groupByParameter);
- } else if (windowType == WindowType.SERIES_WINDOW) {
- Expression keepExpression = ((GroupBySeriesComponent) groupByComponent).getKeepExpression();
- checkGroupBySeriesExpressionType(analysis, groupByExpression, keepExpression);
+ } else if (windowType == WindowType.CONDITION_WINDOW) {
+ Expression keepExpression =
+ ((GroupByConditionComponent) groupByComponent).getKeepExpression();
+ checkGroupByConditionExpressionType(analysis, groupByExpression, keepExpression);
GroupByParameter groupByParameter =
- new GroupBySeriesParameter(groupByComponent.isIgnoringNull(), keepExpression);
+ new GroupByConditionParameter(groupByComponent.isIgnoringNull(), keepExpression);
analysis.setGroupByExpression(groupByExpression);
analysis.setGroupByParameter(groupByParameter);
} else if (windowType == WindowType.SESSION_WINDOW) {
@@ -1277,7 +1279,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
}
}
- private void checkGroupBySeriesExpressionType(
+ private void checkGroupByConditionExpressionType(
Analysis analysis, Expression groupByExpression, Expression keepExpression) {
TSDataType type = analyzeExpression(analysis, groupByExpression);
if (type != TSDataType.BOOLEAN) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index dcb9d0d173..c5ac196776 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -71,8 +71,8 @@ import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupByConditionComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByLevelComponent;
-import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySeriesComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySessionComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTagComponent;
import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
@@ -965,7 +965,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
groupByKeys.add("COMMON");
queryStatement.setGroupByComponent(
- parseGroupByClause(groupByAttribute, WindowType.EVENT_WINDOW));
+ parseGroupByClause(groupByAttribute, WindowType.VARIATION_WINDOW));
} else if (groupByAttribute.CONDITION() != null) {
if (groupByKeys.contains("COMMON")) {
throw new SemanticException(GROUP_BY_COMMON_ONLY_ONE_MSG);
@@ -973,7 +973,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
groupByKeys.add("COMMON");
queryStatement.setGroupByComponent(
- parseGroupByClause(groupByAttribute, WindowType.SERIES_WINDOW));
+ parseGroupByClause(groupByAttribute, WindowType.CONDITION_WINDOW));
} else if (groupByAttribute.SESSION() != null) {
if (groupByKeys.contains("COMMON")) {
throw new SemanticException(GROUP_BY_COMMON_ONLY_ONE_MSG);
@@ -1209,7 +1209,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
}
}
List<ExpressionContext> expressions = ctx.expression();
- if (windowType == WindowType.EVENT_WINDOW) {
+ if (windowType == WindowType.VARIATION_WINDOW) {
ExpressionContext expressionContext = expressions.get(0);
GroupByVariationComponent groupByVariationComponent = new GroupByVariationComponent();
groupByVariationComponent.setControlColumnExpression(
@@ -1218,16 +1218,16 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
ctx.delta == null ? 0 : Double.parseDouble(ctx.delta.getText()));
groupByVariationComponent.setIgnoringNull(ignoringNull);
return groupByVariationComponent;
- } else if (windowType == WindowType.SERIES_WINDOW) {
+ } else if (windowType == WindowType.CONDITION_WINDOW) {
ExpressionContext conditionExpressionContext = expressions.get(0);
- GroupBySeriesComponent groupBySeriesComponent = new GroupBySeriesComponent();
- groupBySeriesComponent.setControlColumnExpression(
+ GroupByConditionComponent groupByConditionComponent = new GroupByConditionComponent();
+ groupByConditionComponent.setControlColumnExpression(
parseExpression(conditionExpressionContext, true));
if (expressions.size() == 2) {
- groupBySeriesComponent.setKeepExpression(parseExpression(expressions.get(1), true));
+ groupByConditionComponent.setKeepExpression(parseExpression(expressions.get(1), true));
}
- groupBySeriesComponent.setIgnoringNull(ignoringNull);
- return groupBySeriesComponent;
+ groupByConditionComponent.setIgnoringNull(ignoringNull);
+ return groupByConditionComponent;
} else if (windowType == WindowType.SESSION_WINDOW) {
long interval = DateTimeUtils.convertDurationStrToLong(ctx.timeInterval.getText());
return new GroupBySessionComponent(interval);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 5a3c88a318..1f7ac0236c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -123,10 +123,10 @@ import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
-import org.apache.iotdb.db.mpp.execution.operator.window.EventWindowParameter;
-import org.apache.iotdb.db.mpp.execution.operator.window.SeriesWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.ConditionWindowParameter;
import org.apache.iotdb.db.mpp.execution.operator.window.SessionWindowParameter;
import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.VariationWindowParameter;
import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
import org.apache.iotdb.db.mpp.execution.operator.window.WindowType;
import org.apache.iotdb.db.mpp.plan.Coordinator;
@@ -189,8 +189,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByConditionParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySeriesParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySessionParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByVariationParameter;
@@ -773,6 +773,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
operatorContext, node.getDevices(), children, deviceColumnIndex, outputColumnTypes);
}
+ @Deprecated
@Override
public Operator visitDeviceMerge(DeviceMergeNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
@@ -1464,27 +1465,35 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
WindowParameter windowParameter;
switch (windowType) {
- case EVENT_WINDOW:
- String controlColumn = node.getGroupByExpression().getExpressionString();
+ case VARIATION_WINDOW:
+ Expression groupByVariationExpression = node.getGroupByExpression();
+ if (groupByVariationExpression == null) {
+ throw new IllegalArgumentException("groupByVariationExpression can't be null");
+ }
+ String controlColumn = groupByVariationExpression.getExpressionString();
TSDataType controlColumnType = context.getTypeProvider().getType(controlColumn);
windowParameter =
- new EventWindowParameter(
+ new VariationWindowParameter(
controlColumnType,
layout.get(controlColumn).get(0).getValueColumnIndex(),
node.isOutputEndTime(),
((GroupByVariationParameter) groupByParameter).isIgnoringNull(),
((GroupByVariationParameter) groupByParameter).getDelta());
break;
- case SERIES_WINDOW:
+ case CONDITION_WINDOW:
+ Expression groupByConditionExpression = node.getGroupByExpression();
+ if (groupByConditionExpression == null) {
+ throw new IllegalArgumentException("groupByConditionExpression can't be null");
+ }
windowParameter =
- new SeriesWindowParameter(
+ new ConditionWindowParameter(
node.isOutputEndTime(),
- ((GroupBySeriesParameter) groupByParameter).isIgnoringNull(),
+ ((GroupByConditionParameter) groupByParameter).isIgnoringNull(),
layout
- .get(node.getGroupByExpression().getExpressionString())
+ .get(groupByConditionExpression.getExpressionString())
.get(0)
.getValueColumnIndex(),
- ((GroupBySeriesParameter) groupByParameter).getKeepExpression());
+ ((GroupByConditionParameter) groupByParameter).getKeepExpression());
break;
case SESSION_WINDOW:
windowParameter =
@@ -1746,6 +1755,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
}
}
+ @Deprecated
@Override
public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
@@ -2421,7 +2431,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
LocalExecutionPlanContext subContext = context.createSubContext();
subContext.setDegreeOfParallelism(1);
// Create partial parent operator for children
- PlanNode partialParentNode = null;
+ PlanNode partialParentNode;
if (endIndex - startIndex == 1) {
partialParentNode = node.getChildren().get(startIndex);
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySeriesParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByConditionParameter.java
similarity index 83%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySeriesParameter.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByConditionParameter.java
index f0ed735779..5d5bc09d15 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySeriesParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByConditionParameter.java
@@ -28,13 +28,13 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-public class GroupBySeriesParameter extends GroupByParameter {
+public class GroupByConditionParameter extends GroupByParameter {
private final Expression keepExpression;
- private boolean ignoringNull;
+ private final boolean ignoringNull;
- public GroupBySeriesParameter(boolean ignoringNull, Expression keepExpression) {
- super(WindowType.SERIES_WINDOW);
+ public GroupByConditionParameter(boolean ignoringNull, Expression keepExpression) {
+ super(WindowType.CONDITION_WINDOW);
this.keepExpression = keepExpression;
this.ignoringNull = ignoringNull;
}
@@ -54,7 +54,7 @@ public class GroupBySeriesParameter extends GroupByParameter {
public static GroupByParameter deserialize(ByteBuffer buffer) {
boolean ignoringNull = ReadWriteIOUtils.readBool(buffer);
Expression keepExpression = Expression.deserialize(buffer);
- return new GroupBySeriesParameter(ignoringNull, keepExpression);
+ return new GroupByConditionParameter(ignoringNull, keepExpression);
}
public Expression getKeepExpression() {
@@ -76,8 +76,8 @@ public class GroupBySeriesParameter extends GroupByParameter {
if (!super.equals(obj)) {
return false;
}
- return this.keepExpression == ((GroupBySeriesParameter) obj).getKeepExpression()
- && this.ignoringNull == ((GroupBySeriesParameter) obj).ignoringNull;
+ return this.keepExpression == ((GroupByConditionParameter) obj).getKeepExpression()
+ && this.ignoringNull == ((GroupByConditionParameter) obj).ignoringNull;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java
index 3380ea6a0d..586188fc4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java
@@ -71,10 +71,10 @@ public abstract class GroupByParameter {
public static GroupByParameter deserialize(ByteBuffer byteBuffer) {
byte type = ReadWriteIOUtils.readByte(byteBuffer);
- if (type == WindowType.EVENT_WINDOW.getType()) {
+ if (type == WindowType.VARIATION_WINDOW.getType()) {
return GroupByVariationParameter.deserialize(byteBuffer);
- } else if (type == WindowType.SERIES_WINDOW.getType()) {
- return GroupBySeriesParameter.deserialize(byteBuffer);
+ } else if (type == WindowType.CONDITION_WINDOW.getType()) {
+ return GroupByConditionParameter.deserialize(byteBuffer);
} else if (type == WindowType.SESSION_WINDOW.getType()) {
return GroupBySessionParameter.deserialize(byteBuffer);
} else throw new SemanticException("Unsupported window type");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java
index 6a397e5e33..7e30251f74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java
@@ -34,7 +34,7 @@ public class GroupByVariationParameter extends GroupByParameter {
boolean ignoringNull;
public GroupByVariationParameter(boolean ignoringNull, double delta) {
- super(WindowType.EVENT_WINDOW);
+ super(WindowType.VARIATION_WINDOW);
this.delta = delta;
this.ignoringNull = ignoringNull;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupBySeriesComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByConditionComponent.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupBySeriesComponent.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByConditionComponent.java
index 31f44f4389..86589b2ccb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupBySeriesComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByConditionComponent.java
@@ -22,13 +22,13 @@ package org.apache.iotdb.db.mpp.plan.statement.component;
import org.apache.iotdb.db.mpp.execution.operator.window.WindowType;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
-public class GroupBySeriesComponent extends GroupByComponent {
+public class GroupByConditionComponent extends GroupByComponent {
// use to filter continuous value (keep>=/>/=/</<=threshold)
private Expression keepExpression;
- public GroupBySeriesComponent() {
- super(WindowType.SERIES_WINDOW);
+ public GroupByConditionComponent() {
+ super(WindowType.CONDITION_WINDOW);
}
public void setKeepExpression(Expression keepExpression) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByVariationComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByVariationComponent.java
index 90c7b3d606..001607635a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByVariationComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByVariationComponent.java
@@ -25,7 +25,7 @@ public class GroupByVariationComponent extends GroupByComponent {
private double delta = 0;
public GroupByVariationComponent() {
- super(WindowType.EVENT_WINDOW);
+ super(WindowType.VARIATION_WINDOW);
}
public double getDelta() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index 480ab2a026..810586dc29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -294,15 +294,17 @@ public class QueryStatement extends Statement {
}
private boolean isGroupByVariation() {
- return groupByComponent != null && groupByComponent.getWindowType() == WindowType.EVENT_WINDOW;
+ return groupByComponent != null
+ && groupByComponent.getWindowType() == WindowType.VARIATION_WINDOW;
}
- private boolean isGroupBySeries() {
- return groupByComponent != null && groupByComponent.getWindowType() == WindowType.SERIES_WINDOW;
+ private boolean isGroupByCondition() {
+ return groupByComponent != null
+ && groupByComponent.getWindowType() == WindowType.CONDITION_WINDOW;
}
public boolean hasGroupByExpression() {
- return isGroupByVariation() || isGroupBySeries();
+ return isGroupByVariation() || isGroupByCondition();
}
public boolean isAlignByTime() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java
index 8e79b3fb3c..5d0729fe5b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java
@@ -20,11 +20,8 @@
package org.apache.iotdb.db.mpp.aggregation;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
-import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindow;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -45,9 +42,6 @@ public class AccumulatorTest {
private TsBlock rawData;
private Statistics statistics;
- private TimeRange defaultTimeRange = new TimeRange(0, Long.MAX_VALUE);
-
- private TimeWindow timeWindow = new TimeWindow(defaultTimeRange);
@Before
public void setUp() {
@@ -71,11 +65,10 @@ public class AccumulatorTest {
statistics.update(100L, 100d);
}
- public Column[] getControlTimeAndValueColumn(IWindow curWindow, int columnIndex) {
- Column[] columns = new Column[3];
- columns[0] = curWindow.getControlColumn(rawData);
- columns[1] = rawData.getTimeColumn();
- columns[2] = rawData.getColumn(columnIndex);
+ public Column[] getTimeAndValueColumn(int columnIndex) {
+ Column[] columns = new Column[2];
+ columns[0] = rawData.getTimeColumn();
+ columns[1] = rawData.getColumn(columnIndex);
return columns;
}
@@ -102,8 +95,8 @@ public class AccumulatorTest {
avgAccumulator.outputFinal(finalResult);
Assert.assertTrue(finalResult.build().isNull(0));
- Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
- avgAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ avgAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
Assert.assertFalse(avgAccumulator.hasFinalResult());
intermediateResult[0] = new LongColumnBuilder(null, 1);
intermediateResult[1] = new DoubleColumnBuilder(null, 1);
@@ -145,8 +138,8 @@ public class AccumulatorTest {
countAccumulator.outputFinal(finalResult);
Assert.assertEquals(0, finalResult.build().getLong(0));
- Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
- countAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ countAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
Assert.assertFalse(countAccumulator.hasFinalResult());
intermediateResult[0] = new LongColumnBuilder(null, 1);
countAccumulator.outputIntermediate(intermediateResult);
@@ -185,8 +178,8 @@ public class AccumulatorTest {
extremeAccumulator.outputFinal(finalResult);
Assert.assertTrue(finalResult.build().isNull(0));
- Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
- extremeAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ extremeAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
Assert.assertFalse(extremeAccumulator.hasFinalResult());
intermediateResult[0] = new DoubleColumnBuilder(null, 1);
extremeAccumulator.outputIntermediate(intermediateResult);
@@ -228,8 +221,8 @@ public class AccumulatorTest {
firstValueAccumulator.outputFinal(finalResult);
Assert.assertTrue(finalResult.build().isNull(0));
- Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
- firstValueAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ firstValueAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
Assert.assertTrue(firstValueAccumulator.hasFinalResult());
intermediateResult[0] = new DoubleColumnBuilder(null, 1);
intermediateResult[1] = new LongColumnBuilder(null, 1);
@@ -274,8 +267,8 @@ public class AccumulatorTest {
lastValueAccumulator.outputFinal(finalResult);
Assert.assertTrue(finalResult.build().isNull(0));
- Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
- lastValueAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ lastValueAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
intermediateResult[0] = new DoubleColumnBuilder(null, 1);
intermediateResult[1] = new LongColumnBuilder(null, 1);
lastValueAccumulator.outputIntermediate(intermediateResult);
@@ -316,8 +309,8 @@ public class AccumulatorTest {
maxTimeAccumulator.outputFinal(finalResult);
Assert.assertTrue(finalResult.build().isNull(0));
- Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
- maxTimeAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ maxTimeAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
Assert.assertFalse(maxTimeAccumulator.hasFinalResult());
intermediateResult[0] = new LongColumnBuilder(null, 1);
maxTimeAccumulator.outputIntermediate(intermediateResult);
@@ -356,8 +349,8 @@ public class AccumulatorTest {
minTimeAccumulator.outputFinal(finalResult);
Assert.assertTrue(finalResult.build().isNull(0));
- Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
- minTimeAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ minTimeAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
Assert.assertTrue(minTimeAccumulator.hasFinalResult());
intermediateResult[0] = new LongColumnBuilder(null, 1);
minTimeAccumulator.outputIntermediate(intermediateResult);
@@ -396,8 +389,8 @@ public class AccumulatorTest {
extremeAccumulator.outputFinal(finalResult);
Assert.assertTrue(finalResult.build().isNull(0));
- Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
- extremeAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ extremeAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
Assert.assertFalse(extremeAccumulator.hasFinalResult());
intermediateResult[0] = new DoubleColumnBuilder(null, 1);
extremeAccumulator.outputIntermediate(intermediateResult);
@@ -436,8 +429,8 @@ public class AccumulatorTest {
extremeAccumulator.outputFinal(finalResult);
Assert.assertTrue(finalResult.build().isNull(0));
- Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
- extremeAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ extremeAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
Assert.assertFalse(extremeAccumulator.hasFinalResult());
intermediateResult[0] = new DoubleColumnBuilder(null, 1);
extremeAccumulator.outputIntermediate(intermediateResult);
@@ -476,8 +469,8 @@ public class AccumulatorTest {
sumAccumulator.outputFinal(finalResult);
Assert.assertTrue(finalResult.build().isNull(0));
- Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
- sumAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+ Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+ sumAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
Assert.assertFalse(sumAccumulator.hasFinalResult());
intermediateResult[0] = new DoubleColumnBuilder(null, 1);
sumAccumulator.outputIntermediate(intermediateResult);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 76aa21237e..aa333914a9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -40,9 +40,9 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinO
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.execution.operator.window.EventWindowParameter;
import org.apache.iotdb.db.mpp.execution.operator.window.SessionWindowParameter;
import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.VariationWindowParameter;
import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
@@ -562,7 +562,7 @@ public class RawDataAggregationOperatorTest {
}
WindowParameter windowParameter =
- new EventWindowParameter(TSDataType.INT32, 0, false, true, 10000);
+ new VariationWindowParameter(TSDataType.INT32, 0, false, true, 10000);
RawDataAggregationOperator rawDataAggregationOperator =
initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
@@ -626,7 +626,7 @@ public class RawDataAggregationOperatorTest {
}
WindowParameter windowParameter =
- new EventWindowParameter(TSDataType.INT32, 0, true, true, 5000);
+ new VariationWindowParameter(TSDataType.INT32, 0, true, true, 5000);
RawDataAggregationOperator rawDataAggregationOperator =
initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
@@ -685,7 +685,7 @@ public class RawDataAggregationOperatorTest {
}
WindowParameter windowParameter =
- new EventWindowParameter(TSDataType.INT32, 0, false, true, 5000);
+ new VariationWindowParameter(TSDataType.INT32, 0, false, true, 5000);
RawDataAggregationOperator rawDataAggregationOperator =
initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
@@ -734,7 +734,7 @@ public class RawDataAggregationOperatorTest {
}
WindowParameter windowParameter =
- new EventWindowParameter(TSDataType.INT32, 0, true, true, 5000);
+ new VariationWindowParameter(TSDataType.INT32, 0, true, true, 5000);
RawDataAggregationOperator rawDataAggregationOperator =
initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
@@ -758,14 +758,15 @@ public class RawDataAggregationOperatorTest {
@Test
public void onePointInOneEqualEventWindowTest() throws IllegalPathException {
- WindowParameter windowParameter = new EventWindowParameter(TSDataType.INT32, 0, false, true, 0);
+ WindowParameter windowParameter =
+ new VariationWindowParameter(TSDataType.INT32, 0, false, true, 0);
onePointInOneWindowTest(windowParameter);
}
@Test
public void onePointInOneVariationEventWindowTest() throws IllegalPathException {
WindowParameter windowParameter =
- new EventWindowParameter(TSDataType.INT32, 0, false, true, 0.5);
+ new VariationWindowParameter(TSDataType.INT32, 0, false, true, 0.5);
onePointInOneWindowTest(windowParameter);
}