You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2023/01/10 11:37:13 UTC

[iotdb] branch master updated: [IOTDB-4437] Implement event window in RawDataAggregationOperator

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dac205243 [IOTDB-4437] Implement event window in RawDataAggregationOperator
5dac205243 is described below

commit 5dac205243e3e6b321966f456a15d817af56f8fb
Author: AACEPT <34...@users.noreply.github.com>
AuthorDate: Tue Jan 10 19:37:07 2023 +0800

    [IOTDB-4437] Implement event window in RawDataAggregationOperator
---
 .../iotdb/db/mpp/aggregation/AvgAccumulator.java   |   8 +-
 .../iotdb/db/mpp/aggregation/CountAccumulator.java |   4 +-
 .../db/mpp/aggregation/ExtremeAccumulator.java     |   8 +-
 .../db/mpp/aggregation/FirstValueAccumulator.java  |  12 +-
 .../mpp/aggregation/FirstValueDescAccumulator.java |  12 +-
 .../db/mpp/aggregation/LastValueAccumulator.java   |  12 +-
 .../mpp/aggregation/LastValueDescAccumulator.java  |  12 +-
 .../db/mpp/aggregation/MaxTimeAccumulator.java     |   2 +-
 .../db/mpp/aggregation/MaxTimeDescAccumulator.java |   2 +-
 .../db/mpp/aggregation/MaxValueAccumulator.java    |   8 +-
 .../db/mpp/aggregation/MinTimeAccumulator.java     |   2 +-
 .../db/mpp/aggregation/MinTimeDescAccumulator.java |   2 +-
 .../db/mpp/aggregation/MinValueAccumulator.java    |   8 +-
 .../iotdb/db/mpp/aggregation/SumAccumulator.java   |   8 +-
 .../process/RawDataAggregationOperator.java        |  51 +-
 .../process/SingleInputAggregationOperator.java    |  12 +-
 .../process/SlidingWindowAggregationOperator.java  |  11 +
 .../operator/window/EqualEventBooleanWindow.java}  |  32 +-
 .../window/EqualEventBooleanWindowManager.java     |  64 +++
 .../operator/window/EqualEventDoubleWindow.java}   |  32 +-
 .../window/EqualEventDoubleWindowManager.java      |  64 +++
 .../operator/window/EqualEventFloatWindow.java}    |  32 +-
 .../window/EqualEventFloatWindowManager.java       |  64 +++
 .../operator/window/EqualEventIntWindow.java}      |  32 +-
 .../window/EqualEventIntWindowManager.java         |  63 +++
 .../operator/window/EqualEventLongWindow.java}     |  32 +-
 .../window/EqualEventLongWindowManager.java        |  63 +++
 .../operator/window/EqualEventTextWindow.java}     |  32 +-
 .../window/EqualEventTextWindowManager.java        |  64 +++
 .../operator/window/EventBooleanWindow.java        |  66 +++
 .../operator/window/EventBooleanWindowManager.java |  46 ++
 .../operator/window/EventDoubleWindow.java         |  66 +++
 .../operator/window/EventDoubleWindowManager.java  |  46 ++
 .../operator/window/EventFloatWindow.java          |  66 +++
 .../operator/window/EventFloatWindowManager.java   |  46 ++
 .../execution/operator/window/EventIntWindow.java  |  66 +++
 .../operator/window/EventIntWindowManager.java     |  46 ++
 .../execution/operator/window/EventLongWindow.java |  66 +++
 .../operator/window/EventLongWindowManager.java    |  46 ++
 .../execution/operator/window/EventTextWindow.java |  67 +++
 .../operator/window/EventTextWindowManager.java    |  46 ++
 .../window/{TimeWindow.java => EventWindow.java}   |  61 ++-
 .../operator/window/EventWindowManager.java        | 138 ++++++
 .../operator/window/EventWindowParameter.java      |  61 +++
 .../db/mpp/execution/operator/window/IWindow.java  |   2 +-
 .../execution/operator/window/IWindowManager.java  |  68 ++-
 .../mpp/execution/operator/window/TimeWindow.java  |   2 +-
 .../operator/window/TimeWindowManager.java         |  73 ++-
 .../operator/window/TimeWindowParameter.java}      |  33 +-
 .../window/VariationEventDoubleWindow.java}        |  32 +-
 .../window/VariationEventDoubleWindowManager.java  |  66 +++
 .../window/VariationEventFloatWindow.java}         |  32 +-
 .../window/VariationEventFloatWindowManager.java   |  66 +++
 .../operator/window/VariationEventIntWindow.java}  |  32 +-
 .../window/VariationEventIntWindowManager.java     |  66 +++
 .../operator/window/VariationEventLongWindow.java} |  32 +-
 .../window/VariationEventLongWindowManager.java    |  66 +++
 .../operator/window/WindowManagerFactory.java      |  84 ++++
 .../operator/window/WindowParameter.java}          |  36 +-
 .../operator/window/WindowType.java}               |  33 +-
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |   7 +-
 .../mpp/execution/operator/OperatorMemoryTest.java |   7 +-
 .../operator/RawDataAggregationOperatorTest.java   | 527 ++++++++++++++++++++-
 .../query/reader/series/SeriesReaderTestUtil.java  |  18 +
 64 files changed, 2517 insertions(+), 446 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
index c647087362..d8d1be7027 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
@@ -157,7 +157,7 @@ public class AvgAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         initResult = true;
         countValue++;
@@ -178,7 +178,7 @@ public class AvgAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         initResult = true;
         countValue++;
@@ -199,7 +199,7 @@ public class AvgAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         initResult = true;
         countValue++;
@@ -220,7 +220,7 @@ public class AvgAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         initResult = true;
         countValue++;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
index 0455255a45..b43129fe61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
@@ -38,7 +38,7 @@ public class CountAccumulator implements Accumulator {
   public int addInput(Column[] column, IWindow curWindow) {
     int curPositionCount = column[0].getPositionCount();
 
-    if (!column[2].mayHaveNull() && curWindow.contains(column[1])) {
+    if (!column[2].mayHaveNull() && curWindow.contains(column[0])) {
       countValue += curPositionCount;
     } else {
       for (int i = 0; i < curPositionCount; i++) {
@@ -49,7 +49,7 @@ public class CountAccumulator implements Accumulator {
         if (!curWindow.satisfy(column[0], i)) {
           return i;
         }
-        curWindow.mergeOnePoint();
+        curWindow.mergeOnePoint(column, i);
         if (!column[2].isNull(i)) {
           countValue++;
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
index eedbe08063..9f7704e995 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
@@ -232,7 +232,7 @@ public class ExtremeAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateIntResult(column[2].getInt(i));
       }
@@ -264,7 +264,7 @@ public class ExtremeAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateLongResult(column[2].getLong(i));
       }
@@ -296,7 +296,7 @@ public class ExtremeAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateFloatResult(column[2].getFloat(i));
       }
@@ -328,7 +328,7 @@ public class ExtremeAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateDoubleResult(column[2].getDouble(i));
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
index 90ba07f543..180c3154d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
@@ -257,7 +257,7 @@ public class FirstValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateIntFirstValue(column[2].getInt(i), column[1].getLong(i));
         return i;
@@ -286,7 +286,7 @@ public class FirstValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateLongFirstValue(column[2].getLong(i), column[1].getLong(i));
         return i;
@@ -315,7 +315,7 @@ public class FirstValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateFloatFirstValue(column[2].getFloat(i), column[1].getLong(i));
         return i;
@@ -344,7 +344,7 @@ public class FirstValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateDoubleFirstValue(column[2].getDouble(i), column[1].getLong(i));
         return i;
@@ -373,7 +373,7 @@ public class FirstValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateBooleanFirstValue(column[2].getBoolean(i), column[1].getLong(i));
         return i;
@@ -402,7 +402,7 @@ public class FirstValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateBinaryFirstValue(column[2].getBinary(i), column[1].getLong(i));
         return i;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
index 5717e7d175..eb5ed724f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
@@ -47,7 +47,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateIntFirstValue(column[2].getInt(i), column[1].getLong(i));
       }
@@ -67,7 +67,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateLongFirstValue(column[2].getLong(i), column[1].getLong(i));
       }
@@ -87,7 +87,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateFloatFirstValue(column[2].getFloat(i), column[1].getLong(i));
       }
@@ -107,7 +107,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateDoubleFirstValue(column[2].getDouble(i), column[1].getLong(i));
       }
@@ -127,7 +127,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateBooleanFirstValue(column[2].getBoolean(i), column[1].getLong(i));
       }
@@ -147,7 +147,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateBinaryFirstValue(column[2].getBinary(i), column[1].getLong(i));
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
index e198948f49..485333a680 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
@@ -257,7 +257,7 @@ public class LastValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateIntLastValue(column[2].getInt(i), column[1].getLong(i));
       }
@@ -284,7 +284,7 @@ public class LastValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateLongLastValue(column[2].getLong(i), column[1].getLong(i));
       }
@@ -311,7 +311,7 @@ public class LastValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateFloatLastValue(column[2].getFloat(i), column[1].getLong(i));
       }
@@ -338,7 +338,7 @@ public class LastValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateDoubleLastValue(column[2].getDouble(i), column[1].getLong(i));
       }
@@ -365,7 +365,7 @@ public class LastValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateBooleanLastValue(column[2].getBoolean(i), column[1].getLong(i));
       }
@@ -392,7 +392,7 @@ public class LastValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateBinaryLastValue(column[2].getBinary(i), column[1].getLong(i));
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
index 98b77c3030..34ded05789 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
@@ -51,7 +51,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateIntLastValue(column[2].getInt(i), column[1].getLong(i));
         return i;
@@ -73,7 +73,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateLongLastValue(column[2].getLong(i), column[1].getLong(i));
         return i;
@@ -95,7 +95,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateFloatLastValue(column[2].getFloat(i), column[1].getLong(i));
         return i;
@@ -117,7 +117,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateDoubleLastValue(column[2].getDouble(i), column[1].getLong(i));
         return i;
@@ -139,7 +139,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateBooleanLastValue(column[2].getBoolean(i), column[1].getLong(i));
         return i;
@@ -161,7 +161,7 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateBinaryLastValue(column[2].getBinary(i), column[1].getLong(i));
         return i;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
