You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/10 11:37:13 UTC
[iotdb] branch master updated: [IOTDB-4437] Implement event window in RawDataAggregationOperator
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5dac205243 [IOTDB-4437] Implement event window in RawDataAggregationOperator
5dac205243 is described below
commit 5dac205243e3e6b321966f456a15d817af56f8fb
Author: AACEPT <34...@users.noreply.github.com>
AuthorDate: Tue Jan 10 19:37:07 2023 +0800
[IOTDB-4437] Implement event window in RawDataAggregationOperator
---
.../iotdb/db/mpp/aggregation/AvgAccumulator.java | 8 +-
.../iotdb/db/mpp/aggregation/CountAccumulator.java | 4 +-
.../db/mpp/aggregation/ExtremeAccumulator.java | 8 +-
.../db/mpp/aggregation/FirstValueAccumulator.java | 12 +-
.../mpp/aggregation/FirstValueDescAccumulator.java | 12 +-
.../db/mpp/aggregation/LastValueAccumulator.java | 12 +-
.../mpp/aggregation/LastValueDescAccumulator.java | 12 +-
.../db/mpp/aggregation/MaxTimeAccumulator.java | 2 +-
.../db/mpp/aggregation/MaxTimeDescAccumulator.java | 2 +-
.../db/mpp/aggregation/MaxValueAccumulator.java | 8 +-
.../db/mpp/aggregation/MinTimeAccumulator.java | 2 +-
.../db/mpp/aggregation/MinTimeDescAccumulator.java | 2 +-
.../db/mpp/aggregation/MinValueAccumulator.java | 8 +-
.../iotdb/db/mpp/aggregation/SumAccumulator.java | 8 +-
.../process/RawDataAggregationOperator.java | 51 +-
.../process/SingleInputAggregationOperator.java | 12 +-
.../process/SlidingWindowAggregationOperator.java | 11 +
.../operator/window/EqualEventBooleanWindow.java} | 32 +-
.../window/EqualEventBooleanWindowManager.java | 64 +++
.../operator/window/EqualEventDoubleWindow.java} | 32 +-
.../window/EqualEventDoubleWindowManager.java | 64 +++
.../operator/window/EqualEventFloatWindow.java} | 32 +-
.../window/EqualEventFloatWindowManager.java | 64 +++
.../operator/window/EqualEventIntWindow.java} | 32 +-
.../window/EqualEventIntWindowManager.java | 63 +++
.../operator/window/EqualEventLongWindow.java} | 32 +-
.../window/EqualEventLongWindowManager.java | 63 +++
.../operator/window/EqualEventTextWindow.java} | 32 +-
.../window/EqualEventTextWindowManager.java | 64 +++
.../operator/window/EventBooleanWindow.java | 66 +++
.../operator/window/EventBooleanWindowManager.java | 46 ++
.../operator/window/EventDoubleWindow.java | 66 +++
.../operator/window/EventDoubleWindowManager.java | 46 ++
.../operator/window/EventFloatWindow.java | 66 +++
.../operator/window/EventFloatWindowManager.java | 46 ++
.../execution/operator/window/EventIntWindow.java | 66 +++
.../operator/window/EventIntWindowManager.java | 46 ++
.../execution/operator/window/EventLongWindow.java | 66 +++
.../operator/window/EventLongWindowManager.java | 46 ++
.../execution/operator/window/EventTextWindow.java | 67 +++
.../operator/window/EventTextWindowManager.java | 46 ++
.../window/{TimeWindow.java => EventWindow.java} | 61 ++-
.../operator/window/EventWindowManager.java | 138 ++++++
.../operator/window/EventWindowParameter.java | 61 +++
.../db/mpp/execution/operator/window/IWindow.java | 2 +-
.../execution/operator/window/IWindowManager.java | 68 ++-
.../mpp/execution/operator/window/TimeWindow.java | 2 +-
.../operator/window/TimeWindowManager.java | 73 ++-
.../operator/window/TimeWindowParameter.java} | 33 +-
.../window/VariationEventDoubleWindow.java} | 32 +-
.../window/VariationEventDoubleWindowManager.java | 66 +++
.../window/VariationEventFloatWindow.java} | 32 +-
.../window/VariationEventFloatWindowManager.java | 66 +++
.../operator/window/VariationEventIntWindow.java} | 32 +-
.../window/VariationEventIntWindowManager.java | 66 +++
.../operator/window/VariationEventLongWindow.java} | 32 +-
.../window/VariationEventLongWindowManager.java | 66 +++
.../operator/window/WindowManagerFactory.java | 84 ++++
.../operator/window/WindowParameter.java} | 36 +-
.../operator/window/WindowType.java} | 33 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 7 +-
.../mpp/execution/operator/OperatorMemoryTest.java | 7 +-
.../operator/RawDataAggregationOperatorTest.java | 527 ++++++++++++++++++++-
.../query/reader/series/SeriesReaderTestUtil.java | 18 +
64 files changed, 2517 insertions(+), 446 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
index c647087362..d8d1be7027 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
@@ -157,7 +157,7 @@ public class AvgAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
initResult = true;
countValue++;
@@ -178,7 +178,7 @@ public class AvgAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
initResult = true;
countValue++;
@@ -199,7 +199,7 @@ public class AvgAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
initResult = true;
countValue++;
@@ -220,7 +220,7 @@ public class AvgAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
initResult = true;
countValue++;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
index 0455255a45..b43129fe61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
@@ -38,7 +38,7 @@ public class CountAccumulator implements Accumulator {
public int addInput(Column[] column, IWindow curWindow) {
int curPositionCount = column[0].getPositionCount();
- if (!column[2].mayHaveNull() && curWindow.contains(column[1])) {
+ if (!column[2].mayHaveNull() && curWindow.contains(column[0])) {
countValue += curPositionCount;
} else {
for (int i = 0; i < curPositionCount; i++) {
@@ -49,7 +49,7 @@ public class CountAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
countValue++;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
index eedbe08063..9f7704e995 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
@@ -232,7 +232,7 @@ public class ExtremeAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateIntResult(column[2].getInt(i));
}
@@ -264,7 +264,7 @@ public class ExtremeAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateLongResult(column[2].getLong(i));
}
@@ -296,7 +296,7 @@ public class ExtremeAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateFloatResult(column[2].getFloat(i));
}
@@ -328,7 +328,7 @@ public class ExtremeAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateDoubleResult(column[2].getDouble(i));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
index 90ba07f543..180c3154d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
@@ -257,7 +257,7 @@ public class FirstValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateIntFirstValue(column[2].getInt(i), column[1].getLong(i));
return i;
@@ -286,7 +286,7 @@ public class FirstValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateLongFirstValue(column[2].getLong(i), column[1].getLong(i));
return i;
@@ -315,7 +315,7 @@ public class FirstValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateFloatFirstValue(column[2].getFloat(i), column[1].getLong(i));
return i;
@@ -344,7 +344,7 @@ public class FirstValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateDoubleFirstValue(column[2].getDouble(i), column[1].getLong(i));
return i;
@@ -373,7 +373,7 @@ public class FirstValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateBooleanFirstValue(column[2].getBoolean(i), column[1].getLong(i));
return i;
@@ -402,7 +402,7 @@ public class FirstValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateBinaryFirstValue(column[2].getBinary(i), column[1].getLong(i));
return i;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
index 5717e7d175..eb5ed724f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
@@ -47,7 +47,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateIntFirstValue(column[2].getInt(i), column[1].getLong(i));
}
@@ -67,7 +67,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateLongFirstValue(column[2].getLong(i), column[1].getLong(i));
}
@@ -87,7 +87,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateFloatFirstValue(column[2].getFloat(i), column[1].getLong(i));
}
@@ -107,7 +107,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateDoubleFirstValue(column[2].getDouble(i), column[1].getLong(i));
}
@@ -127,7 +127,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateBooleanFirstValue(column[2].getBoolean(i), column[1].getLong(i));
}
@@ -147,7 +147,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateBinaryFirstValue(column[2].getBinary(i), column[1].getLong(i));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
index e198948f49..485333a680 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
@@ -257,7 +257,7 @@ public class LastValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateIntLastValue(column[2].getInt(i), column[1].getLong(i));
}
@@ -284,7 +284,7 @@ public class LastValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateLongLastValue(column[2].getLong(i), column[1].getLong(i));
}
@@ -311,7 +311,7 @@ public class LastValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateFloatLastValue(column[2].getFloat(i), column[1].getLong(i));
}
@@ -338,7 +338,7 @@ public class LastValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateDoubleLastValue(column[2].getDouble(i), column[1].getLong(i));
}
@@ -365,7 +365,7 @@ public class LastValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateBooleanLastValue(column[2].getBoolean(i), column[1].getLong(i));
}
@@ -392,7 +392,7 @@ public class LastValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateBinaryLastValue(column[2].getBinary(i), column[1].getLong(i));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
index 98b77c3030..34ded05789 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
@@ -51,7 +51,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateIntLastValue(column[2].getInt(i), column[1].getLong(i));
return i;
@@ -73,7 +73,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateLongLastValue(column[2].getLong(i), column[1].getLong(i));
return i;
@@ -95,7 +95,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateFloatLastValue(column[2].getFloat(i), column[1].getLong(i));
return i;
@@ -117,7 +117,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateDoubleLastValue(column[2].getDouble(i), column[1].getLong(i));
return i;
@@ -139,7 +139,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateBooleanLastValue(column[2].getBoolean(i), column[1].getLong(i));
return i;
@@ -161,7 +161,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateBinaryLastValue(column[2].getBinary(i), column[1].getLong(i));
return i;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
index ef1fa58dc1..ef045ad9c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
@@ -48,7 +48,7 @@ public class MaxTimeAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateMaxTime(column[1].getLong(i));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
index 536a60fe25..6ed18bd381 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
@@ -38,7 +38,7 @@ public class MaxTimeDescAccumulator extends MaxTimeAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateMaxTime(column[1].getLong(i));
return i;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
index 768e9cf16f..cf93148980 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
@@ -230,7 +230,7 @@ public class MaxValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateIntResult(column[2].getInt(i));
}
@@ -256,7 +256,7 @@ public class MaxValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateLongResult(column[2].getLong(i));
}
@@ -282,7 +282,7 @@ public class MaxValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateFloatResult(column[2].getFloat(i));
}
@@ -308,7 +308,7 @@ public class MaxValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateDoubleResult(column[2].getDouble(i));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
index 252ccaf02b..2229f2de2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
@@ -48,7 +48,7 @@ public class MinTimeAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateMinTime(column[1].getLong(i));
return i;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
index 249cf458fd..e63a9765dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
@@ -36,7 +36,7 @@ public class MinTimeDescAccumulator extends MinTimeAccumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateMinTime(column[1].getLong(i));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
index 2abd59e6e1..1d49b56277 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
@@ -230,7 +230,7 @@ public class MinValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateIntResult(column[2].getInt(i));
}
@@ -256,7 +256,7 @@ public class MinValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateLongResult(column[2].getLong(i));
}
@@ -282,7 +282,7 @@ public class MinValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateFloatResult(column[2].getFloat(i));
}
@@ -308,7 +308,7 @@ public class MinValueAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
updateDoubleResult(column[2].getDouble(i));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
index bd113d7801..85edd382f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
@@ -146,7 +146,7 @@ public class SumAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
initResult = true;
sumValue += column[2].getInt(i);
@@ -166,7 +166,7 @@ public class SumAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
initResult = true;
sumValue += column[2].getLong(i);
@@ -186,7 +186,7 @@ public class SumAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
initResult = true;
sumValue += column[2].getFloat(i);
@@ -206,7 +206,7 @@ public class SumAccumulator implements Accumulator {
if (!curWindow.satisfy(column[0], i)) {
return i;
}
- curWindow.mergeOnePoint();
+ curWindow.mergeOnePoint(column, i);
if (!column[2].isNull(i)) {
initResult = true;
sumValue += column[2].getDouble(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index b5ce5ef823..f34c92438f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -25,12 +25,12 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.db.mpp.execution.operator.window.IWindowManager;
-import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowManager;
+import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
import java.util.List;
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
+import static org.apache.iotdb.db.mpp.execution.operator.window.WindowManagerFactory.genWindowManager;
/**
* RawDataAggregationOperator is used to process raw data tsBlock input calculating using value
@@ -46,19 +46,27 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
private final IWindowManager windowManager;
+ // needSkip is the signal to determine whether to skip the points out of current window to get
+ // endTime when the resultSet needs to output endTime.
+ // If the resultSet needs endTime, needSkip will be set to true when the operator is skipping the
+ // points out of current window.
+ private boolean needSkip = false;
+
public RawDataAggregationOperator(
OperatorContext operatorContext,
List<Aggregator> aggregators,
ITimeRangeIterator timeRangeIterator,
Operator child,
boolean ascending,
- long maxReturnSize) {
+ long maxReturnSize,
+ WindowParameter windowParameter) {
super(operatorContext, aggregators, child, ascending, maxReturnSize);
- this.windowManager = new TimeWindowManager(timeRangeIterator);
+ this.windowManager = genWindowManager(windowParameter, timeRangeIterator);
+ this.resultTsBlockBuilder = windowManager.createResultTsBlockBuilder(aggregators);
}
private boolean hasMoreData() {
- return inputTsBlock != null || child.hasNextWithTimer();
+ return !(inputTsBlock == null || inputTsBlock.isEmpty()) || child.hasNextWithTimer();
}
@Override
@@ -68,29 +76,48 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
@Override
protected boolean calculateNextAggregationResult() {
- while (!calculateFromRawData()) {
+
+ // if needSkip is true, just get the tsBlock directly.
+ while (needSkip || !calculateFromRawData()) {
inputTsBlock = null;
// NOTE: child.next() can only be invoked once
if (child.hasNextWithTimer() && canCallNext) {
inputTsBlock = child.nextWithTimer();
canCallNext = false;
+ if (needSkip) {
+ break;
+ }
} else if (child.hasNextWithTimer()) {
// if child still has next but can't be invoked now
return false;
} else {
- // If there are no points belong to last window, the last window will not
- // initialize window and aggregators
- if (!windowManager.isCurWindowInit()) {
+ // If there are no points belong to last time window, the last time window will not
+ // initialize window and aggregators. Specially for time window.
+ if (windowManager.notInitedLastTimeWindow()) {
initWindowAndAggregators();
}
break;
}
}
- updateResultTsBlock();
// Step into next window
- windowManager.next();
+ // if needSkip is true, don't need to enter next window again
+ if (!needSkip) {
+ windowManager.next();
+ }
+ // When some windows without cached endTime trying to output endTime,
+ // they need to skip the points in lastWindow in advance to get endTime
+ if (windowManager.needSkipInAdvance()) {
+ needSkip = true;
+ inputTsBlock = windowManager.skipPointsOutOfCurWindow(inputTsBlock);
+ if ((inputTsBlock == null || inputTsBlock.isEmpty()) && child.hasNextWithTimer()) {
+ return canCallNext;
+ }
+ needSkip = false;
+ }
+
+ updateResultTsBlock();
return true;
}
@@ -135,7 +162,7 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
@Override
protected void updateResultTsBlock() {
- appendAggregationResult(resultTsBlockBuilder, aggregators, windowManager.currentOutputTime());
+ windowManager.appendAggregationResult(resultTsBlockBuilder, aggregators);
}
private boolean skipPreviousWindowAndInitCurWindow() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 93310c5731..0fb561e1e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -22,14 +22,11 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import com.google.common.util.concurrent.ListenableFuture;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -45,7 +42,7 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
protected final List<Aggregator> aggregators;
// using for building result tsBlock
- protected final TsBlockBuilder resultTsBlockBuilder;
+ protected TsBlockBuilder resultTsBlockBuilder;
protected final long maxRetainedSize;
protected final long maxReturnSize;
@@ -60,13 +57,6 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
this.ascending = ascending;
this.child = child;
this.aggregators = aggregators;
-
- List<TSDataType> dataTypes = new ArrayList<>();
- for (Aggregator aggregator : aggregators) {
- dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
- }
- this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
-
this.maxRetainedSize = child.calculateMaxReturnSize();
this.maxReturnSize = maxReturnSize;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 805b183677..a0c93aa28f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -24,8 +24,12 @@ import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
@@ -54,6 +58,13 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
checkArgument(
groupByTimeParameter != null,
"GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");
+
+ List<TSDataType> dataTypes = new ArrayList<>();
+ for (Aggregator aggregator : aggregators) {
+ dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+ }
+ this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
this.timeRangeIterator = timeRangeIterator;
this.subTimeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindow.java
index 249cf458fd..78258c20c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindow.java
@@ -17,35 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventBooleanWindow extends EventBooleanWindow {
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ public EqualEventBooleanWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
}
@Override
- public boolean hasFinalResult() {
- return false;
+ public boolean satisfy(Column column, int index) {
+ if (!initializedEventValue) {
+ return true;
+ }
+ return column.getBoolean(index) == eventValue;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindowManager.java
new file mode 100644
index 0000000000..b1da72bff4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindowManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class EqualEventBooleanWindowManager extends EventBooleanWindowManager {
+
+ public EqualEventBooleanWindowManager(
+ EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ eventWindow = new EqualEventBooleanWindow(eventWindowParameter);
+ }
+
+ @Override
+ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+ if (!needSkip) {
+ return inputTsBlock;
+ }
+
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return inputTsBlock;
+ }
+
+ Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+ int i = 0, size = inputTsBlock.getPositionCount();
+ boolean previousEventValue = ((EqualEventBooleanWindow) eventWindow).getPreviousEventValue();
+ for (; i < size; i++) {
+ if (!controlColumn.isNull(i) && controlColumn.getBoolean(i) != previousEventValue) {
+ break;
+ }
+ // judge whether we need update endTime
+ long currentTime = timeColumn.getLong(i);
+ if (eventWindow.getEndTime() < currentTime) {
+ eventWindow.setEndTime(currentTime);
+ }
+ }
+ // we can create a new window beginning at index i of inputTsBlock
+ if (i < size) {
+ needSkip = false;
+ }
+ return inputTsBlock.subTsBlock(i);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindow.java
index 249cf458fd..aae3164dd2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindow.java
@@ -17,35 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventDoubleWindow extends EventDoubleWindow {
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ public EqualEventDoubleWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
}
@Override
- public boolean hasFinalResult() {
- return false;
+ public boolean satisfy(Column column, int index) {
+ if (!initializedEventValue) {
+ return true;
+ }
+ return column.getDouble(index) == eventValue;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindowManager.java
new file mode 100644
index 0000000000..c9b225f80e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindowManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class EqualEventDoubleWindowManager extends EventDoubleWindowManager {
+
+ public EqualEventDoubleWindowManager(
+ EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ eventWindow = new EqualEventDoubleWindow(eventWindowParameter);
+ }
+
+ @Override
+ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+ if (!needSkip) {
+ return inputTsBlock;
+ }
+
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return inputTsBlock;
+ }
+
+ Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+ int i = 0, size = inputTsBlock.getPositionCount();
+ double previousEventValue = ((EqualEventDoubleWindow) eventWindow).getPreviousEventValue();
+ for (; i < size; i++) {
+ if (!controlColumn.isNull(i) && controlColumn.getDouble(i) != previousEventValue) {
+ break;
+ }
+ // judge whether we need update endTime
+ long currentTime = timeColumn.getLong(i);
+ if (eventWindow.getEndTime() < currentTime) {
+ eventWindow.setEndTime(currentTime);
+ }
+ }
+ // we can create a new window beginning at index i of inputTsBlock
+ if (i < size) {
+ needSkip = false;
+ }
+ return inputTsBlock.subTsBlock(i);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindow.java
index 249cf458fd..9fb5b48150 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindow.java
@@ -17,35 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventFloatWindow extends EventFloatWindow {
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ public EqualEventFloatWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
}
@Override
- public boolean hasFinalResult() {
- return false;
+ public boolean satisfy(Column column, int index) {
+ if (!initializedEventValue) {
+ return true;
+ }
+ return column.getFloat(index) == eventValue;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindowManager.java
new file mode 100644
index 0000000000..1d5840c1d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindowManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class EqualEventFloatWindowManager extends EventFloatWindowManager {
+
+ public EqualEventFloatWindowManager(
+ EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ eventWindow = new EqualEventFloatWindow(eventWindowParameter);
+ }
+
+ @Override
+ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+ if (!needSkip) {
+ return inputTsBlock;
+ }
+
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return inputTsBlock;
+ }
+
+ Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+ int i = 0, size = inputTsBlock.getPositionCount();
+ float previousEventValue = ((EqualEventFloatWindow) eventWindow).getPreviousEventValue();
+ for (; i < size; i++) {
+ if (!controlColumn.isNull(i) && controlColumn.getFloat(i) != previousEventValue) {
+ break;
+ }
+ // judge whether we need update endTime
+ long currentTime = timeColumn.getLong(i);
+ if (eventWindow.getEndTime() < currentTime) {
+ eventWindow.setEndTime(currentTime);
+ }
+ }
+ // we can create a new window beginning at index i of inputTsBlock
+ if (i < size) {
+ needSkip = false;
+ }
+ return inputTsBlock.subTsBlock(i);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindow.java
index 249cf458fd..5b8e8230c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindow.java
@@ -17,35 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventIntWindow extends EventIntWindow {
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ public EqualEventIntWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
}
@Override
- public boolean hasFinalResult() {
- return false;
+ public boolean satisfy(Column column, int index) {
+ if (!initializedEventValue) {
+ return true;
+ }
+ return column.getInt(index) == eventValue;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindowManager.java
new file mode 100644
index 0000000000..ef866dad43
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindowManager.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class EqualEventIntWindowManager extends EventIntWindowManager {
+
+ public EqualEventIntWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ eventWindow = new EqualEventIntWindow(eventWindowParameter);
+ }
+
+ @Override
+ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+ if (!needSkip) {
+ return inputTsBlock;
+ }
+
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return inputTsBlock;
+ }
+
+ Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+ int i = 0, size = inputTsBlock.getPositionCount();
+ int previousEventValue = ((EqualEventIntWindow) eventWindow).getPreviousEventValue();
+ for (; i < size; i++) {
+ if (!controlColumn.isNull(i) && controlColumn.getInt(i) != previousEventValue) {
+ break;
+ }
+ // judge whether we need update endTime
+ long currentTime = timeColumn.getLong(i);
+ if (eventWindow.getEndTime() < currentTime) {
+ eventWindow.setEndTime(currentTime);
+ }
+ }
+ // we can create a new window beginning at index i of inputTsBlock
+ if (i < size) {
+ needSkip = false;
+ }
+ return inputTsBlock.subTsBlock(i);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindow.java
index 249cf458fd..d69d3cf7a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindow.java
@@ -17,35 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventLongWindow extends EventLongWindow {
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ public EqualEventLongWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
}
@Override
- public boolean hasFinalResult() {
- return false;
+ public boolean satisfy(Column column, int index) {
+ if (!initializedEventValue) {
+ return true;
+ }
+ return column.getLong(index) == eventValue;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindowManager.java
new file mode 100644
index 0000000000..6e330b2a5f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindowManager.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class EqualEventLongWindowManager extends EventLongWindowManager {
+
+ public EqualEventLongWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ eventWindow = new EqualEventLongWindow(eventWindowParameter);
+ }
+
+ @Override
+ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+ if (!needSkip) {
+ return inputTsBlock;
+ }
+
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return inputTsBlock;
+ }
+
+ Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+ int i = 0, size = inputTsBlock.getPositionCount();
+ long previousEventValue = ((EqualEventLongWindow) eventWindow).getPreviousEventValue();
+ for (; i < size; i++) {
+ if (!controlColumn.isNull(i) && controlColumn.getLong(i) != previousEventValue) {
+ break;
+ }
+ // judge whether we need update endTime
+ long currentTime = timeColumn.getLong(i);
+ if (eventWindow.getEndTime() < currentTime) {
+ eventWindow.setEndTime(currentTime);
+ }
+ }
+ // we can create a new window beginning at index i of inputTsBlock
+ if (i < size) {
+ needSkip = false;
+ }
+ return inputTsBlock.subTsBlock(i);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindow.java
index 249cf458fd..316702bb92 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindow.java
@@ -17,35 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventTextWindow extends EventTextWindow {
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ public EqualEventTextWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
}
@Override
- public boolean hasFinalResult() {
- return false;
+ public boolean satisfy(Column column, int index) {
+ if (!initializedEventValue) {
+ return true;
+ }
+ return column.getBinary(index).equals(eventValue);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindowManager.java
new file mode 100644
index 0000000000..8882d15b6a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindowManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public class EqualEventTextWindowManager extends EventTextWindowManager {
+
+ public EqualEventTextWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ eventWindow = new EqualEventTextWindow(eventWindowParameter);
+ }
+
+ @Override
+ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+ if (!needSkip) {
+ return inputTsBlock;
+ }
+
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return inputTsBlock;
+ }
+
+ Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+ int i = 0, size = inputTsBlock.getPositionCount();
+ Binary previousEventValue = ((EqualEventTextWindow) eventWindow).getPreviousEventValue();
+ for (; i < size; i++) {
+ if (!controlColumn.isNull(i) && !controlColumn.getBinary(i).equals(previousEventValue)) {
+ break;
+ }
+ // judge whether we need update endTime
+ long currentTime = timeColumn.getLong(i);
+ if (eventWindow.getEndTime() < currentTime) {
+ eventWindow.setEndTime(currentTime);
+ }
+ }
+ // we can create a new window beginning at index i of inputTsBlock
+ if (i < size) {
+ needSkip = false;
+ }
+ return inputTsBlock.subTsBlock(i);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindow.java
new file mode 100644
index 0000000000..9f3d8e7d22
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindow.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public abstract class EventBooleanWindow extends EventWindow {
+
+ protected boolean eventValue;
+
+ private boolean previousEventValue;
+
+ public EventBooleanWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
+ }
+
+ @Override
+ public void updatePreviousEventValue() {
+ previousEventValue = eventValue;
+ }
+
+ @Override
+ public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+ long currentTime = controlTimeAndValueColumn[1].getLong(index);
+ // judge whether we need update startTime
+ if (startTime > currentTime) {
+ startTime = currentTime;
+ }
+ // judge whether we need update endTime
+ if (endTime < currentTime) {
+ endTime = currentTime;
+ }
+ // judge whether we need initialize eventValue
+ if (!initializedEventValue) {
+ startTime = currentTime;
+ endTime = currentTime;
+ eventValue = controlTimeAndValueColumn[0].getBoolean(index);
+ initializedEventValue = true;
+ }
+ }
+
+ public boolean getEventValue() {
+ return eventValue;
+ }
+
+ public boolean getPreviousEventValue() {
+ return previousEventValue;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindowManager.java
new file mode 100644
index 0000000000..c291a3484e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventBooleanWindowManager extends EventWindowManager {
+
+ public EventBooleanWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ }
+
+ @Override
+ public void appendAggregationResult(
+ TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+ // Append aggregation results to valueColumnBuilders.
+ ColumnBuilder[] columnBuilders =
+ appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+ // Judge whether we need output event column.
+ if (eventWindowParameter.isNeedOutputEvent()) {
+ columnBuilders[columnBuilders.length - 1].writeBoolean(
+ ((EventBooleanWindow) eventWindow).getEventValue());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindow.java
new file mode 100644
index 0000000000..611d040aff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindow.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public abstract class EventDoubleWindow extends EventWindow {
+
+ protected double eventValue;
+
+ private double previousEventValue;
+
+ public EventDoubleWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
+ }
+
+ @Override
+ public void updatePreviousEventValue() {
+ previousEventValue = eventValue;
+ }
+
+ @Override
+ public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+ long currentTime = controlTimeAndValueColumn[1].getLong(index);
+ // judge whether we need update startTime
+ if (startTime > currentTime) {
+ startTime = currentTime;
+ }
+ // judge whether we need update endTime
+ if (endTime < currentTime) {
+ endTime = currentTime;
+ }
+ // judge whether we need initialize eventValue
+ if (!initializedEventValue) {
+ startTime = currentTime;
+ endTime = currentTime;
+ eventValue = controlTimeAndValueColumn[0].getDouble(index);
+ initializedEventValue = true;
+ }
+ }
+
+ public double getEventValue() {
+ return eventValue;
+ }
+
+ public double getPreviousEventValue() {
+ return previousEventValue;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindowManager.java
new file mode 100644
index 0000000000..227c9312eb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventDoubleWindowManager extends EventWindowManager {
+
+ public EventDoubleWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ }
+
+ @Override
+ public void appendAggregationResult(
+ TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+ // Append aggregation results to valueColumnBuilders.
+ ColumnBuilder[] columnBuilders =
+ appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+ // Judge whether we need output event column.
+ if (eventWindowParameter.isNeedOutputEvent()) {
+ columnBuilders[columnBuilders.length - 1].writeDouble(
+ ((EventDoubleWindow) eventWindow).getEventValue());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindow.java
new file mode 100644
index 0000000000..666a54ba0c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindow.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public abstract class EventFloatWindow extends EventWindow {
+
+ protected float eventValue;
+
+ private float previousEventValue;
+
+ public EventFloatWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
+ }
+
+ @Override
+ public void updatePreviousEventValue() {
+ previousEventValue = eventValue;
+ }
+
+ @Override
+ public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+ long currentTime = controlTimeAndValueColumn[1].getLong(index);
+ // judge whether we need update startTime
+ if (startTime > currentTime) {
+ startTime = currentTime;
+ }
+ // judge whether we need update endTime
+ if (endTime < currentTime) {
+ endTime = currentTime;
+ }
+ // judge whether we need initialize eventValue
+ if (!initializedEventValue) {
+ startTime = currentTime;
+ endTime = currentTime;
+ eventValue = controlTimeAndValueColumn[0].getFloat(index);
+ initializedEventValue = true;
+ }
+ }
+
+ public float getEventValue() {
+ return eventValue;
+ }
+
+ public float getPreviousEventValue() {
+ return previousEventValue;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindowManager.java
new file mode 100644
index 0000000000..d4904760c1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventFloatWindowManager extends EventWindowManager {
+
+ public EventFloatWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ }
+
+ @Override
+ public void appendAggregationResult(
+ TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+ // Append aggregation results to valueColumnBuilders.
+ ColumnBuilder[] columnBuilders =
+ appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+ // Judge whether we need output event column.
+ if (eventWindowParameter.isNeedOutputEvent()) {
+ columnBuilders[columnBuilders.length - 1].writeFloat(
+ ((EventFloatWindow) eventWindow).getEventValue());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindow.java
new file mode 100644
index 0000000000..e73e186e70
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindow.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public abstract class EventIntWindow extends EventWindow {
+
+ protected int eventValue;
+
+ private int previousEventValue;
+
+ public EventIntWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
+ }
+
+ @Override
+ public void updatePreviousEventValue() {
+ previousEventValue = eventValue;
+ }
+
+ @Override
+ public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+ long currentTime = controlTimeAndValueColumn[1].getLong(index);
+ // judge whether we need update startTime
+ if (startTime > currentTime) {
+ startTime = currentTime;
+ }
+ // judge whether we need update endTime
+ if (endTime < currentTime) {
+ endTime = currentTime;
+ }
+ // judge whether we need initialize eventValue
+ if (!initializedEventValue) {
+ startTime = currentTime;
+ endTime = currentTime;
+ eventValue = controlTimeAndValueColumn[0].getInt(index);
+ initializedEventValue = true;
+ }
+ }
+
+ public int getEventValue() {
+ return eventValue;
+ }
+
+ public int getPreviousEventValue() {
+ return previousEventValue;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindowManager.java
new file mode 100644
index 0000000000..a7233037f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventIntWindowManager extends EventWindowManager {
+
+ public EventIntWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ }
+
+ @Override
+ public void appendAggregationResult(
+ TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+ // Append aggregation results to valueColumnBuilders.
+ ColumnBuilder[] columnBuilders =
+ appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+ // Judge whether we need output event column.
+ if (eventWindowParameter.isNeedOutputEvent()) {
+ columnBuilders[columnBuilders.length - 1].writeInt(
+ ((EventIntWindow) eventWindow).getEventValue());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindow.java
new file mode 100644
index 0000000000..e9b8d8425d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindow.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public abstract class EventLongWindow extends EventWindow {
+
+ protected long eventValue;
+
+ private long previousEventValue;
+
+ public EventLongWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
+ }
+
+ @Override
+ public void updatePreviousEventValue() {
+ previousEventValue = eventValue;
+ }
+
+ @Override
+ public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+ long currentTime = controlTimeAndValueColumn[1].getLong(index);
+ // judge whether we need update startTime
+ if (startTime > currentTime) {
+ startTime = currentTime;
+ }
+ // judge whether we need update endTime
+ if (endTime < currentTime) {
+ endTime = currentTime;
+ }
+ // judge whether we need initialize eventValue
+ if (!initializedEventValue) {
+ startTime = currentTime;
+ endTime = currentTime;
+ eventValue = controlTimeAndValueColumn[0].getLong(index);
+ initializedEventValue = true;
+ }
+ }
+
+ public long getEventValue() {
+ return eventValue;
+ }
+
+ public long getPreviousEventValue() {
+ return previousEventValue;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindowManager.java
new file mode 100644
index 0000000000..2f02142e10
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventLongWindowManager extends EventWindowManager {
+
+ public EventLongWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ }
+
+ @Override
+ public void appendAggregationResult(
+ TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+ // Append aggregation results to valueColumnBuilders.
+ ColumnBuilder[] columnBuilders =
+ appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+ // Judge whether we need output event column.
+ if (eventWindowParameter.isNeedOutputEvent()) {
+ columnBuilders[columnBuilders.length - 1].writeLong(
+ ((EventLongWindow) eventWindow).getEventValue());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindow.java
new file mode 100644
index 0000000000..219ec490f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindow.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public abstract class EventTextWindow extends EventWindow {
+
+ protected Binary eventValue;
+
+ private Binary previousEventValue;
+
+ public EventTextWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
+ }
+
+ @Override
+ public void updatePreviousEventValue() {
+ previousEventValue = eventValue;
+ }
+
+ @Override
+ public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+ long currentTime = controlTimeAndValueColumn[1].getLong(index);
+ // judge whether we need update startTime
+ if (startTime > currentTime) {
+ startTime = currentTime;
+ }
+ // judge whether we need update endTime
+ if (endTime < currentTime) {
+ endTime = currentTime;
+ }
+ // judge whether we need initialize eventValue
+ if (!initializedEventValue) {
+ startTime = currentTime;
+ endTime = currentTime;
+ eventValue = controlTimeAndValueColumn[0].getBinary(index);
+ initializedEventValue = true;
+ }
+ }
+
+ public Binary getEventValue() {
+ return eventValue;
+ }
+
+ public Binary getPreviousEventValue() {
+ return previousEventValue;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindowManager.java
new file mode 100644
index 0000000000..04b2ed7912
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventTextWindowManager extends EventWindowManager {
+
+ public EventTextWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ }
+
+ @Override
+ public void appendAggregationResult(
+ TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+ // Append aggregation results to valueColumnBuilders.
+ ColumnBuilder[] columnBuilders =
+ appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+ // Judge whether we need output event column.
+ if (eventWindowParameter.isNeedOutputEvent()) {
+ columnBuilders[columnBuilders.length - 1].writeBinary(
+ ((EventTextWindow) eventWindow).getEventValue());
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java
index ee11bd0eaa..8724560378 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java
@@ -20,65 +20,58 @@
package org.apache.iotdb.db.mpp.execution.operator.window;
import org.apache.iotdb.db.mpp.aggregation.Accumulator;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-public class TimeWindow implements IWindow {
+public abstract class EventWindow implements IWindow {
- private TimeRange curTimeRange;
+ protected EventWindowParameter eventWindowParameter;
- public TimeWindow() {}
+ protected long startTime;
- public TimeWindow(TimeRange curTimeRange) {
- this.curTimeRange = curTimeRange;
- }
-
- public TimeRange getCurTimeRange() {
- return curTimeRange;
- }
+ protected long endTime;
- public long getCurMinTime() {
- return curTimeRange.getMin();
- }
+ protected boolean initializedEventValue;
- public long getCurMaxTime() {
- return curTimeRange.getMax();
+ protected EventWindow(EventWindowParameter eventWindowParameter) {
+ this.eventWindowParameter = eventWindowParameter;
}
@Override
public Column getControlColumn(TsBlock tsBlock) {
- return tsBlock.getTimeColumn();
+ return tsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
}
@Override
- public boolean satisfy(Column column, int index) {
- long curTime = column.getLong(index);
- return curTime <= getCurMaxTime() && curTime >= getCurMinTime();
+ public boolean hasFinalResult(Accumulator accumulator) {
+ return accumulator.hasFinalResult();
}
+ // TODO
@Override
- public void mergeOnePoint() {
- // do nothing
+ public boolean contains(Column column) {
+ return false;
}
- @Override
- public boolean hasFinalResult(Accumulator accumulator) {
- return accumulator.hasFinalResult();
+ public abstract void updatePreviousEventValue();
+
+ public long getStartTime() {
+ return startTime;
}
- @Override
- public boolean contains(Column column) {
- TimeColumn timeColumn = (TimeColumn) column;
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
- long minTime = Math.min(timeColumn.getStartTime(), timeColumn.getEndTime());
- long maxTime = Math.max(timeColumn.getStartTime(), timeColumn.getEndTime());
+ public long getEndTime() {
+ return endTime;
+ }
- return curTimeRange.contains(minTime, maxTime);
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
}
- public void update(TimeRange curTimeRange) {
- this.curTimeRange = curTimeRange;
+ public void setInitializedEventValue(boolean initializedEventValue) {
+ this.initializedEventValue = initializedEventValue;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java
new file mode 100644
index 0000000000..9f6f0db9b1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventWindowManager implements IWindowManager {
+
+ protected boolean initialized;
+
+ protected boolean ascending;
+
+ protected boolean needSkip;
+
+ protected EventWindowParameter eventWindowParameter;
+
+ protected EventWindow eventWindow;
+
+ protected EventWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+ this.eventWindowParameter = eventWindowParameter;
+ this.initialized = false;
+ this.ascending = ascending;
+ // At beginning, we do not need to skip inputTsBlock
+ this.needSkip = false;
+ }
+
+ @Override
+ public boolean isCurWindowInit() {
+ return this.initialized;
+ }
+
+ @Override
+ public void initCurWindow(TsBlock tsBlock) {
+ this.initialized = true;
+ this.eventWindow.setInitializedEventValue(false);
+ }
+
+ @Override
+ public boolean hasNext(boolean hasMoreData) {
+ return hasMoreData;
+ }
+
+ @Override
+ public void next() {
+ // When we go into next window, we should pay attention to previous window whether all points
+ // belong to previous window have been consumed. If not, we need skip these points.
+ this.needSkip = true;
+ this.initialized = false;
+ this.eventWindow.updatePreviousEventValue();
+ }
+
+ @Override
+ public IWindow getCurWindow() {
+ return eventWindow;
+ }
+
+ @Override
+ public boolean satisfiedCurWindow(TsBlock inputTsBlock) {
+ return true;
+ }
+
+ @Override
+ public boolean isTsBlockOutOfBound(TsBlock inputTsBlock) {
+ return false;
+ }
+
+ @Override
+ public TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators) {
+ List<TSDataType> dataTypes = getResultDataTypes(aggregators);
+ // Judge whether we need output endTime column.
+ if (eventWindowParameter.isNeedOutputEndTime()) {
+ dataTypes.add(0, TSDataType.INT64);
+ }
+ // Judge whether we need output event column.
+ if (eventWindowParameter.isNeedOutputEvent()) {
+ dataTypes.add(eventWindowParameter.getDataType());
+ }
+ return new TsBlockBuilder(dataTypes);
+ }
+
+ protected ColumnBuilder[] appendOriginAggregationResult(
+ TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+ // Use the start time of eventWindow as default output time.
+ TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+ timeColumnBuilder.writeLong(eventWindow.getStartTime());
+
+ ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+ int columnIndex = 0;
+ if (eventWindowParameter.isNeedOutputEndTime()) {
+ columnBuilders[0].writeLong(eventWindow.getEndTime());
+ columnIndex = 1;
+ }
+ for (Aggregator aggregator : aggregators) {
+ ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+ columnBuilder[0] = columnBuilders[columnIndex++];
+ if (columnBuilder.length > 1) {
+ columnBuilder[1] = columnBuilders[columnIndex++];
+ }
+ aggregator.outputResult(columnBuilder);
+ }
+ resultTsBlockBuilder.declarePosition();
+ return columnBuilders;
+ }
+
+ @Override
+ public boolean notInitedLastTimeWindow() {
+ return false;
+ }
+
+ @Override
+ public boolean needSkipInAdvance() {
+ return eventWindowParameter.isNeedOutputEndTime();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java
new file mode 100644
index 0000000000..e4ba73679d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class EventWindowParameter extends WindowParameter {
+
+ private final boolean needOutputEvent;
+ private final double delta;
+ private final TSDataType dataType;
+
+ private final int controlColumnIndex;
+
+ public EventWindowParameter(
+ TSDataType dataType,
+ int controlColumnIndex,
+ boolean needOutputEndTime,
+ boolean needOutputEvent,
+ double delta) {
+ super(needOutputEndTime);
+ this.controlColumnIndex = controlColumnIndex;
+ this.dataType = dataType;
+ this.needOutputEvent = needOutputEvent;
+ this.delta = delta;
+ this.windowType = WindowType.EVENT_WINDOW;
+ }
+
+ public boolean isNeedOutputEvent() {
+ return needOutputEvent;
+ }
+
+ public double getDelta() {
+ return delta;
+ }
+
+ public TSDataType getDataType() {
+ return dataType;
+ }
+
+ public int getControlColumnIndex() {
+ return controlColumnIndex;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
index da3d0bb324..f6003c0120 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
@@ -46,7 +46,7 @@ public interface IWindow {
* When we merge a point into window, at this time, we can use this method to change the status in
* this window
*/
- void mergeOnePoint();
+ void mergeOnePoint(Column[] controlTimeAndValueColumn, int index);
/**
* Used to customize whether the window has final aggregation result
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
index faf4c4c171..ee54df4d61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
@@ -19,7 +19,14 @@
package org.apache.iotdb.db.mpp.execution.operator.window;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
/**
* Used to customize all the type of window managers, such as TimeWindowManager,
@@ -51,13 +58,6 @@ public interface IWindowManager {
/** Used to mark the current window has got last point */
void next();
- /**
- * Used to get the output time of current window
- *
- * @return the output time of current window
- */
- long currentOutputTime();
-
/**
* Used to get current window
*
@@ -88,4 +88,58 @@ public interface IWindowManager {
* @return whether there are extra points for the next window
*/
boolean isTsBlockOutOfBound(TsBlock inputTsBlock);
+
+ /**
+ * According to the Aggregator list, we could obtain all the aggregation result column type list.
+ *
+ * @param aggregators
+ * @return Aggregation result column type list.
+ */
+ default List<TSDataType> getResultDataTypes(List<Aggregator> aggregators) {
+ List<TSDataType> dataTypes = new ArrayList<>();
+ for (Aggregator aggregator : aggregators) {
+ dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+ }
+ return dataTypes;
+ }
+
+ /**
+ * Used to create the aggregation resultSet.
+ *
+ * <p>For the implementation, we should consider whether we need to add endTime column and event
+ * column in the resultSet besides the aggregation columns.
+ *
+ * @param aggregators
+ * @return TsBlockBuilder of resultSet
+ */
+ TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators);
+
+ /**
+ * Used to append a row of aggregation result into the resultSet.
+ *
+ * <p>For the implementation, similar to the method createResultTsBlockBuilder, we should consider
+ * whether we need to add endTime column and event column in the resultSet besides the aggregation
+ * columns.
+ *
+ * @param resultTsBlockBuilder
+ * @param aggregators
+ */
+ void appendAggregationResult(TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators);
+
+ /**
+ * Especially for TimeWindow, if there are no points belong to last TimeWindow, the last
+ * TimeWindow will not initialize window and aggregators in the aggregation frame.
+ *
+ * @return whether the window is TimeWindow and the last TimeWindow has not been initialized
+ */
+ boolean notInitedLastTimeWindow();
+
+ /**
+ * When endTime is required in resultSet, operator should skip the points in last window directly
+ * instead of a default lazy way to get the endTime for constructing the result tsBlock.
+ *
+ * <p>For the windows like TimeWindow which has already cached endTime, this method always return
+ * false.
+ */
+ boolean needSkipInAdvance();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
index ee11bd0eaa..1927316a24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
@@ -59,7 +59,7 @@ public class TimeWindow implements IWindow {
}
@Override
- public void mergeOnePoint() {
+ public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
// do nothing
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
index a2466554f0..4e2754439d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
@@ -19,26 +19,35 @@
package org.apache.iotdb.db.mpp.execution.operator.window;
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.List;
public class TimeWindowManager implements IWindowManager {
- private TimeWindow curWindow;
+ private final TimeWindow curWindow;
+ private final boolean ascending;
+ private final ITimeRangeIterator timeRangeIterator;
+ private final boolean needOutputEndTime;
private boolean initialized;
-
- private boolean ascending;
-
- private ITimeRangeIterator timeRangeIterator;
-
private boolean needSkip;
+ private long startTime;
+ private long endTime;
- public TimeWindowManager(ITimeRangeIterator timeRangeIterator) {
+ public TimeWindowManager(
+ ITimeRangeIterator timeRangeIterator, TimeWindowParameter timeWindowParameter) {
this.timeRangeIterator = timeRangeIterator;
this.initialized = false;
this.curWindow = new TimeWindow(this.timeRangeIterator.nextTimeRange());
+ this.needOutputEndTime = timeWindowParameter.isNeedOutputEndTime();
this.ascending = timeRangeIterator.isAscending();
// At beginning, we do not need to skip inputTsBlock
this.needSkip = false;
@@ -65,14 +74,11 @@ public class TimeWindowManager implements IWindowManager {
// belong to previous window have been consumed. If not, we need skip these points.
this.needSkip = true;
this.initialized = false;
+ this.startTime = this.timeRangeIterator.currentOutputTime();
+ this.endTime = this.curWindow.getCurMaxTime();
this.curWindow.update(this.timeRangeIterator.nextTimeRange());
}
- @Override
- public long currentOutputTime() {
- return timeRangeIterator.currentOutputTime();
- }
-
@Override
public IWindow getCurWindow() {
return curWindow;
@@ -130,4 +136,47 @@ public class TimeWindowManager implements IWindowManager {
? inputTsBlock.getEndTime() > this.curWindow.getCurMaxTime()
: inputTsBlock.getEndTime() < this.curWindow.getCurMinTime());
}
+
+ @Override
+ public TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators) {
+ List<TSDataType> dataTypes = getResultDataTypes(aggregators);
+ // Judge whether we need output endTime column.
+ if (this.needOutputEndTime) {
+ dataTypes.add(0, TSDataType.INT64);
+ }
+ return new TsBlockBuilder(dataTypes);
+ }
+
+ @Override
+ public void appendAggregationResult(
+ TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+ TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+ // Use start time of current time range as time column
+ timeColumnBuilder.writeLong(startTime);
+ ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+ int columnIndex = 0;
+ if (this.needOutputEndTime) {
+ columnBuilders[0].writeLong(endTime);
+ columnIndex = 1;
+ }
+ for (Aggregator aggregator : aggregators) {
+ ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+ columnBuilder[0] = columnBuilders[columnIndex++];
+ if (columnBuilder.length > 1) {
+ columnBuilder[1] = columnBuilders[columnIndex++];
+ }
+ aggregator.outputResult(columnBuilder);
+ }
+ resultTsBlockBuilder.declarePosition();
+ }
+
+ @Override
+ public boolean notInitedLastTimeWindow() {
+ return !this.initialized;
+ }
+
+ @Override
+ public boolean needSkipInAdvance() {
+ return false;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowParameter.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowParameter.java
index 249cf458fd..d4cfdc0c96 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowParameter.java
@@ -17,35 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
+public class TimeWindowParameter extends WindowParameter {
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
-
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
- }
-
- @Override
- public boolean hasFinalResult() {
- return false;
+ public TimeWindowParameter(boolean needOutputEndTime) {
+ super(needOutputEndTime);
+ this.windowType = WindowType.TIME_WINDOW;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindow.java
index 249cf458fd..583e40ee71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindow.java
@@ -17,35 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class VariationEventDoubleWindow extends EventDoubleWindow {
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ public VariationEventDoubleWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
}
@Override
- public boolean hasFinalResult() {
- return false;
+ public boolean satisfy(Column column, int index) {
+ if (!initializedEventValue) {
+ return true;
+ }
+ return Math.abs(column.getDouble(index) - eventValue) <= eventWindowParameter.getDelta();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindowManager.java
new file mode 100644
index 0000000000..c6093c5d01
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindowManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class VariationEventDoubleWindowManager extends EventDoubleWindowManager {
+
+ public VariationEventDoubleWindowManager(
+ EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ eventWindow = new VariationEventDoubleWindow(eventWindowParameter);
+ }
+
+ @Override
+ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+ if (!needSkip) {
+ return inputTsBlock;
+ }
+
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return inputTsBlock;
+ }
+
+ Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+ int i = 0, size = inputTsBlock.getPositionCount();
+ double previousEventValue = ((VariationEventDoubleWindow) eventWindow).getPreviousEventValue();
+ for (; i < size; i++) {
+ if (!controlColumn.isNull(i)
+ && Math.abs(controlColumn.getDouble(i) - previousEventValue)
+ > eventWindowParameter.getDelta()) {
+ break;
+ }
+ // judge whether we need update endTime
+ long currentTime = timeColumn.getLong(i);
+ if (eventWindow.getEndTime() < currentTime) {
+ eventWindow.setEndTime(currentTime);
+ }
+ }
+ // we can create a new window beginning at index i of inputTsBlock
+ if (i < size) {
+ needSkip = false;
+ }
+ return inputTsBlock.subTsBlock(i);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindow.java
index 249cf458fd..65df9a500f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindow.java
@@ -17,35 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class VariationEventFloatWindow extends EventFloatWindow {
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ public VariationEventFloatWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
}
@Override
- public boolean hasFinalResult() {
- return false;
+ public boolean satisfy(Column column, int index) {
+ if (!initializedEventValue) {
+ return true;
+ }
+ return Math.abs(column.getFloat(index) - eventValue) <= eventWindowParameter.getDelta();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindowManager.java
new file mode 100644
index 0000000000..a01bb879ec
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindowManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class VariationEventFloatWindowManager extends EventFloatWindowManager {
+
+ public VariationEventFloatWindowManager(
+ EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ eventWindow = new VariationEventFloatWindow(eventWindowParameter);
+ }
+
+ @Override
+ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+ if (!needSkip) {
+ return inputTsBlock;
+ }
+
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return inputTsBlock;
+ }
+
+ Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+ int i = 0, size = inputTsBlock.getPositionCount();
+ float previousEventValue = ((VariationEventFloatWindow) eventWindow).getPreviousEventValue();
+ for (; i < size; i++) {
+ if (!controlColumn.isNull(i)
+ && Math.abs(controlColumn.getFloat(i) - previousEventValue)
+ > eventWindowParameter.getDelta()) {
+ break;
+ }
+ // judge whether we need update endTime
+ long currentTime = timeColumn.getLong(i);
+ if (eventWindow.getEndTime() < currentTime) {
+ eventWindow.setEndTime(currentTime);
+ }
+ }
+ // we can create a new window beginning at index i of inputTsBlock
+ if (i < size) {
+ needSkip = false;
+ }
+ return inputTsBlock.subTsBlock(i);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindow.java
index 249cf458fd..a2248c4af9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindow.java
@@ -17,35 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class VariationEventIntWindow extends EventIntWindow {
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ public VariationEventIntWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
}
@Override
- public boolean hasFinalResult() {
- return false;
+ public boolean satisfy(Column column, int index) {
+ if (!initializedEventValue) {
+ return true;
+ }
+ return Math.abs(column.getInt(index) - eventValue) <= eventWindowParameter.getDelta();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindowManager.java
new file mode 100644
index 0000000000..1698cd2557
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindowManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class VariationEventIntWindowManager extends EventIntWindowManager {
+
+ public VariationEventIntWindowManager(
+ EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ eventWindow = new VariationEventIntWindow(eventWindowParameter);
+ }
+
+ @Override
+ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+ if (!needSkip) {
+ return inputTsBlock;
+ }
+
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return inputTsBlock;
+ }
+
+ Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+ int i = 0, size = inputTsBlock.getPositionCount();
+ int previousEventValue = ((VariationEventIntWindow) eventWindow).getPreviousEventValue();
+ for (; i < size; i++) {
+ if (!controlColumn.isNull(i)
+ && Math.abs(controlColumn.getInt(i) - previousEventValue)
+ > eventWindowParameter.getDelta()) {
+ break;
+ }
+ // judge whether we need update endTime
+ long currentTime = timeColumn.getLong(i);
+ if (eventWindow.getEndTime() < currentTime) {
+ eventWindow.setEndTime(currentTime);
+ }
+ }
+ // we can create a new window beginning at index i of inputTsBlock
+ if (i < size) {
+ needSkip = false;
+ }
+ return inputTsBlock.subTsBlock(i);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindow.java
index 249cf458fd..192eab4f8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindow.java
@@ -17,35 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class VariationEventLongWindow extends EventLongWindow {
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
-
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ public VariationEventLongWindow(EventWindowParameter eventWindowParameter) {
+ super(eventWindowParameter);
}
@Override
- public boolean hasFinalResult() {
- return false;
+ public boolean satisfy(Column column, int index) {
+ if (!initializedEventValue) {
+ return true;
+ }
+ return Math.abs(column.getLong(index) - eventValue) <= eventWindowParameter.getDelta();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindowManager.java
new file mode 100644
index 0000000000..923e526047
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindowManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class VariationEventLongWindowManager extends EventLongWindowManager {
+
+ public VariationEventLongWindowManager(
+ EventWindowParameter eventWindowParameter, boolean ascending) {
+ super(eventWindowParameter, ascending);
+ eventWindow = new VariationEventLongWindow(eventWindowParameter);
+ }
+
+ @Override
+ public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+ if (!needSkip) {
+ return inputTsBlock;
+ }
+
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return inputTsBlock;
+ }
+
+ Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+ TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+ int i = 0, size = inputTsBlock.getPositionCount();
+ long previousEventValue = ((VariationEventLongWindow) eventWindow).getPreviousEventValue();
+ for (; i < size; i++) {
+ if (!controlColumn.isNull(i)
+ && Math.abs(controlColumn.getLong(i) - previousEventValue)
+ > eventWindowParameter.getDelta()) {
+ break;
+ }
+ // judge whether we need update endTime
+ long currentTime = timeColumn.getLong(i);
+ if (eventWindow.getEndTime() < currentTime) {
+ eventWindow.setEndTime(currentTime);
+ }
+ }
+ // we can create a new window beginning at index i of inputTsBlock
+ if (i < size) {
+ needSkip = false;
+ }
+ return inputTsBlock.subTsBlock(i);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
new file mode 100644
index 0000000000..bb7bf9dc6c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+
+public class WindowManagerFactory {
+
+ public static IWindowManager genWindowManager(
+ WindowParameter windowParameter, ITimeRangeIterator timeRangeIterator) {
+ switch (windowParameter.getWindowType()) {
+ case TIME_WINDOW:
+ return new TimeWindowManager(timeRangeIterator, (TimeWindowParameter) windowParameter);
+ case EVENT_WINDOW:
+ return ((EventWindowParameter) windowParameter).getDelta() == 0
+ ? genEqualEventWindowManager(
+ (EventWindowParameter) windowParameter, timeRangeIterator.isAscending())
+ : genVariationEventWindowManager(
+ (EventWindowParameter) windowParameter, timeRangeIterator.isAscending());
+ default:
+ throw new IllegalArgumentException("Not support this type of aggregation window.");
+ }
+ }
+
+ private static EventWindowManager genEqualEventWindowManager(
+ EventWindowParameter eventWindowParameter, boolean ascending) {
+ switch (eventWindowParameter.getDataType()) {
+ case INT32:
+ return new EqualEventIntWindowManager(eventWindowParameter, ascending);
+ case INT64:
+ return new EqualEventLongWindowManager(eventWindowParameter, ascending);
+ case FLOAT:
+ return new EqualEventFloatWindowManager(eventWindowParameter, ascending);
+ case DOUBLE:
+ return new EqualEventDoubleWindowManager(eventWindowParameter, ascending);
+ case TEXT:
+ return new EqualEventTextWindowManager(eventWindowParameter, ascending);
+ case BOOLEAN:
+ return new EqualEventBooleanWindowManager(eventWindowParameter, ascending);
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(
+ "Unsupported data type in equal event aggregation : %s",
+ eventWindowParameter.getDataType()));
+ }
+ }
+
+ private static EventWindowManager genVariationEventWindowManager(
+ EventWindowParameter eventWindowParameter, boolean ascending) {
+ switch (eventWindowParameter.getDataType()) {
+ case INT32:
+ return new VariationEventIntWindowManager(eventWindowParameter, ascending);
+ case INT64:
+ return new VariationEventLongWindowManager(eventWindowParameter, ascending);
+ case FLOAT:
+ return new VariationEventFloatWindowManager(eventWindowParameter, ascending);
+ case DOUBLE:
+ return new VariationEventDoubleWindowManager(eventWindowParameter, ascending);
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(
+ "Unsupported data type in variation event aggregation : %s",
+ eventWindowParameter.getDataType()));
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowParameter.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowParameter.java
index 249cf458fd..3ba9bdbf7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowParameter.java
@@ -17,35 +17,23 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
+public abstract class WindowParameter {
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+ protected WindowType windowType;
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
+ private final boolean needOutputEndTime;
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ public WindowParameter(boolean needOutputEndTime) {
+ this.needOutputEndTime = needOutputEndTime;
}
- @Override
- public boolean hasFinalResult() {
- return false;
+ public WindowType getWindowType() {
+ return windowType;
+ }
+
+ public boolean isNeedOutputEndTime() {
+ return needOutputEndTime;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
index 249cf458fd..6a6ecd6185 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
@@ -17,35 +17,20 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
+public enum WindowType {
+ TIME_WINDOW((byte) 0),
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+ EVENT_WINDOW((byte) 1);
- @Override
- public int addInput(Column[] column, IWindow curWindow) {
- int curPositionCount = column[0].getPositionCount();
+ private final byte type;
- for (int i = 0; i < curPositionCount; i++) {
- // skip null value in control column
- if (column[0].isNull(i)) {
- continue;
- }
- if (!curWindow.satisfy(column[0], i)) {
- return i;
- }
- curWindow.mergeOnePoint();
- if (!column[2].isNull(i)) {
- updateMinTime(column[1].getLong(i));
- }
- }
- return curPositionCount;
+ WindowType(byte type) {
+ this.type = type;
}
- @Override
- public boolean hasFinalResult() {
- return false;
+ public byte getType() {
+ return type;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1e41632155..a083eb15d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -118,6 +118,8 @@ import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
+import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ExpressionTypeAnalyzer;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
@@ -1388,13 +1390,16 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
calculateMaxAggregationResultSize(
aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
+ WindowParameter windowParameter = new TimeWindowParameter(false);
+
return new RawDataAggregationOperator(
operatorContext,
aggregators,
timeRangeIterator,
children.get(0),
ascending,
- maxReturnSize);
+ maxReturnSize,
+ windowParameter);
} else {
OperatorContext operatorContext =
context
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index ac812e0b17..ce9df44997 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -70,6 +70,8 @@ import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -1267,6 +1269,8 @@ public class OperatorMemoryTest {
AggregationUtil.calculateMaxAggregationResultSize(
aggregationDescriptors, timeRangeIterator, typeProvider);
+ WindowParameter windowParameter = new TimeWindowParameter(false);
+
RawDataAggregationOperator rawDataAggregationOperator =
new RawDataAggregationOperator(
Mockito.mock(OperatorContext.class),
@@ -1274,7 +1278,8 @@ public class OperatorMemoryTest {
timeRangeIterator,
child,
true,
- maxReturnSize);
+ maxReturnSize,
+ windowParameter);
long expectedMaxReturnSize =
100
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index e4e70431e1..f89cebafac 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -39,6 +39,9 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.window.EventWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationType;
@@ -49,6 +52,9 @@ import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
@@ -68,6 +74,7 @@ import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class RawDataAggregationOperatorTest {
@@ -121,8 +128,10 @@ public class RawDataAggregationOperatorTest {
}
}
+ WindowParameter windowParameter = new TimeWindowParameter(false);
+
RawDataAggregationOperator rawDataAggregationOperator =
- initRawDataAggregationOperator(aggregationTypes, null, inputLocations);
+ initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
int count = 0;
while (rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -174,8 +183,10 @@ public class RawDataAggregationOperatorTest {
inputLocations.add(inputLocationForOneAggregator);
}
+ WindowParameter windowParameter = new TimeWindowParameter(false);
+
RawDataAggregationOperator rawDataAggregationOperator =
- initRawDataAggregationOperator(aggregationTypes, null, inputLocations);
+ initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
int count = 0;
while (rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -198,7 +209,7 @@ public class RawDataAggregationOperatorTest {
/** Test aggregating raw data by time interval. */
@Test
- public void groupByRawDataTest1() throws IllegalPathException {
+ public void groupByTimeRawDataTest1() throws IllegalPathException {
int[][] result =
new int[][] {
{100, 100, 100, 99},
@@ -225,8 +236,11 @@ public class RawDataAggregationOperatorTest {
}
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+ WindowParameter windowParameter = new TimeWindowParameter(false);
+
RawDataAggregationOperator rawDataAggregationOperator =
- initRawDataAggregationOperator(aggregationTypes, groupByTimeParameter, inputLocations);
+ initRawDataAggregationOperator(
+ aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
int count = 0;
while (rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -250,7 +264,7 @@ public class RawDataAggregationOperatorTest {
/** Test aggregating raw data by time interval. */
@Test
- public void groupByRawDataTest2() throws IllegalPathException {
+ public void groupByTimeRawDataTest2() throws IllegalPathException {
double[][] result =
new double[][] {
{20049.5, 20149.5, 6249.5, 8429.808},
@@ -278,8 +292,12 @@ public class RawDataAggregationOperatorTest {
inputLocations.add(inputLocationForOneAggregator);
}
GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+
+ WindowParameter windowParameter = new TimeWindowParameter(false);
+
RawDataAggregationOperator rawDataAggregationOperator =
- initRawDataAggregationOperator(aggregationTypes, groupByTimeParameter, inputLocations);
+ initRawDataAggregationOperator(
+ aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
int count = 0;
while (rawDataAggregationOperator.hasNext()) {
TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -303,10 +321,494 @@ public class RawDataAggregationOperatorTest {
assertEquals(4, count);
}
+ /** Test by time interval with EndTime */
+ @Test
+ public void groupByTimeRawDataTest3() throws IllegalPathException {
+ int[][] result =
+ new int[][] {
+ {100, 100, 100, 99},
+ {2004950, 2014950, 624950, 834551},
+ {0, 100, 200, 300},
+ {99, 199, 299, 398},
+ {20099, 20199, 10259, 10379},
+ {20000, 20100, 260, 380},
+ {20000, 20100, 10200, 10300},
+ {20099, 20199, 299, 398}
+ };
+ List<AggregationType> aggregationTypes = new ArrayList<>();
+ List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.COUNT);
+ aggregationTypes.add(AggregationType.SUM);
+ aggregationTypes.add(AggregationType.MIN_TIME);
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ aggregationTypes.add(AggregationType.MAX_VALUE);
+ aggregationTypes.add(AggregationType.MIN_VALUE);
+ aggregationTypes.add(AggregationType.FIRST_VALUE);
+ aggregationTypes.add(AggregationType.LAST_VALUE);
+ for (int j = 0; j < 8; j++) {
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+ }
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+
+ WindowParameter windowParameter = new TimeWindowParameter(true);
+
+ RawDataAggregationOperator rawDataAggregationOperator =
+ initRawDataAggregationOperator(
+ aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
+ int count = 0;
+ while (rawDataAggregationOperator.hasNext()) {
+ TsBlock resultTsBlock = rawDataAggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
+
+ for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+ assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(row));
+ // endTime
+ long endTime = 100L * count + 99;
+ if (count == 3) {
+ endTime = 398;
+ }
+ assertEquals(endTime, resultTsBlock.getColumn(0).getLong(row));
+ for (int i = 0; i < 2; i++) {
+ assertEquals(result[0][count], resultTsBlock.getColumn(8 * i + 1).getLong(row));
+ assertEquals(result[1][count], resultTsBlock.getColumn(8 * i + 2).getDouble(row), 0.0001);
+ assertEquals(result[2][count], resultTsBlock.getColumn(8 * i + 3).getLong(row));
+ assertEquals(result[3][count], resultTsBlock.getColumn(8 * i + 4).getLong(row));
+ assertEquals(result[4][count], resultTsBlock.getColumn(8 * i + 5).getInt(row));
+ assertEquals(result[5][count], resultTsBlock.getColumn(8 * i + 6).getInt(row));
+ assertEquals(result[6][count], resultTsBlock.getColumn(8 * i + 7).getInt(row));
+ assertEquals(result[7][count], resultTsBlock.getColumn(8 * i + 8).getInt(row));
+ }
+ }
+ }
+ assertEquals(4, count);
+ }
+
+ /** 0 - 99 100 - 199 200 - 299 300 - 399 400 - 499 500 - 599 */
+ @Test
+ public void groupByTimeRawDataTest4() throws IllegalPathException {
+ int[][] result =
+ new int[][] {
+ {100, 100, 100, 100, 100, 0},
+ {20099, 20199, 10259, 10379, 10499},
+ {20000, 20100, 260, 380, 10400},
+ {20000, 20100, 10200, 10300, 10400},
+ {20099, 20199, 299, 399, 10499}
+ };
+ List<AggregationType> aggregationTypes = new ArrayList<>();
+ List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.COUNT);
+ aggregationTypes.add(AggregationType.MAX_VALUE);
+ aggregationTypes.add(AggregationType.MIN_VALUE);
+ aggregationTypes.add(AggregationType.FIRST_VALUE);
+ aggregationTypes.add(AggregationType.LAST_VALUE);
+ for (int j = 0; j < 8; j++) {
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+ }
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 600, 100, 100, true);
+
+ WindowParameter windowParameter = new TimeWindowParameter(true);
+
+ RawDataAggregationOperator rawDataAggregationOperator =
+ initRawDataAggregationOperator(
+ aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
+ int count = 0;
+ while (rawDataAggregationOperator.hasNext()) {
+ TsBlock resultTsBlock = rawDataAggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
+ for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+ assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(row));
+ // endTime
+ assertEquals(100L * count + 99, resultTsBlock.getColumn(0).getLong(row));
+ for (int i = 0; i < 2; i++) {
+ if (count == 5) {
+ assertEquals(result[0][count], resultTsBlock.getColumn(5 * i + 1).getLong(row));
+ assertTrue(resultTsBlock.getColumn(5 * i + 2).isNull(row));
+ assertTrue(resultTsBlock.getColumn(5 * i + 3).isNull(row));
+ assertTrue(resultTsBlock.getColumn(5 * i + 4).isNull(row));
+ assertTrue(resultTsBlock.getColumn(5 * i + 5).isNull(row));
+ continue;
+ }
+ assertEquals(result[0][count], resultTsBlock.getColumn(5 * i + 1).getLong(row));
+ assertEquals(result[1][count], resultTsBlock.getColumn(5 * i + 2).getInt(row));
+ assertEquals(result[2][count], resultTsBlock.getColumn(5 * i + 3).getInt(row));
+ assertEquals(result[3][count], resultTsBlock.getColumn(5 * i + 4).getInt(row));
+ assertEquals(result[4][count], resultTsBlock.getColumn(5 * i + 5).getInt(row));
+ }
+ }
+ }
+ assertEquals(6, count);
+ }
+
+ /**
+ * test the situation when leftCRightO is false. 1 - 100 101 - 200 201 - 300 301 - 400 401 - 500
+ * 501 - 600
+ */
+ @Test
+ public void groupByTimeRawDataTest5() throws IllegalPathException {
+ int[][] result =
+ new int[][] {
+ {100, 100, 100, 100, 99, 0},
+ {20100, 20199, 10300, 10400, 10499}, // max
+ {20001, 10200, 260, 380, 10401}, // min
+ {20001, 20101, 10201, 10301, 10401}, // first
+ {20100, 10200, 10300, 10400, 10499} // last
+ };
+ List<AggregationType> aggregationTypes = new ArrayList<>();
+ List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.COUNT);
+ aggregationTypes.add(AggregationType.MAX_VALUE);
+ aggregationTypes.add(AggregationType.MIN_VALUE);
+ aggregationTypes.add(AggregationType.FIRST_VALUE);
+ aggregationTypes.add(AggregationType.LAST_VALUE);
+ for (int j = 0; j < 8; j++) {
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+ }
+ GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 600, 100, 100, false);
+
+ WindowParameter windowParameter = new TimeWindowParameter(false);
+
+ RawDataAggregationOperator rawDataAggregationOperator =
+ initRawDataAggregationOperator(
+ aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
+ int count = 0;
+ while (rawDataAggregationOperator.hasNext()) {
+ TsBlock resultTsBlock = rawDataAggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
+ for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+ assertEquals(100 * (count + 1), resultTsBlock.getTimeColumn().getLong(row));
+ for (int i = 0; i < 2; i++) {
+ if (count == 5) {
+ assertEquals(result[0][count], resultTsBlock.getColumn(5 * i).getLong(row));
+ assertTrue(resultTsBlock.getColumn(5 * i + 1).isNull(row));
+ assertTrue(resultTsBlock.getColumn(5 * i + 2).isNull(row));
+ assertTrue(resultTsBlock.getColumn(5 * i + 3).isNull(row));
+ assertTrue(resultTsBlock.getColumn(5 * i + 4).isNull(row));
+ continue;
+ }
+ assertEquals(result[0][count], resultTsBlock.getColumn(5 * i).getLong(row));
+ assertEquals(result[1][count], resultTsBlock.getColumn(5 * i + 1).getInt(row));
+ assertEquals(result[2][count], resultTsBlock.getColumn(5 * i + 2).getInt(row));
+ assertEquals(result[3][count], resultTsBlock.getColumn(5 * i + 3).getInt(row));
+ assertEquals(result[4][count], resultTsBlock.getColumn(5 * i + 4).getInt(row));
+ }
+ }
+ }
+ assertEquals(6, count);
+ }
+
+ /** 0 - 259 260 - 299 `300 - 499 */
+ @Test
+ public void groupByEventRawDataTest1() throws IllegalPathException {
+ int[][] result =
+ new int[][] {
+ {0, 260, 300},
+ {259, 299, 499},
+ {20000, 260, 10300},
+ {10259, 299, 10499}
+ };
+ List<AggregationType> aggregationTypes = new ArrayList<>();
+ List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.MIN_TIME);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.MAX_TIME);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.FIRST_VALUE);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.LAST_VALUE);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+
+ WindowParameter windowParameter =
+ new EventWindowParameter(TSDataType.INT32, 0, false, false, 10000);
+
+ RawDataAggregationOperator rawDataAggregationOperator =
+ initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
+ int count = 0;
+ while (rawDataAggregationOperator.hasNext()) {
+ TsBlock resultTsBlock = rawDataAggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
+ for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+ for (int i = 0; i < 2; i++) {
+ assertEquals(result[0][count], resultTsBlock.getColumn(i).getLong(row));
+ }
+ for (int i = 2; i < 4; i++) {
+ assertEquals(result[1][count], resultTsBlock.getColumn(i).getLong(row));
+ }
+ for (int i = 4; i < 6; i++) {
+ assertEquals(result[2][count], resultTsBlock.getColumn(i).getInt(row));
+ }
+ for (int i = 6; i < 8; i++) {
+ assertEquals(result[3][count], resultTsBlock.getColumn(i).getInt(row));
+ }
+ }
+ }
+ assertEquals(3, count);
+ }
+
+ @Test
+ public void groupByEventRawDataTest2() throws IllegalPathException {
+ int[][] result =
+ new int[][] {
+ {4019900, 613770, 11180, 827160, 7790, 1044950},
+ {200, 60, 40, 80, 20, 100},
+ {20000, 10200, 260, 10300, 380, 10400}
+ };
+ long[][] resultTime =
+ new long[][] {
+ {0, 200, 260, 300, 380, 400},
+ {199, 259, 299, 379, 399, 499}
+ };
+ List<AggregationType> aggregationTypes = new ArrayList<>();
+ List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.SUM);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.COUNT);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.FIRST_VALUE);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+
+ WindowParameter windowParameter =
+ new EventWindowParameter(TSDataType.INT32, 0, true, false, 5000);
+
+ RawDataAggregationOperator rawDataAggregationOperator =
+ initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
+ int count = 0;
+ while (rawDataAggregationOperator.hasNext()) {
+ TsBlock resultTsBlock = rawDataAggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
+ for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+ assertEquals(resultTime[0][count], resultTsBlock.getTimeByIndex(row));
+ assertEquals(resultTime[1][count], resultTsBlock.getColumn(0).getLong(row));
+ for (int i = 1; i <= 2; i++) {
+ assertEquals(result[0][count], resultTsBlock.getColumn(i).getDouble(row), 0.01);
+ }
+ for (int i = 3; i <= 4; i++) {
+ assertEquals(result[1][count], resultTsBlock.getColumn(i).getLong(row));
+ }
+ for (int i = 5; i <= 6; i++) {
+ assertEquals(result[2][count], resultTsBlock.getColumn(i).getInt(row));
+ }
+ }
+ }
+ assertEquals(6, count);
+ }
+
+ @Test
+ public void groupByEventRawDataTest3() throws IllegalPathException {
+ int[][] result =
+ new int[][] {
+ {4019900, 613770, 11180, 827160, 7790, 1044950},
+ {200, 60, 40, 80, 20, 100},
+ {20000, 10200, 260, 10300, 380, 10400}
+ };
+ long[] resultTime = new long[] {0, 200, 260, 300, 380, 400};
+ List<AggregationType> aggregationTypes = new ArrayList<>();
+ List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.SUM);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.COUNT);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.FIRST_VALUE);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+
+ WindowParameter windowParameter =
+ new EventWindowParameter(TSDataType.INT32, 0, false, false, 5000);
+
+ RawDataAggregationOperator rawDataAggregationOperator =
+ initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
+ int count = 0;
+ while (rawDataAggregationOperator.hasNext()) {
+ TsBlock resultTsBlock = rawDataAggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
+ for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+ assertEquals(resultTime[count], resultTsBlock.getTimeByIndex(row));
+ for (int i = 0; i < 2; i++) {
+ assertEquals(result[0][count], resultTsBlock.getColumn(i).getDouble(row), 0.01);
+ }
+ for (int i = 2; i < 4; i++) {
+ assertEquals(result[1][count], resultTsBlock.getColumn(i).getLong(row));
+ }
+ for (int i = 4; i < 6; i++) {
+ assertEquals(result[2][count], resultTsBlock.getColumn(i).getInt(row));
+ }
+ }
+ }
+ assertEquals(6, count);
+ }
+ /** 0 - 199 200 - 259 260 - 299 300 - 379 380 - 399 400 - 499 */
+ @Test
+ public void groupByEventRawDataTest4() throws IllegalPathException {
+ int[] result =
+ new int[] {
+ 20000, 10200, 260, 10300, 380, 10400,
+ };
+ long[][] resultTime =
+ new long[][] {
+ {0, 200, 260, 300, 380, 400},
+ {199, 259, 299, 379, 399, 499}
+ };
+
+ List<AggregationType> aggregationTypes = new ArrayList<>();
+ List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.FIRST_VALUE);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+
+ WindowParameter windowParameter =
+ new EventWindowParameter(TSDataType.INT32, 0, true, false, 5000);
+
+ RawDataAggregationOperator rawDataAggregationOperator =
+ initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
+ int count = 0;
+ while (rawDataAggregationOperator.hasNext()) {
+ TsBlock resultTsBlock = rawDataAggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
+ for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+ assertEquals(resultTime[0][count], resultTsBlock.getTimeByIndex(row));
+ assertEquals(resultTime[1][count], resultTsBlock.getColumn(0).getLong(row));
+ for (int i = 1; i <= 2; i++) {
+ assertEquals(result[count], resultTsBlock.getColumn(i).getInt(row));
+ }
+ }
+ }
+ assertEquals(6, count);
+ }
+
+ @Test
+ public void onePointInOneEqualEventWindowTest() throws IllegalPathException {
+ WindowParameter windowParameter =
+ new EventWindowParameter(TSDataType.INT32, 0, false, false, 0);
+ onePointInOneWindowTest(windowParameter);
+ }
+
+ @Test
+ public void onePointInOneVariationEventWindowTest() throws IllegalPathException {
+ WindowParameter windowParameter =
+ new EventWindowParameter(TSDataType.INT32, 0, false, false, 0.5);
+ onePointInOneWindowTest(windowParameter);
+ }
+
+ private void onePointInOneWindowTest(WindowParameter windowParameter)
+ throws IllegalPathException {
+ List<AggregationType> aggregationTypes = new ArrayList<>();
+ List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.COUNT);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+ for (int i = 0; i < 2; i++) {
+ aggregationTypes.add(AggregationType.MIN_TIME);
+ List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+ inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+ inputLocations.add(inputLocationForOneAggregator);
+ }
+
+ RawDataAggregationOperator rawDataAggregationOperator =
+ initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
+
+ int resultMinTime1 = -1, resultMinTime2 = -1;
+ while (rawDataAggregationOperator.hasNext()) {
+ TsBlock resultTsBlock = rawDataAggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
+ for (int i = 0; i < 2; i++) {
+ for (int j = 0; j < resultTsBlock.getColumn(i).getPositionCount(); j++) {
+ assertEquals(1, resultTsBlock.getColumn(i).getLong(j));
+ }
+ }
+ // Here, a resultTsBlock has many aggregation results instead of one.
+ for (int i = 2; i < 4; i++) {
+ if (i == 2) {
+ for (int j = 0; j < resultTsBlock.getColumn(i).getPositionCount(); j++) {
+ assertEquals(++resultMinTime1, resultTsBlock.getColumn(i).getLong(j));
+ }
+ } else {
+ for (int j = 0; j < resultTsBlock.getColumn(i).getPositionCount(); j++) {
+ assertEquals(++resultMinTime2, resultTsBlock.getColumn(i).getLong(j));
+ }
+ }
+ }
+ }
+ assertEquals(resultMinTime1, 499);
+ assertEquals(resultMinTime2, 499);
+ }
+
private RawDataAggregationOperator initRawDataAggregationOperator(
List<AggregationType> aggregationTypes,
GroupByTimeParameter groupByTimeParameter,
- List<List<InputLocation[]>> inputLocations)
+ List<List<InputLocation[]>> inputLocations,
+ WindowParameter windowParameter)
throws IllegalPathException {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
@@ -339,6 +841,10 @@ public class RawDataAggregationOperatorTest {
operatorContext.setMaxRunTime(TEST_TIME_SLICE);
});
+ Filter timeFilter = null;
+ if (groupByTimeParameter != null && !groupByTimeParameter.isLeftCRightO()) {
+ timeFilter = new Gt<>(0L, FilterType.TIME_FILTER);
+ }
SeriesScanOperator seriesScanOperator1 =
new SeriesScanOperator(
driverContext.getOperatorContexts().get(0),
@@ -346,7 +852,7 @@ public class RawDataAggregationOperatorTest {
measurementPath1,
allSensors,
TSDataType.INT32,
- null,
+ timeFilter,
null,
true);
seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
@@ -360,7 +866,7 @@ public class RawDataAggregationOperatorTest {
measurementPath2,
allSensors,
TSDataType.INT32,
- null,
+ timeFilter,
null,
true);
seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
@@ -389,6 +895,7 @@ public class RawDataAggregationOperatorTest {
initTimeRangeIterator(groupByTimeParameter, true, true),
timeJoinOperator,
true,
- DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+ DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+ windowParameter);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index 7f3bf6e268..b331966809 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -53,6 +53,24 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
*
* <p>UnSequence time range of data: [0, 19], [100, 139], [200, 259], [300, 379], [400, 499], [0,
* 199]
+ *
+ * <p>The time series generated by this util are:
+ *
+ * <p>time value
+ *
+ * <p>0-99 20000-20099
+ *
+ * <p>100-199 20100-20199
+ *
+ * <p>200-259 10200-10259
+ *
+ * <p>260-299 260-299
+ *
+ * <p>300-379 10300-10379
+ *
+ * <p>380-399 380-399
+ *
+ * <p>400-499 10400-10499
*/
public class SeriesReaderTestUtil {