index ef1fa58dc1..ef045ad9c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
@@ -48,7 +48,7 @@ public class MaxTimeAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateMaxTime(column[1].getLong(i));
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
index 536a60fe25..6ed18bd381 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
@@ -38,7 +38,7 @@ public class MaxTimeDescAccumulator extends MaxTimeAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateMaxTime(column[1].getLong(i));
         return i;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
index 768e9cf16f..cf93148980 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
@@ -230,7 +230,7 @@ public class MaxValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateIntResult(column[2].getInt(i));
       }
@@ -256,7 +256,7 @@ public class MaxValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateLongResult(column[2].getLong(i));
       }
@@ -282,7 +282,7 @@ public class MaxValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateFloatResult(column[2].getFloat(i));
       }
@@ -308,7 +308,7 @@ public class MaxValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateDoubleResult(column[2].getDouble(i));
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
index 252ccaf02b..2229f2de2b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
@@ -48,7 +48,7 @@ public class MinTimeAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateMinTime(column[1].getLong(i));
         return i;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
index 249cf458fd..e63a9765dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
@@ -36,7 +36,7 @@ public class MinTimeDescAccumulator extends MinTimeAccumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateMinTime(column[1].getLong(i));
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
index 2abd59e6e1..1d49b56277 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
@@ -230,7 +230,7 @@ public class MinValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateIntResult(column[2].getInt(i));
       }
@@ -256,7 +256,7 @@ public class MinValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateLongResult(column[2].getLong(i));
       }
@@ -282,7 +282,7 @@ public class MinValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateFloatResult(column[2].getFloat(i));
       }
@@ -308,7 +308,7 @@ public class MinValueAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         updateDoubleResult(column[2].getDouble(i));
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
index bd113d7801..85edd382f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
@@ -146,7 +146,7 @@ public class SumAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         initResult = true;
         sumValue += column[2].getInt(i);
@@ -166,7 +166,7 @@ public class SumAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         initResult = true;
         sumValue += column[2].getLong(i);
@@ -186,7 +186,7 @@ public class SumAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         initResult = true;
         sumValue += column[2].getFloat(i);
@@ -206,7 +206,7 @@ public class SumAccumulator implements Accumulator {
       if (!curWindow.satisfy(column[0], i)) {
         return i;
       }
-      curWindow.mergeOnePoint();
+      curWindow.mergeOnePoint(column, i);
       if (!column[2].isNull(i)) {
         initResult = true;
         sumValue += column[2].getDouble(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index b5ce5ef823..f34c92438f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -25,12 +25,12 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.db.mpp.execution.operator.window.IWindowManager;
-import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowManager;
+import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
 
 import java.util.List;
 
-import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult;
+import static org.apache.iotdb.db.mpp.execution.operator.window.WindowManagerFactory.genWindowManager;
 
 /**
  * RawDataAggregationOperator is used to process raw data tsBlock input calculating using value
@@ -46,19 +46,27 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
 
   private final IWindowManager windowManager;
 
+  // needSkip is the signal to determine whether to skip the points out of current window to get
+  // endTime when the resultSet needs to output endTime.
+  // If the resultSet needs endTime, needSkip will be set to true when the operator is skipping the
+  // points out of current window.
+  private boolean needSkip = false;
+
   public RawDataAggregationOperator(
       OperatorContext operatorContext,
       List<Aggregator> aggregators,
       ITimeRangeIterator timeRangeIterator,
       Operator child,
       boolean ascending,
-      long maxReturnSize) {
+      long maxReturnSize,
+      WindowParameter windowParameter) {
     super(operatorContext, aggregators, child, ascending, maxReturnSize);
-    this.windowManager = new TimeWindowManager(timeRangeIterator);
+    this.windowManager = genWindowManager(windowParameter, timeRangeIterator);
+    this.resultTsBlockBuilder = windowManager.createResultTsBlockBuilder(aggregators);
   }
 
   private boolean hasMoreData() {
-    return inputTsBlock != null || child.hasNextWithTimer();
+    return !(inputTsBlock == null || inputTsBlock.isEmpty()) || child.hasNextWithTimer();
   }
 
   @Override
@@ -68,29 +76,48 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
 
   @Override
   protected boolean calculateNextAggregationResult() {
-    while (!calculateFromRawData()) {
+
+    // if needSkip is true, just get the tsBlock directly.
+    while (needSkip || !calculateFromRawData()) {
       inputTsBlock = null;
 
       // NOTE: child.next() can only be invoked once
       if (child.hasNextWithTimer() && canCallNext) {
         inputTsBlock = child.nextWithTimer();
         canCallNext = false;
+        if (needSkip) {
+          break;
+        }
       } else if (child.hasNextWithTimer()) {
         // if child still has next but can't be invoked now
         return false;
       } else {
-        // If there are no points belong to last window, the last window will not
-        // initialize window and aggregators
-        if (!windowManager.isCurWindowInit()) {
+        // If there are no points belong to last time window, the last time window will not
+        // initialize window and aggregators. Specially for time window.
+        if (windowManager.notInitedLastTimeWindow()) {
           initWindowAndAggregators();
         }
         break;
       }
     }
 
-    updateResultTsBlock();
     // Step into next window
-    windowManager.next();
+    // if needSkip is true, don't need to enter next window again
+    if (!needSkip) {
+      windowManager.next();
+    }
+    // When some windows without cached endTime trying to output endTime,
+    // they need to skip the points in lastWindow in advance to get endTime
+    if (windowManager.needSkipInAdvance()) {
+      needSkip = true;
+      inputTsBlock = windowManager.skipPointsOutOfCurWindow(inputTsBlock);
+      if ((inputTsBlock == null || inputTsBlock.isEmpty()) && child.hasNextWithTimer()) {
+        return canCallNext;
+      }
+      needSkip = false;
+    }
+
+    updateResultTsBlock();
 
     return true;
   }
@@ -135,7 +162,7 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
 
   @Override
   protected void updateResultTsBlock() {
-    appendAggregationResult(resultTsBlockBuilder, aggregators, windowManager.currentOutputTime());
+    windowManager.appendAggregationResult(resultTsBlockBuilder, aggregators);
   }
 
   private boolean skipPreviousWindowAndInitCurWindow() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 93310c5731..0fb561e1e3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -22,14 +22,11 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -45,7 +42,7 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
   protected final List<Aggregator> aggregators;
 
   // using for building result tsBlock
-  protected final TsBlockBuilder resultTsBlockBuilder;
+  protected TsBlockBuilder resultTsBlockBuilder;
 
   protected final long maxRetainedSize;
   protected final long maxReturnSize;
@@ -60,13 +57,6 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
     this.ascending = ascending;
     this.child = child;
     this.aggregators = aggregators;
-
-    List<TSDataType> dataTypes = new ArrayList<>();
-    for (Aggregator aggregator : aggregators) {
-      dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
-    }
-    this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
-
     this.maxRetainedSize = child.calculateMaxReturnSize();
     this.maxReturnSize = maxReturnSize;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 805b183677..a0c93aa28f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -24,8 +24,12 @@ import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -54,6 +58,13 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
     checkArgument(
         groupByTimeParameter != null,
         "GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");
+
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Aggregator aggregator : aggregators) {
+      dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+    }
+    this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+
     this.timeRangeIterator = timeRangeIterator;
     this.subTimeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindow.java
index 249cf458fd..78258c20c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindow.java
@@ -17,35 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventBooleanWindow extends EventBooleanWindow {
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  public EqualEventBooleanWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
   }
 
   @Override
-  public boolean hasFinalResult() {
-    return false;
+  public boolean satisfy(Column column, int index) {
+    if (!initializedEventValue) {
+      return true;
+    }
+    return column.getBoolean(index) == eventValue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindowManager.java
new file mode 100644
index 0000000000..b1da72bff4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventBooleanWindowManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class EqualEventBooleanWindowManager extends EventBooleanWindowManager {
+
+  public EqualEventBooleanWindowManager(
+      EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+    eventWindow = new EqualEventBooleanWindow(eventWindowParameter);
+  }
+
+  @Override
+  public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+    if (!needSkip) {
+      return inputTsBlock;
+    }
+
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return inputTsBlock;
+    }
+
+    Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+    int i = 0, size = inputTsBlock.getPositionCount();
+    boolean previousEventValue = ((EqualEventBooleanWindow) eventWindow).getPreviousEventValue();
+    for (; i < size; i++) {
+      if (!controlColumn.isNull(i) && controlColumn.getBoolean(i) != previousEventValue) {
+        break;
+      }
+      // judge whether we need update endTime
+      long currentTime = timeColumn.getLong(i);
+      if (eventWindow.getEndTime() < currentTime) {
+        eventWindow.setEndTime(currentTime);
+      }
+    }
+    // we can create a new window beginning at index i of inputTsBlock
+    if (i < size) {
+      needSkip = false;
+    }
+    return inputTsBlock.subTsBlock(i);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindow.java
index 249cf458fd..aae3164dd2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindow.java
@@ -17,35 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventDoubleWindow extends EventDoubleWindow {
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  public EqualEventDoubleWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
   }
 
   @Override
-  public boolean hasFinalResult() {
-    return false;
+  public boolean satisfy(Column column, int index) {
+    if (!initializedEventValue) {
+      return true;
+    }
+    return column.getDouble(index) == eventValue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindowManager.java
new file mode 100644
index 0000000000..c9b225f80e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventDoubleWindowManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class EqualEventDoubleWindowManager extends EventDoubleWindowManager {
+
+  public EqualEventDoubleWindowManager(
+      EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+    eventWindow = new EqualEventDoubleWindow(eventWindowParameter);
+  }
+
+  @Override
+  public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+    if (!needSkip) {
+      return inputTsBlock;
+    }
+
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return inputTsBlock;
+    }
+
+    Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+    int i = 0, size = inputTsBlock.getPositionCount();
+    double previousEventValue = ((EqualEventDoubleWindow) eventWindow).getPreviousEventValue();
+    for (; i < size; i++) {
+      if (!controlColumn.isNull(i) && controlColumn.getDouble(i) != previousEventValue) {
+        break;
+      }
+      // judge whether we need update endTime
+      long currentTime = timeColumn.getLong(i);
+      if (eventWindow.getEndTime() < currentTime) {
+        eventWindow.setEndTime(currentTime);
+      }
+    }
+    // we can create a new window beginning at index i of inputTsBlock
+    if (i < size) {
+      needSkip = false;
+    }
+    return inputTsBlock.subTsBlock(i);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindow.java
index 249cf458fd..9fb5b48150 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindow.java
@@ -17,35 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventFloatWindow extends EventFloatWindow {
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  public EqualEventFloatWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
   }
 
   @Override
-  public boolean hasFinalResult() {
-    return false;
+  public boolean satisfy(Column column, int index) {
+    if (!initializedEventValue) {
+      return true;
+    }
+    return column.getFloat(index) == eventValue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindowManager.java
new file mode 100644
index 0000000000..1d5840c1d5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventFloatWindowManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class EqualEventFloatWindowManager extends EventFloatWindowManager {
+
+  public EqualEventFloatWindowManager(
+      EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+    eventWindow = new EqualEventFloatWindow(eventWindowParameter);
+  }
+
+  @Override
+  public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+    if (!needSkip) {
+      return inputTsBlock;
+    }
+
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return inputTsBlock;
+    }
+
+    Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+    int i = 0, size = inputTsBlock.getPositionCount();
+    float previousEventValue = ((EqualEventFloatWindow) eventWindow).getPreviousEventValue();
+    for (; i < size; i++) {
+      if (!controlColumn.isNull(i) && controlColumn.getFloat(i) != previousEventValue) {
+        break;
+      }
+      // judge whether we need update endTime
+      long currentTime = timeColumn.getLong(i);
+      if (eventWindow.getEndTime() < currentTime) {
+        eventWindow.setEndTime(currentTime);
+      }
+    }
+    // we can create a new window beginning at index i of inputTsBlock
+    if (i < size) {
+      needSkip = false;
+    }
+    return inputTsBlock.subTsBlock(i);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindow.java
index 249cf458fd..5b8e8230c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindow.java
@@ -17,35 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventIntWindow extends EventIntWindow {
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  public EqualEventIntWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
   }
 
   @Override
-  public boolean hasFinalResult() {
-    return false;
+  public boolean satisfy(Column column, int index) {
+    if (!initializedEventValue) {
+      return true;
+    }
+    return column.getInt(index) == eventValue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindowManager.java
new file mode 100644
index 0000000000..ef866dad43
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventIntWindowManager.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class EqualEventIntWindowManager extends EventIntWindowManager {
+
+  public EqualEventIntWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+    eventWindow = new EqualEventIntWindow(eventWindowParameter);
+  }
+
+  @Override
+  public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+    if (!needSkip) {
+      return inputTsBlock;
+    }
+
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return inputTsBlock;
+    }
+
+    Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+    int i = 0, size = inputTsBlock.getPositionCount();
+    int previousEventValue = ((EqualEventIntWindow) eventWindow).getPreviousEventValue();
+    for (; i < size; i++) {
+      if (!controlColumn.isNull(i) && controlColumn.getInt(i) != previousEventValue) {
+        break;
+      }
+      // judge whether we need update endTime
+      long currentTime = timeColumn.getLong(i);
+      if (eventWindow.getEndTime() < currentTime) {
+        eventWindow.setEndTime(currentTime);
+      }
+    }
+    // we can create a new window beginning at index i of inputTsBlock
+    if (i < size) {
+      needSkip = false;
+    }
+    return inputTsBlock.subTsBlock(i);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindow.java
index 249cf458fd..d69d3cf7a2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindow.java
@@ -17,35 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventLongWindow extends EventLongWindow {
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  public EqualEventLongWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
   }
 
   @Override
-  public boolean hasFinalResult() {
-    return false;
+  public boolean satisfy(Column column, int index) {
+    if (!initializedEventValue) {
+      return true;
+    }
+    return column.getLong(index) == eventValue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindowManager.java
new file mode 100644
index 0000000000..6e330b2a5f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventLongWindowManager.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class EqualEventLongWindowManager extends EventLongWindowManager {
+
+  public EqualEventLongWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+    eventWindow = new EqualEventLongWindow(eventWindowParameter);
+  }
+
+  @Override
+  public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+    if (!needSkip) {
+      return inputTsBlock;
+    }
+
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return inputTsBlock;
+    }
+
+    Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+    int i = 0, size = inputTsBlock.getPositionCount();
+    long previousEventValue = ((EqualEventLongWindow) eventWindow).getPreviousEventValue();
+    for (; i < size; i++) {
+      if (!controlColumn.isNull(i) && controlColumn.getLong(i) != previousEventValue) {
+        break;
+      }
+      // judge whether we need update endTime
+      long currentTime = timeColumn.getLong(i);
+      if (eventWindow.getEndTime() < currentTime) {
+        eventWindow.setEndTime(currentTime);
+      }
+    }
+    // we can create a new window beginning at index i of inputTsBlock
+    if (i < size) {
+      needSkip = false;
+    }
+    return inputTsBlock.subTsBlock(i);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindow.java
index 249cf458fd..316702bb92 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindow.java
@@ -17,35 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class EqualEventTextWindow extends EventTextWindow {
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  public EqualEventTextWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
   }
 
   @Override
-  public boolean hasFinalResult() {
-    return false;
+  public boolean satisfy(Column column, int index) {
+    if (!initializedEventValue) {
+      return true;
+    }
+    return column.getBinary(index).equals(eventValue);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindowManager.java
new file mode 100644
index 0000000000..8882d15b6a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EqualEventTextWindowManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public class EqualEventTextWindowManager extends EventTextWindowManager {
+
+  public EqualEventTextWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+    eventWindow = new EqualEventTextWindow(eventWindowParameter);
+  }
+
+  @Override
+  public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+    if (!needSkip) {
+      return inputTsBlock;
+    }
+
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return inputTsBlock;
+    }
+
+    Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+    int i = 0, size = inputTsBlock.getPositionCount();
+    Binary previousEventValue = ((EqualEventTextWindow) eventWindow).getPreviousEventValue();
+    for (; i < size; i++) {
+      if (!controlColumn.isNull(i) && !controlColumn.getBinary(i).equals(previousEventValue)) {
+        break;
+      }
+      // judge whether we need update endTime
+      long currentTime = timeColumn.getLong(i);
+      if (eventWindow.getEndTime() < currentTime) {
+        eventWindow.setEndTime(currentTime);
+      }
+    }
+    // we can create a new window beginning at index i of inputTsBlock
+    if (i < size) {
+      needSkip = false;
+    }
+    return inputTsBlock.subTsBlock(i);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindow.java
new file mode 100644
index 0000000000..9f3d8e7d22
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindow.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public abstract class EventBooleanWindow extends EventWindow {
+
+  protected boolean eventValue;
+
+  private boolean previousEventValue;
+
+  public EventBooleanWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
+  }
+
+  @Override
+  public void updatePreviousEventValue() {
+    previousEventValue = eventValue;
+  }
+
+  @Override
+  public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+    long currentTime = controlTimeAndValueColumn[1].getLong(index);
+    // judge whether we need update startTime
+    if (startTime > currentTime) {
+      startTime = currentTime;
+    }
+    // judge whether we need update endTime
+    if (endTime < currentTime) {
+      endTime = currentTime;
+    }
+    // judge whether we need initialize eventValue
+    if (!initializedEventValue) {
+      startTime = currentTime;
+      endTime = currentTime;
+      eventValue = controlTimeAndValueColumn[0].getBoolean(index);
+      initializedEventValue = true;
+    }
+  }
+
+  public boolean getEventValue() {
+    return eventValue;
+  }
+
+  public boolean getPreviousEventValue() {
+    return previousEventValue;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindowManager.java
new file mode 100644
index 0000000000..c291a3484e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventBooleanWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventBooleanWindowManager extends EventWindowManager {
+
+  public EventBooleanWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+  }
+
+  @Override
+  public void appendAggregationResult(
+      TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+    // Append aggregation results to valueColumnBuilders.
+    ColumnBuilder[] columnBuilders =
+        appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+    // Judge whether we need output event column.
+    if (eventWindowParameter.isNeedOutputEvent()) {
+      columnBuilders[columnBuilders.length - 1].writeBoolean(
+          ((EventBooleanWindow) eventWindow).getEventValue());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindow.java
new file mode 100644
index 0000000000..611d040aff
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindow.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public abstract class EventDoubleWindow extends EventWindow {
+
+  protected double eventValue;
+
+  private double previousEventValue;
+
+  public EventDoubleWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
+  }
+
+  @Override
+  public void updatePreviousEventValue() {
+    previousEventValue = eventValue;
+  }
+
+  @Override
+  public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+    long currentTime = controlTimeAndValueColumn[1].getLong(index);
+    // judge whether we need update startTime
+    if (startTime > currentTime) {
+      startTime = currentTime;
+    }
+    // judge whether we need update endTime
+    if (endTime < currentTime) {
+      endTime = currentTime;
+    }
+    // judge whether we need initialize eventValue
+    if (!initializedEventValue) {
+      startTime = currentTime;
+      endTime = currentTime;
+      eventValue = controlTimeAndValueColumn[0].getDouble(index);
+      initializedEventValue = true;
+    }
+  }
+
+  public double getEventValue() {
+    return eventValue;
+  }
+
+  public double getPreviousEventValue() {
+    return previousEventValue;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindowManager.java
new file mode 100644
index 0000000000..227c9312eb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventDoubleWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventDoubleWindowManager extends EventWindowManager {
+
+  public EventDoubleWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+  }
+
+  @Override
+  public void appendAggregationResult(
+      TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+    // Append aggregation results to valueColumnBuilders.
+    ColumnBuilder[] columnBuilders =
+        appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+    // Judge whether we need output event column.
+    if (eventWindowParameter.isNeedOutputEvent()) {
+      columnBuilders[columnBuilders.length - 1].writeDouble(
+          ((EventDoubleWindow) eventWindow).getEventValue());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindow.java
new file mode 100644
index 0000000000..666a54ba0c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindow.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public abstract class EventFloatWindow extends EventWindow {
+
+  protected float eventValue;
+
+  private float previousEventValue;
+
+  public EventFloatWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
+  }
+
+  @Override
+  public void updatePreviousEventValue() {
+    previousEventValue = eventValue;
+  }
+
+  @Override
+  public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+    long currentTime = controlTimeAndValueColumn[1].getLong(index);
+    // judge whether we need update startTime
+    if (startTime > currentTime) {
+      startTime = currentTime;
+    }
+    // judge whether we need update endTime
+    if (endTime < currentTime) {
+      endTime = currentTime;
+    }
+    // judge whether we need initialize eventValue
+    if (!initializedEventValue) {
+      startTime = currentTime;
+      endTime = currentTime;
+      eventValue = controlTimeAndValueColumn[0].getFloat(index);
+      initializedEventValue = true;
+    }
+  }
+
+  public float getEventValue() {
+    return eventValue;
+  }
+
+  public float getPreviousEventValue() {
+    return previousEventValue;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindowManager.java
new file mode 100644
index 0000000000..d4904760c1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventFloatWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventFloatWindowManager extends EventWindowManager {
+
+  public EventFloatWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+  }
+
+  @Override
+  public void appendAggregationResult(
+      TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+    // Append aggregation results to valueColumnBuilders.
+    ColumnBuilder[] columnBuilders =
+        appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+    // Judge whether we need output event column.
+    if (eventWindowParameter.isNeedOutputEvent()) {
+      columnBuilders[columnBuilders.length - 1].writeFloat(
+          ((EventFloatWindow) eventWindow).getEventValue());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindow.java
new file mode 100644
index 0000000000..e73e186e70
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindow.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public abstract class EventIntWindow extends EventWindow {
+
+  protected int eventValue;
+
+  private int previousEventValue;
+
+  public EventIntWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
+  }
+
+  @Override
+  public void updatePreviousEventValue() {
+    previousEventValue = eventValue;
+  }
+
+  @Override
+  public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+    long currentTime = controlTimeAndValueColumn[1].getLong(index);
+    // judge whether we need update startTime
+    if (startTime > currentTime) {
+      startTime = currentTime;
+    }
+    // judge whether we need update endTime
+    if (endTime < currentTime) {
+      endTime = currentTime;
+    }
+    // judge whether we need initialize eventValue
+    if (!initializedEventValue) {
+      startTime = currentTime;
+      endTime = currentTime;
+      eventValue = controlTimeAndValueColumn[0].getInt(index);
+      initializedEventValue = true;
+    }
+  }
+
+  public int getEventValue() {
+    return eventValue;
+  }
+
+  public int getPreviousEventValue() {
+    return previousEventValue;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindowManager.java
new file mode 100644
index 0000000000..a7233037f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventIntWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventIntWindowManager extends EventWindowManager {
+
+  public EventIntWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+  }
+
+  @Override
+  public void appendAggregationResult(
+      TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+    // Append aggregation results to valueColumnBuilders.
+    ColumnBuilder[] columnBuilders =
+        appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+    // Judge whether we need output event column.
+    if (eventWindowParameter.isNeedOutputEvent()) {
+      columnBuilders[columnBuilders.length - 1].writeInt(
+          ((EventIntWindow) eventWindow).getEventValue());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindow.java
new file mode 100644
index 0000000000..e9b8d8425d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindow.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public abstract class EventLongWindow extends EventWindow {
+
+  protected long eventValue;
+
+  private long previousEventValue;
+
+  public EventLongWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
+  }
+
+  @Override
+  public void updatePreviousEventValue() {
+    previousEventValue = eventValue;
+  }
+
+  @Override
+  public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+    long currentTime = controlTimeAndValueColumn[1].getLong(index);
+    // judge whether we need update startTime
+    if (startTime > currentTime) {
+      startTime = currentTime;
+    }
+    // judge whether we need update endTime
+    if (endTime < currentTime) {
+      endTime = currentTime;
+    }
+    // judge whether we need initialize eventValue
+    if (!initializedEventValue) {
+      startTime = currentTime;
+      endTime = currentTime;
+      eventValue = controlTimeAndValueColumn[0].getLong(index);
+      initializedEventValue = true;
+    }
+  }
+
+  public long getEventValue() {
+    return eventValue;
+  }
+
+  public long getPreviousEventValue() {
+    return previousEventValue;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindowManager.java
new file mode 100644
index 0000000000..2f02142e10
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventLongWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventLongWindowManager extends EventWindowManager {
+
+  public EventLongWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+  }
+
+  @Override
+  public void appendAggregationResult(
+      TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+    // Append aggregation results to valueColumnBuilders.
+    ColumnBuilder[] columnBuilders =
+        appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+    // Judge whether we need output event column.
+    if (eventWindowParameter.isNeedOutputEvent()) {
+      columnBuilders[columnBuilders.length - 1].writeLong(
+          ((EventLongWindow) eventWindow).getEventValue());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindow.java
new file mode 100644
index 0000000000..219ec490f3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindow.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+public abstract class EventTextWindow extends EventWindow {
+
+  protected Binary eventValue;
+
+  private Binary previousEventValue;
+
+  public EventTextWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
+  }
+
+  @Override
+  public void updatePreviousEventValue() {
+    previousEventValue = eventValue;
+  }
+
+  @Override
+  public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+    long currentTime = controlTimeAndValueColumn[1].getLong(index);
+    // judge whether we need update startTime
+    if (startTime > currentTime) {
+      startTime = currentTime;
+    }
+    // judge whether we need update endTime
+    if (endTime < currentTime) {
+      endTime = currentTime;
+    }
+    // judge whether we need initialize eventValue
+    if (!initializedEventValue) {
+      startTime = currentTime;
+      endTime = currentTime;
+      eventValue = controlTimeAndValueColumn[0].getBinary(index);
+      initializedEventValue = true;
+    }
+  }
+
+  public Binary getEventValue() {
+    return eventValue;
+  }
+
+  public Binary getPreviousEventValue() {
+    return previousEventValue;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindowManager.java
new file mode 100644
index 0000000000..04b2ed7912
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventTextWindowManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventTextWindowManager extends EventWindowManager {
+
+  public EventTextWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+  }
+
+  @Override
+  public void appendAggregationResult(
+      TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+    // Append aggregation results to valueColumnBuilders.
+    ColumnBuilder[] columnBuilders =
+        appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
+    // Judge whether we need output event column.
+    if (eventWindowParameter.isNeedOutputEvent()) {
+      columnBuilders[columnBuilders.length - 1].writeBinary(
+          ((EventTextWindow) eventWindow).getEventValue());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java
index ee11bd0eaa..8724560378 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java
@@ -20,65 +20,58 @@
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
 import org.apache.iotdb.db.mpp.aggregation.Accumulator;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 
-public class TimeWindow implements IWindow {
+public abstract class EventWindow implements IWindow {
 
-  private TimeRange curTimeRange;
+  protected EventWindowParameter eventWindowParameter;
 
-  public TimeWindow() {}
+  protected long startTime;
 
-  public TimeWindow(TimeRange curTimeRange) {
-    this.curTimeRange = curTimeRange;
-  }
-
-  public TimeRange getCurTimeRange() {
-    return curTimeRange;
-  }
+  protected long endTime;
 
-  public long getCurMinTime() {
-    return curTimeRange.getMin();
-  }
+  protected boolean initializedEventValue;
 
-  public long getCurMaxTime() {
-    return curTimeRange.getMax();
+  protected EventWindow(EventWindowParameter eventWindowParameter) {
+    this.eventWindowParameter = eventWindowParameter;
   }
 
   @Override
   public Column getControlColumn(TsBlock tsBlock) {
-    return tsBlock.getTimeColumn();
+    return tsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
   }
 
   @Override
-  public boolean satisfy(Column column, int index) {
-    long curTime = column.getLong(index);
-    return curTime <= getCurMaxTime() && curTime >= getCurMinTime();
+  public boolean hasFinalResult(Accumulator accumulator) {
+    return accumulator.hasFinalResult();
   }
 
+  // TODO
   @Override
-  public void mergeOnePoint() {
-    // do nothing
+  public boolean contains(Column column) {
+    return false;
   }
 
-  @Override
-  public boolean hasFinalResult(Accumulator accumulator) {
-    return accumulator.hasFinalResult();
+  public abstract void updatePreviousEventValue();
+
+  public long getStartTime() {
+    return startTime;
   }
 
-  @Override
-  public boolean contains(Column column) {
-    TimeColumn timeColumn = (TimeColumn) column;
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
 
-    long minTime = Math.min(timeColumn.getStartTime(), timeColumn.getEndTime());
-    long maxTime = Math.max(timeColumn.getStartTime(), timeColumn.getEndTime());
+  public long getEndTime() {
+    return endTime;
+  }
 
-    return curTimeRange.contains(minTime, maxTime);
+  public void setEndTime(long endTime) {
+    this.endTime = endTime;
   }
 
-  public void update(TimeRange curTimeRange) {
-    this.curTimeRange = curTimeRange;
+  public void setInitializedEventValue(boolean initializedEventValue) {
+    this.initializedEventValue = initializedEventValue;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java
new file mode 100644
index 0000000000..9f6f0db9b1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.List;
+
+public abstract class EventWindowManager implements IWindowManager {
+
+  protected boolean initialized;
+
+  protected boolean ascending;
+
+  protected boolean needSkip;
+
+  protected EventWindowParameter eventWindowParameter;
+
+  protected EventWindow eventWindow;
+
+  protected EventWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
+    this.eventWindowParameter = eventWindowParameter;
+    this.initialized = false;
+    this.ascending = ascending;
+    // At beginning, we do not need to skip inputTsBlock
+    this.needSkip = false;
+  }
+
+  @Override
+  public boolean isCurWindowInit() {
+    return this.initialized;
+  }
+
+  @Override
+  public void initCurWindow(TsBlock tsBlock) {
+    this.initialized = true;
+    this.eventWindow.setInitializedEventValue(false);
+  }
+
+  @Override
+  public boolean hasNext(boolean hasMoreData) {
+    return hasMoreData;
+  }
+
+  @Override
+  public void next() {
+    // When we go into next window, we should pay attention to previous window whether all points
+    // belong to previous window have been consumed. If not, we need skip these points.
+    this.needSkip = true;
+    this.initialized = false;
+    this.eventWindow.updatePreviousEventValue();
+  }
+
+  @Override
+  public IWindow getCurWindow() {
+    return eventWindow;
+  }
+
+  @Override
+  public boolean satisfiedCurWindow(TsBlock inputTsBlock) {
+    return true;
+  }
+
+  @Override
+  public boolean isTsBlockOutOfBound(TsBlock inputTsBlock) {
+    return false;
+  }
+
+  @Override
+  public TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators) {
+    List<TSDataType> dataTypes = getResultDataTypes(aggregators);
+    // Judge whether we need output endTime column.
+    if (eventWindowParameter.isNeedOutputEndTime()) {
+      dataTypes.add(0, TSDataType.INT64);
+    }
+    // Judge whether we need output event column.
+    if (eventWindowParameter.isNeedOutputEvent()) {
+      dataTypes.add(eventWindowParameter.getDataType());
+    }
+    return new TsBlockBuilder(dataTypes);
+  }
+
+  protected ColumnBuilder[] appendOriginAggregationResult(
+      TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+    // Use the start time of eventWindow as default output time.
+    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(eventWindow.getStartTime());
+
+    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+    int columnIndex = 0;
+    if (eventWindowParameter.isNeedOutputEndTime()) {
+      columnBuilders[0].writeLong(eventWindow.getEndTime());
+      columnIndex = 1;
+    }
+    for (Aggregator aggregator : aggregators) {
+      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+      columnBuilder[0] = columnBuilders[columnIndex++];
+      if (columnBuilder.length > 1) {
+        columnBuilder[1] = columnBuilders[columnIndex++];
+      }
+      aggregator.outputResult(columnBuilder);
+    }
+    resultTsBlockBuilder.declarePosition();
+    return columnBuilders;
+  }
+
+  @Override
+  public boolean notInitedLastTimeWindow() {
+    return false;
+  }
+
+  @Override
+  public boolean needSkipInAdvance() {
+    return eventWindowParameter.isNeedOutputEndTime();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java
new file mode 100644
index 0000000000..e4ba73679d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class EventWindowParameter extends WindowParameter {
+
+  private final boolean needOutputEvent;
+  private final double delta;
+  private final TSDataType dataType;
+
+  private final int controlColumnIndex;
+
+  public EventWindowParameter(
+      TSDataType dataType,
+      int controlColumnIndex,
+      boolean needOutputEndTime,
+      boolean needOutputEvent,
+      double delta) {
+    super(needOutputEndTime);
+    this.controlColumnIndex = controlColumnIndex;
+    this.dataType = dataType;
+    this.needOutputEvent = needOutputEvent;
+    this.delta = delta;
+    this.windowType = WindowType.EVENT_WINDOW;
+  }
+
+  public boolean isNeedOutputEvent() {
+    return needOutputEvent;
+  }
+
+  public double getDelta() {
+    return delta;
+  }
+
+  public TSDataType getDataType() {
+    return dataType;
+  }
+
+  public int getControlColumnIndex() {
+    return controlColumnIndex;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
index da3d0bb324..f6003c0120 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
@@ -46,7 +46,7 @@ public interface IWindow {
    * When we merge a point into window, at this time, we can use this method to change the status in
    * this window
    */
-  void mergeOnePoint();
+  void mergeOnePoint(Column[] controlTimeAndValueColumn, int index);
 
   /**
    * Used to customize whether the window has final aggregation result
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
index faf4c4c171..ee54df4d61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
@@ -19,7 +19,14 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 /**
  * Used to customize all the type of window managers, such as TimeWindowManager,
@@ -51,13 +58,6 @@ public interface IWindowManager {
   /** Used to mark the current window has got last point */
   void next();
 
-  /**
-   * Used to get the output time of current window
-   *
-   * @return the output time of current window
-   */
-  long currentOutputTime();
-
   /**
    * Used to get current window
    *
@@ -88,4 +88,58 @@ public interface IWindowManager {
    * @return whether there are extra points for the next window
    */
   boolean isTsBlockOutOfBound(TsBlock inputTsBlock);
+
+  /**
+   * According to the Aggregator list, we could obtain all the aggregation result column type list.
+   *
+   * @param aggregators
+   * @return Aggregation result column type list.
+   */
+  default List<TSDataType> getResultDataTypes(List<Aggregator> aggregators) {
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Aggregator aggregator : aggregators) {
+      dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+    }
+    return dataTypes;
+  }
+
+  /**
+   * Used to create the aggregation resultSet.
+   *
+   * <p>For the implementation, we should consider whether we need to add endTime column and event
+   * column in the resultSet besides the aggregation columns.
+   *
+   * @param aggregators
+   * @return TsBlockBuilder of resultSet
+   */
+  TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators);
+
+  /**
+   * Used to append a row of aggregation result into the resultSet.
+   *
+   * <p>For the implementation, similar to the method createResultTsBlockBuilder, we should consider
+   * whether we need to add endTime column and event column in the resultSet besides the aggregation
+   * columns.
+   *
+   * @param resultTsBlockBuilder
+   * @param aggregators
+   */
+  void appendAggregationResult(TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators);
+
+  /**
+   * Especially for TimeWindow, if there are no points belong to last TimeWindow, the last
+   * TimeWindow will not initialize window and aggregators in the aggregation frame.
+   *
+   * @return whether the window is TimeWindow and the last TimeWindow has not been initialized
+   */
+  boolean notInitedLastTimeWindow();
+
+  /**
+   * When endTime is required in resultSet, operator should skip the points in last window directly
+   * instead of a default lazy way to get the endTime for constructing the result tsBlock.
+   *
+   * <p>For the windows like TimeWindow which has already cached endTime, this method always return
+   * false.
+   */
+  boolean needSkipInAdvance();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
index ee11bd0eaa..1927316a24 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java
@@ -59,7 +59,7 @@ public class TimeWindow implements IWindow {
   }
 
   @Override
-  public void mergeOnePoint() {
+  public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
     // do nothing
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
index a2466554f0..4e2754439d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
@@ -19,26 +19,35 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
+import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.AggregationUtil;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import java.util.List;
 
 public class TimeWindowManager implements IWindowManager {
 
-  private TimeWindow curWindow;
+  private final TimeWindow curWindow;
+  private final boolean ascending;
+  private final ITimeRangeIterator timeRangeIterator;
+  private final boolean needOutputEndTime;
   private boolean initialized;
-
-  private boolean ascending;
-
-  private ITimeRangeIterator timeRangeIterator;
-
   private boolean needSkip;
+  private long startTime;
+  private long endTime;
 
-  public TimeWindowManager(ITimeRangeIterator timeRangeIterator) {
+  public TimeWindowManager(
+      ITimeRangeIterator timeRangeIterator, TimeWindowParameter timeWindowParameter) {
     this.timeRangeIterator = timeRangeIterator;
     this.initialized = false;
     this.curWindow = new TimeWindow(this.timeRangeIterator.nextTimeRange());
+    this.needOutputEndTime = timeWindowParameter.isNeedOutputEndTime();
     this.ascending = timeRangeIterator.isAscending();
     // At beginning, we do not need to skip inputTsBlock
     this.needSkip = false;
@@ -65,14 +74,11 @@ public class TimeWindowManager implements IWindowManager {
     // belong to previous window have been consumed. If not, we need skip these points.
     this.needSkip = true;
     this.initialized = false;
+    this.startTime = this.timeRangeIterator.currentOutputTime();
+    this.endTime = this.curWindow.getCurMaxTime();
     this.curWindow.update(this.timeRangeIterator.nextTimeRange());
   }
 
-  @Override
-  public long currentOutputTime() {
-    return timeRangeIterator.currentOutputTime();
-  }
-
   @Override
   public IWindow getCurWindow() {
     return curWindow;
@@ -130,4 +136,47 @@ public class TimeWindowManager implements IWindowManager {
             ? inputTsBlock.getEndTime() > this.curWindow.getCurMaxTime()
             : inputTsBlock.getEndTime() < this.curWindow.getCurMinTime());
   }
+
+  @Override
+  public TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators) {
+    List<TSDataType> dataTypes = getResultDataTypes(aggregators);
+    // Judge whether we need output endTime column.
+    if (this.needOutputEndTime) {
+      dataTypes.add(0, TSDataType.INT64);
+    }
+    return new TsBlockBuilder(dataTypes);
+  }
+
+  @Override
+  public void appendAggregationResult(
+      TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
+    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+    // Use start time of current time range as time column
+    timeColumnBuilder.writeLong(startTime);
+    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+    int columnIndex = 0;
+    if (this.needOutputEndTime) {
+      columnBuilders[0].writeLong(endTime);
+      columnIndex = 1;
+    }
+    for (Aggregator aggregator : aggregators) {
+      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+      columnBuilder[0] = columnBuilders[columnIndex++];
+      if (columnBuilder.length > 1) {
+        columnBuilder[1] = columnBuilders[columnIndex++];
+      }
+      aggregator.outputResult(columnBuilder);
+    }
+    resultTsBlockBuilder.declarePosition();
+  }
+
+  @Override
+  public boolean notInitedLastTimeWindow() {
+    return !this.initialized;
+  }
+
+  @Override
+  public boolean needSkipInAdvance() {
+    return false;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowParameter.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowParameter.java
index 249cf458fd..d4cfdc0c96 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowParameter.java
@@ -17,35 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
+public class TimeWindowParameter extends WindowParameter {
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
-
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
-  }
-
-  @Override
-  public boolean hasFinalResult() {
-    return false;
+  public TimeWindowParameter(boolean needOutputEndTime) {
+    super(needOutputEndTime);
+    this.windowType = WindowType.TIME_WINDOW;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindow.java
index 249cf458fd..583e40ee71 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindow.java
@@ -17,35 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class VariationEventDoubleWindow extends EventDoubleWindow {
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  public VariationEventDoubleWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
   }
 
   @Override
-  public boolean hasFinalResult() {
-    return false;
+  public boolean satisfy(Column column, int index) {
+    if (!initializedEventValue) {
+      return true;
+    }
+    return Math.abs(column.getDouble(index) - eventValue) <= eventWindowParameter.getDelta();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindowManager.java
new file mode 100644
index 0000000000..c6093c5d01
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventDoubleWindowManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class VariationEventDoubleWindowManager extends EventDoubleWindowManager {
+
+  public VariationEventDoubleWindowManager(
+      EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+    eventWindow = new VariationEventDoubleWindow(eventWindowParameter);
+  }
+
+  @Override
+  public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+    if (!needSkip) {
+      return inputTsBlock;
+    }
+
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return inputTsBlock;
+    }
+
+    Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+    int i = 0, size = inputTsBlock.getPositionCount();
+    double previousEventValue = ((VariationEventDoubleWindow) eventWindow).getPreviousEventValue();
+    for (; i < size; i++) {
+      if (!controlColumn.isNull(i)
+          && Math.abs(controlColumn.getDouble(i) - previousEventValue)
+              > eventWindowParameter.getDelta()) {
+        break;
+      }
+      // judge whether we need update endTime
+      long currentTime = timeColumn.getLong(i);
+      if (eventWindow.getEndTime() < currentTime) {
+        eventWindow.setEndTime(currentTime);
+      }
+    }
+    // we can create a new window beginning at index i of inputTsBlock
+    if (i < size) {
+      needSkip = false;
+    }
+    return inputTsBlock.subTsBlock(i);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindow.java
index 249cf458fd..65df9a500f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindow.java
@@ -17,35 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class VariationEventFloatWindow extends EventFloatWindow {
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  public VariationEventFloatWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
   }
 
   @Override
-  public boolean hasFinalResult() {
-    return false;
+  public boolean satisfy(Column column, int index) {
+    if (!initializedEventValue) {
+      return true;
+    }
+    return Math.abs(column.getFloat(index) - eventValue) <= eventWindowParameter.getDelta();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindowManager.java
new file mode 100644
index 0000000000..a01bb879ec
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventFloatWindowManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class VariationEventFloatWindowManager extends EventFloatWindowManager {
+
+  public VariationEventFloatWindowManager(
+      EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+    eventWindow = new VariationEventFloatWindow(eventWindowParameter);
+  }
+
+  @Override
+  public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+    if (!needSkip) {
+      return inputTsBlock;
+    }
+
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return inputTsBlock;
+    }
+
+    Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+    int i = 0, size = inputTsBlock.getPositionCount();
+    float previousEventValue = ((VariationEventFloatWindow) eventWindow).getPreviousEventValue();
+    for (; i < size; i++) {
+      if (!controlColumn.isNull(i)
+          && Math.abs(controlColumn.getFloat(i) - previousEventValue)
+              > eventWindowParameter.getDelta()) {
+        break;
+      }
+      // judge whether we need update endTime
+      long currentTime = timeColumn.getLong(i);
+      if (eventWindow.getEndTime() < currentTime) {
+        eventWindow.setEndTime(currentTime);
+      }
+    }
+    // we can create a new window beginning at index i of inputTsBlock
+    if (i < size) {
+      needSkip = false;
+    }
+    return inputTsBlock.subTsBlock(i);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindow.java
index 249cf458fd..a2248c4af9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindow.java
@@ -17,35 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class VariationEventIntWindow extends EventIntWindow {
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  public VariationEventIntWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
   }
 
   @Override
-  public boolean hasFinalResult() {
-    return false;
+  public boolean satisfy(Column column, int index) {
+    if (!initializedEventValue) {
+      return true;
+    }
+    return Math.abs(column.getInt(index) - eventValue) <= eventWindowParameter.getDelta();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindowManager.java
new file mode 100644
index 0000000000..1698cd2557
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventIntWindowManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class VariationEventIntWindowManager extends EventIntWindowManager {
+
+  public VariationEventIntWindowManager(
+      EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+    eventWindow = new VariationEventIntWindow(eventWindowParameter);
+  }
+
+  @Override
+  public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+    if (!needSkip) {
+      return inputTsBlock;
+    }
+
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return inputTsBlock;
+    }
+
+    Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+    int i = 0, size = inputTsBlock.getPositionCount();
+    int previousEventValue = ((VariationEventIntWindow) eventWindow).getPreviousEventValue();
+    for (; i < size; i++) {
+      if (!controlColumn.isNull(i)
+          && Math.abs(controlColumn.getInt(i) - previousEventValue)
+              > eventWindowParameter.getDelta()) {
+        break;
+      }
+      // judge whether we need update endTime
+      long currentTime = timeColumn.getLong(i);
+      if (eventWindow.getEndTime() < currentTime) {
+        eventWindow.setEndTime(currentTime);
+      }
+    }
+    // we can create a new window beginning at index i of inputTsBlock
+    if (i < size) {
+      needSkip = false;
+    }
+    return inputTsBlock.subTsBlock(i);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindow.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindow.java
index 249cf458fd..192eab4f8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindow.java
@@ -17,35 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+public class VariationEventLongWindow extends EventLongWindow {
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  public VariationEventLongWindow(EventWindowParameter eventWindowParameter) {
+    super(eventWindowParameter);
   }
 
   @Override
-  public boolean hasFinalResult() {
-    return false;
+  public boolean satisfy(Column column, int index) {
+    if (!initializedEventValue) {
+      return true;
+    }
+    return Math.abs(column.getLong(index) - eventValue) <= eventWindowParameter.getDelta();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindowManager.java
new file mode 100644
index 0000000000..923e526047
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationEventLongWindowManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class VariationEventLongWindowManager extends EventLongWindowManager {
+
+  public VariationEventLongWindowManager(
+      EventWindowParameter eventWindowParameter, boolean ascending) {
+    super(eventWindowParameter, ascending);
+    eventWindow = new VariationEventLongWindow(eventWindowParameter);
+  }
+
+  @Override
+  public TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock) {
+    if (!needSkip) {
+      return inputTsBlock;
+    }
+
+    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+      return inputTsBlock;
+    }
+
+    Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+    int i = 0, size = inputTsBlock.getPositionCount();
+    long previousEventValue = ((VariationEventLongWindow) eventWindow).getPreviousEventValue();
+    for (; i < size; i++) {
+      if (!controlColumn.isNull(i)
+          && Math.abs(controlColumn.getLong(i) - previousEventValue)
+              > eventWindowParameter.getDelta()) {
+        break;
+      }
+      // judge whether we need update endTime
+      long currentTime = timeColumn.getLong(i);
+      if (eventWindow.getEndTime() < currentTime) {
+        eventWindow.setEndTime(currentTime);
+      }
+    }
+    // we can create a new window beginning at index i of inputTsBlock
+    if (i < size) {
+      needSkip = false;
+    }
+    return inputTsBlock.subTsBlock(i);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
new file mode 100644
index 0000000000..bb7bf9dc6c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+
+public class WindowManagerFactory {
+
+  public static IWindowManager genWindowManager(
+      WindowParameter windowParameter, ITimeRangeIterator timeRangeIterator) {
+    switch (windowParameter.getWindowType()) {
+      case TIME_WINDOW:
+        return new TimeWindowManager(timeRangeIterator, (TimeWindowParameter) windowParameter);
+      case EVENT_WINDOW:
+        return ((EventWindowParameter) windowParameter).getDelta() == 0
+            ? genEqualEventWindowManager(
+                (EventWindowParameter) windowParameter, timeRangeIterator.isAscending())
+            : genVariationEventWindowManager(
+                (EventWindowParameter) windowParameter, timeRangeIterator.isAscending());
+      default:
+        throw new IllegalArgumentException("Not support this type of aggregation window.");
+    }
+  }
+
+  private static EventWindowManager genEqualEventWindowManager(
+      EventWindowParameter eventWindowParameter, boolean ascending) {
+    switch (eventWindowParameter.getDataType()) {
+      case INT32:
+        return new EqualEventIntWindowManager(eventWindowParameter, ascending);
+      case INT64:
+        return new EqualEventLongWindowManager(eventWindowParameter, ascending);
+      case FLOAT:
+        return new EqualEventFloatWindowManager(eventWindowParameter, ascending);
+      case DOUBLE:
+        return new EqualEventDoubleWindowManager(eventWindowParameter, ascending);
+      case TEXT:
+        return new EqualEventTextWindowManager(eventWindowParameter, ascending);
+      case BOOLEAN:
+        return new EqualEventBooleanWindowManager(eventWindowParameter, ascending);
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(
+                "Unsupported data type in equal event aggregation : %s",
+                eventWindowParameter.getDataType()));
+    }
+  }
+
+  private static EventWindowManager genVariationEventWindowManager(
+      EventWindowParameter eventWindowParameter, boolean ascending) {
+    switch (eventWindowParameter.getDataType()) {
+      case INT32:
+        return new VariationEventIntWindowManager(eventWindowParameter, ascending);
+      case INT64:
+        return new VariationEventLongWindowManager(eventWindowParameter, ascending);
+      case FLOAT:
+        return new VariationEventFloatWindowManager(eventWindowParameter, ascending);
+      case DOUBLE:
+        return new VariationEventDoubleWindowManager(eventWindowParameter, ascending);
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(
+                "Unsupported data type in variation event aggregation : %s",
+                eventWindowParameter.getDataType()));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowParameter.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowParameter.java
index 249cf458fd..3ba9bdbf7f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowParameter.java
@@ -17,35 +17,23 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
+public abstract class WindowParameter {
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+  protected WindowType windowType;
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
+  private final boolean needOutputEndTime;
 
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  public WindowParameter(boolean needOutputEndTime) {
+    this.needOutputEndTime = needOutputEndTime;
   }
 
-  @Override
-  public boolean hasFinalResult() {
-    return false;
+  public WindowType getWindowType() {
+    return windowType;
+  }
+
+  public boolean isNeedOutputEndTime() {
+    return needOutputEndTime;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
index 249cf458fd..6a6ecd6185 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
@@ -17,35 +17,20 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.aggregation;
+package org.apache.iotdb.db.mpp.execution.operator.window;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
+public enum WindowType {
+  TIME_WINDOW((byte) 0),
 
-public class MinTimeDescAccumulator extends MinTimeAccumulator {
+  EVENT_WINDOW((byte) 1);
 
-  @Override
-  public int addInput(Column[] column, IWindow curWindow) {
-    int curPositionCount = column[0].getPositionCount();
+  private final byte type;
 
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (column[0].isNull(i)) {
-        continue;
-      }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint();
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-      }
-    }
-    return curPositionCount;
+  WindowType(byte type) {
+    this.type = type;
   }
 
-  @Override
-  public boolean hasFinalResult() {
-    return false;
+  public byte getType() {
+    return type;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1e41632155..a083eb15d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -118,6 +118,8 @@ import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
+import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.ExpressionTypeAnalyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
@@ -1388,13 +1390,16 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
           calculateMaxAggregationResultSize(
               aggregationDescriptors, timeRangeIterator, context.getTypeProvider());
 
+      WindowParameter windowParameter = new TimeWindowParameter(false);
+
       return new RawDataAggregationOperator(
           operatorContext,
           aggregators,
           timeRangeIterator,
           children.get(0),
           ascending,
-          maxReturnSize);
+          maxReturnSize,
+          windowParameter);
     } else {
       OperatorContext operatorContext =
           context
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
index ac812e0b17..ce9df44997 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java
@@ -70,6 +70,8 @@ import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -1267,6 +1269,8 @@ public class OperatorMemoryTest {
         AggregationUtil.calculateMaxAggregationResultSize(
             aggregationDescriptors, timeRangeIterator, typeProvider);
 
+    WindowParameter windowParameter = new TimeWindowParameter(false);
+
     RawDataAggregationOperator rawDataAggregationOperator =
         new RawDataAggregationOperator(
             Mockito.mock(OperatorContext.class),
@@ -1274,7 +1278,8 @@ public class OperatorMemoryTest {
             timeRangeIterator,
             child,
             true,
-            maxReturnSize);
+            maxReturnSize,
+            windowParameter);
 
     long expectedMaxReturnSize =
         100
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index e4e70431e1..f89cebafac 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -39,6 +39,9 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.execution.operator.window.EventWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationType;
@@ -49,6 +52,9 @@ import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterType;
+import org.apache.iotdb.tsfile.read.filter.operator.Gt;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.After;
@@ -68,6 +74,7 @@ import static org.apache.iotdb.db.mpp.execution.operator.AggregationOperatorTest
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class RawDataAggregationOperatorTest {
 
@@ -121,8 +128,10 @@ public class RawDataAggregationOperatorTest {
       }
     }
 
+    WindowParameter windowParameter = new TimeWindowParameter(false);
+
     RawDataAggregationOperator rawDataAggregationOperator =
-        initRawDataAggregationOperator(aggregationTypes, null, inputLocations);
+        initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
     int count = 0;
     while (rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -174,8 +183,10 @@ public class RawDataAggregationOperatorTest {
       inputLocations.add(inputLocationForOneAggregator);
     }
 
+    WindowParameter windowParameter = new TimeWindowParameter(false);
+
     RawDataAggregationOperator rawDataAggregationOperator =
-        initRawDataAggregationOperator(aggregationTypes, null, inputLocations);
+        initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
     int count = 0;
     while (rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -198,7 +209,7 @@ public class RawDataAggregationOperatorTest {
 
   /** Test aggregating raw data by time interval. */
   @Test
-  public void groupByRawDataTest1() throws IllegalPathException {
+  public void groupByTimeRawDataTest1() throws IllegalPathException {
     int[][] result =
         new int[][] {
           {100, 100, 100, 99},
@@ -225,8 +236,11 @@ public class RawDataAggregationOperatorTest {
     }
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
 
+    WindowParameter windowParameter = new TimeWindowParameter(false);
+
     RawDataAggregationOperator rawDataAggregationOperator =
-        initRawDataAggregationOperator(aggregationTypes, groupByTimeParameter, inputLocations);
+        initRawDataAggregationOperator(
+            aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
     int count = 0;
     while (rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -250,7 +264,7 @@ public class RawDataAggregationOperatorTest {
 
   /** Test aggregating raw data by time interval. */
   @Test
-  public void groupByRawDataTest2() throws IllegalPathException {
+  public void groupByTimeRawDataTest2() throws IllegalPathException {
     double[][] result =
         new double[][] {
           {20049.5, 20149.5, 6249.5, 8429.808},
@@ -278,8 +292,12 @@ public class RawDataAggregationOperatorTest {
       inputLocations.add(inputLocationForOneAggregator);
     }
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+
+    WindowParameter windowParameter = new TimeWindowParameter(false);
+
     RawDataAggregationOperator rawDataAggregationOperator =
-        initRawDataAggregationOperator(aggregationTypes, groupByTimeParameter, inputLocations);
+        initRawDataAggregationOperator(
+            aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
     int count = 0;
     while (rawDataAggregationOperator.hasNext()) {
       TsBlock resultTsBlock = rawDataAggregationOperator.next();
@@ -303,10 +321,494 @@ public class RawDataAggregationOperatorTest {
     assertEquals(4, count);
   }
 
+  /** Test by time interval with EndTime */
+  @Test
+  public void groupByTimeRawDataTest3() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {100, 100, 100, 99},
+          {2004950, 2014950, 624950, 834551},
+          {0, 100, 200, 300},
+          {99, 199, 299, 398},
+          {20099, 20199, 10259, 10379},
+          {20000, 20100, 260, 380},
+          {20000, 20100, 10200, 10300},
+          {20099, 20199, 299, 398}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.COUNT);
+      aggregationTypes.add(AggregationType.SUM);
+      aggregationTypes.add(AggregationType.MIN_TIME);
+      aggregationTypes.add(AggregationType.MAX_TIME);
+      aggregationTypes.add(AggregationType.MAX_VALUE);
+      aggregationTypes.add(AggregationType.MIN_VALUE);
+      aggregationTypes.add(AggregationType.FIRST_VALUE);
+      aggregationTypes.add(AggregationType.LAST_VALUE);
+      for (int j = 0; j < 8; j++) {
+        List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+        inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+        inputLocations.add(inputLocationForOneAggregator);
+      }
+    }
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+
+    WindowParameter windowParameter = new TimeWindowParameter(true);
+
+    RawDataAggregationOperator rawDataAggregationOperator =
+        initRawDataAggregationOperator(
+            aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
+    int count = 0;
+    while (rawDataAggregationOperator.hasNext()) {
+      TsBlock resultTsBlock = rawDataAggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
+
+      for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+        assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(row));
+        // endTime
+        long endTime = 100L * count + 99;
+        if (count == 3) {
+          endTime = 398;
+        }
+        assertEquals(endTime, resultTsBlock.getColumn(0).getLong(row));
+        for (int i = 0; i < 2; i++) {
+          assertEquals(result[0][count], resultTsBlock.getColumn(8 * i + 1).getLong(row));
+          assertEquals(result[1][count], resultTsBlock.getColumn(8 * i + 2).getDouble(row), 0.0001);
+          assertEquals(result[2][count], resultTsBlock.getColumn(8 * i + 3).getLong(row));
+          assertEquals(result[3][count], resultTsBlock.getColumn(8 * i + 4).getLong(row));
+          assertEquals(result[4][count], resultTsBlock.getColumn(8 * i + 5).getInt(row));
+          assertEquals(result[5][count], resultTsBlock.getColumn(8 * i + 6).getInt(row));
+          assertEquals(result[6][count], resultTsBlock.getColumn(8 * i + 7).getInt(row));
+          assertEquals(result[7][count], resultTsBlock.getColumn(8 * i + 8).getInt(row));
+        }
+      }
+    }
+    assertEquals(4, count);
+  }
+
+  /** 0 - 99 100 - 199 200 - 299 300 - 399 400 - 499 500 - 599 */
+  @Test
+  public void groupByTimeRawDataTest4() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {100, 100, 100, 100, 100, 0},
+          {20099, 20199, 10259, 10379, 10499},
+          {20000, 20100, 260, 380, 10400},
+          {20000, 20100, 10200, 10300, 10400},
+          {20099, 20199, 299, 399, 10499}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.COUNT);
+      aggregationTypes.add(AggregationType.MAX_VALUE);
+      aggregationTypes.add(AggregationType.MIN_VALUE);
+      aggregationTypes.add(AggregationType.FIRST_VALUE);
+      aggregationTypes.add(AggregationType.LAST_VALUE);
+      for (int j = 0; j < 8; j++) {
+        List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+        inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+        inputLocations.add(inputLocationForOneAggregator);
+      }
+    }
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 600, 100, 100, true);
+
+    WindowParameter windowParameter = new TimeWindowParameter(true);
+
+    RawDataAggregationOperator rawDataAggregationOperator =
+        initRawDataAggregationOperator(
+            aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
+    int count = 0;
+    while (rawDataAggregationOperator.hasNext()) {
+      TsBlock resultTsBlock = rawDataAggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
+      for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+        assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(row));
+        // endTime
+        assertEquals(100L * count + 99, resultTsBlock.getColumn(0).getLong(row));
+        for (int i = 0; i < 2; i++) {
+          if (count == 5) {
+            assertEquals(result[0][count], resultTsBlock.getColumn(5 * i + 1).getLong(row));
+            assertTrue(resultTsBlock.getColumn(5 * i + 2).isNull(row));
+            assertTrue(resultTsBlock.getColumn(5 * i + 3).isNull(row));
+            assertTrue(resultTsBlock.getColumn(5 * i + 4).isNull(row));
+            assertTrue(resultTsBlock.getColumn(5 * i + 5).isNull(row));
+            continue;
+          }
+          assertEquals(result[0][count], resultTsBlock.getColumn(5 * i + 1).getLong(row));
+          assertEquals(result[1][count], resultTsBlock.getColumn(5 * i + 2).getInt(row));
+          assertEquals(result[2][count], resultTsBlock.getColumn(5 * i + 3).getInt(row));
+          assertEquals(result[3][count], resultTsBlock.getColumn(5 * i + 4).getInt(row));
+          assertEquals(result[4][count], resultTsBlock.getColumn(5 * i + 5).getInt(row));
+        }
+      }
+    }
+    assertEquals(6, count);
+  }
+
+  /**
+   * test the situation when leftCRightO is false. 1 - 100 101 - 200 201 - 300 301 - 400 401 - 500
+   * 501 - 600
+   */
+  @Test
+  public void groupByTimeRawDataTest5() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {100, 100, 100, 100, 99, 0},
+          {20100, 20199, 10300, 10400, 10499}, // max
+          {20001, 10200, 260, 380, 10401}, // min
+          {20001, 20101, 10201, 10301, 10401}, // first
+          {20100, 10200, 10300, 10400, 10499} // last
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.COUNT);
+      aggregationTypes.add(AggregationType.MAX_VALUE);
+      aggregationTypes.add(AggregationType.MIN_VALUE);
+      aggregationTypes.add(AggregationType.FIRST_VALUE);
+      aggregationTypes.add(AggregationType.LAST_VALUE);
+      for (int j = 0; j < 8; j++) {
+        List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+        inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+        inputLocations.add(inputLocationForOneAggregator);
+      }
+    }
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 600, 100, 100, false);
+
+    WindowParameter windowParameter = new TimeWindowParameter(false);
+
+    RawDataAggregationOperator rawDataAggregationOperator =
+        initRawDataAggregationOperator(
+            aggregationTypes, groupByTimeParameter, inputLocations, windowParameter);
+    int count = 0;
+    while (rawDataAggregationOperator.hasNext()) {
+      TsBlock resultTsBlock = rawDataAggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
+      for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+        assertEquals(100 * (count + 1), resultTsBlock.getTimeColumn().getLong(row));
+        for (int i = 0; i < 2; i++) {
+          if (count == 5) {
+            assertEquals(result[0][count], resultTsBlock.getColumn(5 * i).getLong(row));
+            assertTrue(resultTsBlock.getColumn(5 * i + 1).isNull(row));
+            assertTrue(resultTsBlock.getColumn(5 * i + 2).isNull(row));
+            assertTrue(resultTsBlock.getColumn(5 * i + 3).isNull(row));
+            assertTrue(resultTsBlock.getColumn(5 * i + 4).isNull(row));
+            continue;
+          }
+          assertEquals(result[0][count], resultTsBlock.getColumn(5 * i).getLong(row));
+          assertEquals(result[1][count], resultTsBlock.getColumn(5 * i + 1).getInt(row));
+          assertEquals(result[2][count], resultTsBlock.getColumn(5 * i + 2).getInt(row));
+          assertEquals(result[3][count], resultTsBlock.getColumn(5 * i + 3).getInt(row));
+          assertEquals(result[4][count], resultTsBlock.getColumn(5 * i + 4).getInt(row));
+        }
+      }
+    }
+    assertEquals(6, count);
+  }
+
+  /** 0 - 259 260 - 299 `300 - 499 */
+  @Test
+  public void groupByEventRawDataTest1() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {0, 260, 300},
+          {259, 299, 499},
+          {20000, 260, 10300},
+          {10259, 299, 10499}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.MIN_TIME);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.MAX_TIME);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.FIRST_VALUE);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.LAST_VALUE);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+
+    WindowParameter windowParameter =
+        new EventWindowParameter(TSDataType.INT32, 0, false, false, 10000);
+
+    RawDataAggregationOperator rawDataAggregationOperator =
+        initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
+    int count = 0;
+    while (rawDataAggregationOperator.hasNext()) {
+      TsBlock resultTsBlock = rawDataAggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
+      for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+        for (int i = 0; i < 2; i++) {
+          assertEquals(result[0][count], resultTsBlock.getColumn(i).getLong(row));
+        }
+        for (int i = 2; i < 4; i++) {
+          assertEquals(result[1][count], resultTsBlock.getColumn(i).getLong(row));
+        }
+        for (int i = 4; i < 6; i++) {
+          assertEquals(result[2][count], resultTsBlock.getColumn(i).getInt(row));
+        }
+        for (int i = 6; i < 8; i++) {
+          assertEquals(result[3][count], resultTsBlock.getColumn(i).getInt(row));
+        }
+      }
+    }
+    assertEquals(3, count);
+  }
+
+  @Test
+  public void groupByEventRawDataTest2() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {4019900, 613770, 11180, 827160, 7790, 1044950},
+          {200, 60, 40, 80, 20, 100},
+          {20000, 10200, 260, 10300, 380, 10400}
+        };
+    long[][] resultTime =
+        new long[][] {
+          {0, 200, 260, 300, 380, 400},
+          {199, 259, 299, 379, 399, 499}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.SUM);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.COUNT);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.FIRST_VALUE);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+
+    WindowParameter windowParameter =
+        new EventWindowParameter(TSDataType.INT32, 0, true, false, 5000);
+
+    RawDataAggregationOperator rawDataAggregationOperator =
+        initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
+    int count = 0;
+    while (rawDataAggregationOperator.hasNext()) {
+      TsBlock resultTsBlock = rawDataAggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
+      for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+        assertEquals(resultTime[0][count], resultTsBlock.getTimeByIndex(row));
+        assertEquals(resultTime[1][count], resultTsBlock.getColumn(0).getLong(row));
+        for (int i = 1; i <= 2; i++) {
+          assertEquals(result[0][count], resultTsBlock.getColumn(i).getDouble(row), 0.01);
+        }
+        for (int i = 3; i <= 4; i++) {
+          assertEquals(result[1][count], resultTsBlock.getColumn(i).getLong(row));
+        }
+        for (int i = 5; i <= 6; i++) {
+          assertEquals(result[2][count], resultTsBlock.getColumn(i).getInt(row));
+        }
+      }
+    }
+    assertEquals(6, count);
+  }
+
+  @Test
+  public void groupByEventRawDataTest3() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {4019900, 613770, 11180, 827160, 7790, 1044950},
+          {200, 60, 40, 80, 20, 100},
+          {20000, 10200, 260, 10300, 380, 10400}
+        };
+    long[] resultTime = new long[] {0, 200, 260, 300, 380, 400};
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.SUM);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.COUNT);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.FIRST_VALUE);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+
+    WindowParameter windowParameter =
+        new EventWindowParameter(TSDataType.INT32, 0, false, false, 5000);
+
+    RawDataAggregationOperator rawDataAggregationOperator =
+        initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
+    int count = 0;
+    while (rawDataAggregationOperator.hasNext()) {
+      TsBlock resultTsBlock = rawDataAggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
+      for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+        assertEquals(resultTime[count], resultTsBlock.getTimeByIndex(row));
+        for (int i = 0; i < 2; i++) {
+          assertEquals(result[0][count], resultTsBlock.getColumn(i).getDouble(row), 0.01);
+        }
+        for (int i = 2; i < 4; i++) {
+          assertEquals(result[1][count], resultTsBlock.getColumn(i).getLong(row));
+        }
+        for (int i = 4; i < 6; i++) {
+          assertEquals(result[2][count], resultTsBlock.getColumn(i).getInt(row));
+        }
+      }
+    }
+    assertEquals(6, count);
+  }
+  /** 0 - 199 200 - 259 260 - 299 300 - 379 380 - 399 400 - 499 */
+  @Test
+  public void groupByEventRawDataTest4() throws IllegalPathException {
+    int[] result =
+        new int[] {
+          20000, 10200, 260, 10300, 380, 10400,
+        };
+    long[][] resultTime =
+        new long[][] {
+          {0, 200, 260, 300, 380, 400},
+          {199, 259, 299, 379, 399, 499}
+        };
+
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.FIRST_VALUE);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+
+    WindowParameter windowParameter =
+        new EventWindowParameter(TSDataType.INT32, 0, true, false, 5000);
+
+    RawDataAggregationOperator rawDataAggregationOperator =
+        initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
+    int count = 0;
+    while (rawDataAggregationOperator.hasNext()) {
+      TsBlock resultTsBlock = rawDataAggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
+      for (int row = 0; row < resultTsBlock.getPositionCount(); row++, count++) {
+        assertEquals(resultTime[0][count], resultTsBlock.getTimeByIndex(row));
+        assertEquals(resultTime[1][count], resultTsBlock.getColumn(0).getLong(row));
+        for (int i = 1; i <= 2; i++) {
+          assertEquals(result[count], resultTsBlock.getColumn(i).getInt(row));
+        }
+      }
+    }
+    assertEquals(6, count);
+  }
+
+  @Test
+  public void onePointInOneEqualEventWindowTest() throws IllegalPathException {
+    WindowParameter windowParameter =
+        new EventWindowParameter(TSDataType.INT32, 0, false, false, 0);
+    onePointInOneWindowTest(windowParameter);
+  }
+
+  @Test
+  public void onePointInOneVariationEventWindowTest() throws IllegalPathException {
+    WindowParameter windowParameter =
+        new EventWindowParameter(TSDataType.INT32, 0, false, false, 0.5);
+    onePointInOneWindowTest(windowParameter);
+  }
+
+  private void onePointInOneWindowTest(WindowParameter windowParameter)
+      throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    List<List<InputLocation[]>> inputLocations = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.COUNT);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+    for (int i = 0; i < 2; i++) {
+      aggregationTypes.add(AggregationType.MIN_TIME);
+      List<InputLocation[]> inputLocationForOneAggregator = new ArrayList<>();
+      inputLocationForOneAggregator.add(new InputLocation[] {new InputLocation(0, i)});
+      inputLocations.add(inputLocationForOneAggregator);
+    }
+
+    RawDataAggregationOperator rawDataAggregationOperator =
+        initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
+
+    int resultMinTime1 = -1, resultMinTime2 = -1;
+    while (rawDataAggregationOperator.hasNext()) {
+      TsBlock resultTsBlock = rawDataAggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
+      for (int i = 0; i < 2; i++) {
+        for (int j = 0; j < resultTsBlock.getColumn(i).getPositionCount(); j++) {
+          assertEquals(1, resultTsBlock.getColumn(i).getLong(j));
+        }
+      }
+      // Here, a resultTsBlock has many aggregation results instead of one.
+      for (int i = 2; i < 4; i++) {
+        if (i == 2) {
+          for (int j = 0; j < resultTsBlock.getColumn(i).getPositionCount(); j++) {
+            assertEquals(++resultMinTime1, resultTsBlock.getColumn(i).getLong(j));
+          }
+        } else {
+          for (int j = 0; j < resultTsBlock.getColumn(i).getPositionCount(); j++) {
+            assertEquals(++resultMinTime2, resultTsBlock.getColumn(i).getLong(j));
+          }
+        }
+      }
+    }
+    assertEquals(resultMinTime1, 499);
+    assertEquals(resultMinTime2, 499);
+  }
+
   private RawDataAggregationOperator initRawDataAggregationOperator(
       List<AggregationType> aggregationTypes,
       GroupByTimeParameter groupByTimeParameter,
-      List<List<InputLocation[]>> inputLocations)
+      List<List<InputLocation[]>> inputLocations,
+      WindowParameter windowParameter)
       throws IllegalPathException {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
@@ -339,6 +841,10 @@ public class RawDataAggregationOperatorTest {
               operatorContext.setMaxRunTime(TEST_TIME_SLICE);
             });
 
+    Filter timeFilter = null;
+    if (groupByTimeParameter != null && !groupByTimeParameter.isLeftCRightO()) {
+      timeFilter = new Gt<>(0L, FilterType.TIME_FILTER);
+    }
     SeriesScanOperator seriesScanOperator1 =
         new SeriesScanOperator(
             driverContext.getOperatorContexts().get(0),
@@ -346,7 +852,7 @@ public class RawDataAggregationOperatorTest {
             measurementPath1,
             allSensors,
             TSDataType.INT32,
-            null,
+            timeFilter,
             null,
             true);
     seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
@@ -360,7 +866,7 @@ public class RawDataAggregationOperatorTest {
             measurementPath2,
             allSensors,
             TSDataType.INT32,
-            null,
+            timeFilter,
             null,
             true);
     seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
@@ -389,6 +895,7 @@ public class RawDataAggregationOperatorTest {
         initTimeRangeIterator(groupByTimeParameter, true, true),
         timeJoinOperator,
         true,
-        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
+        DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
+        windowParameter);
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index 7f3bf6e268..b331966809 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -53,6 +53,24 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
  *
  * <p>UnSequence time range of data: [0, 19], [100, 139], [200, 259], [300, 379], [400, 499], [0,
  * 199]
+ *
+ * <p>The time series generated by this util are:
+ *
+ * <p>time value
+ *
+ * <p>0-99 20000-20099
+ *
+ * <p>100-199 20100-20199
+ *
+ * <p>200-259 10200-10259
+ *
+ * <p>260-299 260-299
+ *
+ * <p>300-379 10300-10379
+ *
+ * <p>380-399 380-399
+ *
+ * <p>400-499 10400-10499
  */
 public class SeriesReaderTestUtil {