You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2023/03/03 09:16:35 UTC

[iotdb] branch rel/1.1 updated: [IOTDB-5563] Extract and decouple the logic of window segmentation in Aggregator (#9207)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 697fd43565 [IOTDB-5563] Extract and decouple the logic of window segmentation in Aggregator (#9207)
697fd43565 is described below

commit 697fd43565a8493f2475b6e2e981c3d3c2b60e9f
Author: YangCaiyin <yc...@gmail.com>
AuthorDate: Fri Mar 3 17:16:28 2023 +0800

    [IOTDB-5563] Extract and decouple the logic of window segmentation in Aggregator (#9207)
    
    (cherry picked from commit 0a4a84fca3fc785af585b4589c66f0b84046d175)
---
 .../db/it/groupby/IoTDBGroupByConditionIT.java     |   2 +-
 .../iotdb/db/it/groupby/IoTDBGroupBySessionIT.java |  18 +--
 ...eventWindow.ftl => abstractVariationWindow.ftl} |  30 ++---
 .../main/codegen/templates/eventWindowManager.ftl  |  51 -------
 .../{evEventWindow.ftl => variationWindow.ftl}     |  16 +--
 ...indowManager.ftl => variationWindowManager.ftl} |  36 ++---
 .../iotdb/db/mpp/aggregation/Accumulator.java      |  16 +--
 .../iotdb/db/mpp/aggregation/Aggregator.java       |  35 +----
 .../iotdb/db/mpp/aggregation/AvgAccumulator.java   |  90 +++++--------
 .../iotdb/db/mpp/aggregation/CountAccumulator.java |  23 ++--
 .../db/mpp/aggregation/CountIfAccumulator.java     |  25 ++--
 .../db/mpp/aggregation/ExtremeAccumulator.java     |  90 +++++--------
 .../db/mpp/aggregation/FirstValueAccumulator.java  | 150 +++++++--------------
 .../mpp/aggregation/FirstValueDescAccumulator.java | 110 +++++----------
 .../db/mpp/aggregation/LastValueAccumulator.java   | 132 +++++++-----------
 .../mpp/aggregation/LastValueDescAccumulator.java  | 122 +++++------------
 .../db/mpp/aggregation/MaxTimeAccumulator.java     |  22 +--
 .../db/mpp/aggregation/MaxTimeDescAccumulator.java |  25 ++--
 .../db/mpp/aggregation/MaxValueAccumulator.java    |  94 +++++--------
 .../db/mpp/aggregation/MinTimeAccumulator.java     |  25 ++--
 .../db/mpp/aggregation/MinTimeDescAccumulator.java |  20 +--
 .../db/mpp/aggregation/MinValueAccumulator.java    |  93 +++++--------
 .../iotdb/db/mpp/aggregation/SumAccumulator.java   |  92 +++++--------
 .../slidingwindow/SlidingWindowAggregator.java     |   5 +-
 .../db/mpp/execution/operator/AggregationUtil.java |  18 ++-
 .../operator/process/AggregationOperator.java      |   2 +-
 .../process/RawDataAggregationOperator.java        |  48 +++++--
 .../process/SlidingWindowAggregationOperator.java  |   5 +-
 .../AbstractSeriesAggregationScanOperator.java     |   2 +-
 ...entWindow.java => AbstractVariationWindow.java} |  40 +++---
 .../{SeriesWindow.java => ConditionWindow.java}    |  11 +-
 ...dowManager.java => ConditionWindowManager.java} |  63 +++------
 ...arameter.java => ConditionWindowParameter.java} |   6 +-
 .../db/mpp/execution/operator/window/IWindow.java  |   2 +-
 .../execution/operator/window/IWindowManager.java  |  52 +++++--
 .../execution/operator/window/SessionWindow.java   |   5 +-
 .../operator/window/SessionWindowManager.java      |  33 +----
 .../operator/window/TimeWindowManager.java         |  24 +---
 ...dowManager.java => VariationWindowManager.java} |  48 ++-----
 ...arameter.java => VariationWindowParameter.java} |   6 +-
 .../operator/window/WindowManagerFactory.java      |  40 +++---
 .../mpp/execution/operator/window/WindowType.java  |   4 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  28 ++--
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  20 +--
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  34 +++--
 ...rameter.java => GroupByConditionParameter.java} |  14 +-
 .../planner/plan/parameter/GroupByParameter.java   |   6 +-
 .../plan/parameter/GroupByVariationParameter.java  |   2 +-
 ...mponent.java => GroupByConditionComponent.java} |   6 +-
 .../component/GroupByVariationComponent.java       |   2 +-
 .../db/mpp/plan/statement/crud/QueryStatement.java |  10 +-
 .../iotdb/db/mpp/aggregation/AccumulatorTest.java  |  55 ++++----
 .../operator/RawDataAggregationOperatorTest.java   |  15 ++-
 53 files changed, 727 insertions(+), 1196 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByConditionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByConditionIT.java
index 8c9eeb555e..fdbea94e8c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByConditionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupByConditionIT.java
@@ -210,7 +210,7 @@ public class IoTDBGroupByConditionIT {
           {"6", "2500000000", "2499999994", "13", "100.0"},
         };
     String sql =
-        "select __endTime,max_time(charging_status) - min_time(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by condition(soc>=24.0,KEEP<=20)";
+        "select __endTime,max_time(charging_status) - min_time(charging_status),count(vehicle_status),last_value(soc) from root.sg.beijing.car01 group by condition(soc>=24.0,KEEP<=15)";
     normalTestWithEndTime(res, sql);
   }
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupBySessionIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupBySessionIT.java
index c9f4f2fe17..7e4a8e3b07 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupBySessionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/groupby/IoTDBGroupBySessionIT.java
@@ -369,11 +369,7 @@ public class IoTDBGroupBySessionIT {
         };
     String sql =
         "select __endTime,count(status), avg(temperature), sum(hardware), first_value(hardware) from root.ln.** group by session(50s) having count(status)>5 align by device";
-    normalTestAlignByDevice(
-        res,
-        sql,
-        1,
-        "Time,Device,__endTime,count(status),avg(temperature),sum(hardware),first_value(hardware)");
+    normalTestAlignByDevice(res, sql, 1);
   }
 
   @Test
@@ -386,11 +382,7 @@ public class IoTDBGroupBySessionIT {
         };
     String sql =
         "select __endTime,count(status), avg(temperature), sum(hardware), first_value(hardware) from root.ln.** group by session(1d) align by device";
-    normalTestAlignByDevice(
-        res,
-        sql,
-        2,
-        "Time,Device,__endTime,count(status),avg(temperature),sum(hardware),first_value(hardware)");
+    normalTestAlignByDevice(res, sql, 2);
   }
 
   @Test
@@ -430,13 +422,15 @@ public class IoTDBGroupBySessionIT {
     normalTestAlignByDevice2(res, sql, 2, "Time,Device,__endTime,count(hardware)");
   }
 
-  private void normalTestAlignByDevice(String[][] res, String sql, int split, String title) {
+  private void normalTestAlignByDevice(String[][] res, String sql, int split) {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
 
       try (ResultSet resultSet = statement.executeQuery(sql)) {
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-        checkHeader(resultSetMetaData, title);
+        checkHeader(
+            resultSetMetaData,
+            "Time,Device,__endTime,count(status),avg(temperature),sum(hardware),first_value(hardware)");
         int count = 0;
         String device = "root.ln.wf02.wt01";
         while (resultSet.next()) {
diff --git a/server/src/main/codegen/templates/eventWindow.ftl b/server/src/main/codegen/templates/abstractVariationWindow.ftl
similarity index 73%
rename from server/src/main/codegen/templates/eventWindow.ftl
rename to server/src/main/codegen/templates/abstractVariationWindow.ftl
index 04b461c3e7..aa8083cfaa 100644
--- a/server/src/main/codegen/templates/eventWindow.ftl
+++ b/server/src/main/codegen/templates/abstractVariationWindow.ftl
@@ -20,7 +20,7 @@
 
 <#list allDataTypes.types as type>
 
-    <#assign className = "Event${type.dataType?cap_first}Window">
+    <#assign className = "AbstractVariation${type.dataType?cap_first}Window">
     <@pp.changeOutputFile name="/org/apache/iotdb/db/mpp/execution/operator/window/${className}.java" />
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
@@ -32,19 +32,19 @@ import org.apache.iotdb.tsfile.utils.Binary;
 /*
 * This class is generated using freemarker and the ${.template_name} template.
 */
-public abstract class ${className} extends EventWindow {
+public abstract class ${className} extends AbstractVariationWindow {
 
-  protected ${type.dataType} eventValue;
+  protected ${type.dataType} headValue;
 
-  private ${type.dataType} previousEventValue;
+  private ${type.dataType} previousValue;
 
-  public ${className}(EventWindowParameter eventWindowParameter) {
-    super(eventWindowParameter);
+  public ${className}(VariationWindowParameter variationWindowParameter) {
+    super(variationWindowParameter);
   }
 
   @Override
-  public void updatePreviousEventValue() {
-    previousEventValue = eventValue;
+  public void updatePreviousValue() {
+    previousValue = headValue;
   }
 
   @Override
@@ -59,25 +59,25 @@ public abstract class ${className} extends EventWindow {
       endTime = currentTime;
     }
     // judge whether we need initialize eventValue
-    if (!initializedEventValue) {
+    if (!initializedHeadValue) {
       startTime = currentTime;
       endTime = currentTime;
       if(controlTimeAndValueColumn[0].isNull(index)){
         valueIsNull = true;
       }else{
         valueIsNull = false;
-        eventValue = controlTimeAndValueColumn[0].get${type.dataType?cap_first}(index);
+        headValue = controlTimeAndValueColumn[0].get${type.dataType?cap_first}(index);
       }
-      initializedEventValue = true;
+      initializedHeadValue = true;
     }
   }
 
-  public ${type.dataType} getEventValue() {
-    return eventValue;
+  public ${type.dataType} getHeadValue() {
+    return headValue;
   }
 
-  public ${type.dataType} getPreviousEventValue() {
-    return previousEventValue;
+  public ${type.dataType} getPreviousHeadValue() {
+    return previousValue;
   }
 }
 
diff --git a/server/src/main/codegen/templates/eventWindowManager.ftl b/server/src/main/codegen/templates/eventWindowManager.ftl
deleted file mode 100644
index 4934268e46..0000000000
--- a/server/src/main/codegen/templates/eventWindowManager.ftl
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-<@pp.dropOutputFile />
-
-<#list allDataTypes.types as type>
-
-    <#assign className = "Event${type.dataType?cap_first}WindowManager">
-    <@pp.changeOutputFile name="/org/apache/iotdb/db/mpp/execution/operator/window/${className}.java" />
-package org.apache.iotdb.db.mpp.execution.operator.window;
-
-import org.apache.iotdb.db.mpp.aggregation.Aggregator;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-
-import java.util.List;
-
-/*
-* This class is generated using freemarker and the ${.template_name} template.
-*/
-public abstract class ${className} extends EventWindowManager {
-
-    public ${className}(EventWindowParameter eventWindowParameter, boolean ascending) {
-        super(eventWindowParameter, ascending);
-    }
-
-    @Override
-    public void appendAggregationResult(
-        TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
-        // Append aggregation results to valueColumnBuilders.
-        ColumnBuilder[] columnBuilders =
-            appendOriginAggregationResult(resultTsBlockBuilder, aggregators);
-    }
-}
-
-</#list>
\ No newline at end of file
diff --git a/server/src/main/codegen/templates/evEventWindow.ftl b/server/src/main/codegen/templates/variationWindow.ftl
similarity index 83%
rename from server/src/main/codegen/templates/evEventWindow.ftl
rename to server/src/main/codegen/templates/variationWindow.ftl
index 03b62cb586..5930fd0077 100644
--- a/server/src/main/codegen/templates/evEventWindow.ftl
+++ b/server/src/main/codegen/templates/variationWindow.ftl
@@ -30,22 +30,22 @@
             </#if>
         </#if>
 
-        <#assign className = "${compareType.compareType?cap_first}Event${dataType.dataType?cap_first}Window">
+        <#assign className = "${compareType.compareType?cap_first}${dataType.dataType?cap_first}Window">
         <@pp.changeOutputFile name="/org/apache/iotdb/db/mpp/execution/operator/window/${className}.java" />
 
 package org.apache.iotdb.db.mpp.execution.operator.window;
 
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class ${className} extends Event${dataType.dataType?cap_first}Window {
+public class ${className} extends AbstractVariation${dataType.dataType?cap_first}Window {
 
-  public ${className}(EventWindowParameter eventWindowParameter) {
-    super(eventWindowParameter);
+  public ${className}(VariationWindowParameter variationWindowParameter) {
+    super(variationWindowParameter);
   }
 
   @Override
   public boolean satisfy(Column column, int index) {
-    if (!initializedEventValue) {
+    if (!initializedHeadValue) {
       return true;
     }
     if(column.isNull(index)){
@@ -53,12 +53,12 @@ public class ${className} extends Event${dataType.dataType?cap_first}Window {
     }
     <#if compareType.compareType == "equal">
         <#if dataType.dataType == "Binary">
-    return !valueIsNull&&column.get${dataType.dataType?cap_first}(index).equals(eventValue);
+    return !valueIsNull&&column.get${dataType.dataType?cap_first}(index).equals(headValue);
         <#else>
-    return !valueIsNull&&column.get${dataType.dataType?cap_first}(index) == eventValue;
+    return !valueIsNull&&column.get${dataType.dataType?cap_first}(index) == headValue;
         </#if>
     <#else>
-    return !valueIsNull&&Math.abs(column.get${dataType.dataType?cap_first}(index) - eventValue) <= eventWindowParameter.getDelta();
+    return !valueIsNull&&Math.abs(column.get${dataType.dataType?cap_first}(index) - headValue) <= getDelta();
     </#if>
   }
 }
diff --git a/server/src/main/codegen/templates/evEventWindowManager.ftl b/server/src/main/codegen/templates/variationWindowManager.ftl
similarity index 74%
rename from server/src/main/codegen/templates/evEventWindowManager.ftl
rename to server/src/main/codegen/templates/variationWindowManager.ftl
index 1064e94295..5409884e86 100644
--- a/server/src/main/codegen/templates/evEventWindowManager.ftl
+++ b/server/src/main/codegen/templates/variationWindowManager.ftl
@@ -30,8 +30,8 @@
             </#if>
         </#if>
 
-        <#assign className = "${compareType.compareType?cap_first}Event${dataType.dataType?cap_first}WindowManager">
-        <#assign windowName = "${compareType.compareType?cap_first}Event${dataType.dataType?cap_first}Window">
+        <#assign className = "${compareType.compareType?cap_first}${dataType.dataType?cap_first}WindowManager">
+        <#assign windowName = "${compareType.compareType?cap_first}${dataType.dataType?cap_first}Window">
         <@pp.changeOutputFile name="/org/apache/iotdb/db/mpp/execution/operator/window/${className}.java" />
 
 package org.apache.iotdb.db.mpp.execution.operator.window;
@@ -46,12 +46,12 @@ import org.apache.iotdb.tsfile.utils.Binary;
 /*
 * This class is generated using freemarker and the ${.template_name} template.
 */
-public class ${className} extends Event${dataType.dataType?cap_first}WindowManager {
+public class ${className} extends VariationWindowManager {
 
   public ${className}(
-      EventWindowParameter eventWindowParameter, boolean ascending) {
-    super(eventWindowParameter, ascending);
-    eventWindow = new ${windowName}(eventWindowParameter);
+      VariationWindowParameter variationWindowParameter, boolean ascending) {
+    super(ascending);
+    variationWindow = new ${windowName}(variationWindowParameter);
   }
 
   @Override
@@ -64,24 +64,24 @@ public class ${className} extends Event${dataType.dataType?cap_first}WindowManag
       return inputTsBlock;
     }
 
-    Column controlColumn = inputTsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    Column controlColumn = variationWindow.getControlColumn(inputTsBlock);
     TimeColumn timeColumn = inputTsBlock.getTimeColumn();
     int i = 0, size = inputTsBlock.getPositionCount();
-    ${dataType.dataType} previousEventValue = ((${windowName}) eventWindow).getPreviousEventValue();
-    boolean previousEventValueIsNull = ((${windowName}) eventWindow).valueIsNull();
+    ${dataType.dataType} previousValue = ((${windowName}) variationWindow).getPreviousHeadValue();
+    boolean previousValueIsNull = ((${windowName}) variationWindow).valueIsNull();
     for (; i < size; i++) {
       // condition must be initialized when isNull is false
       boolean condition = false;
       boolean isNull = controlColumn.isNull(i);
       <#if compareType.compareType == "equal">
           <#if dataType.dataType == "Binary">
-      if(!isNull) condition = !controlColumn.get${dataType.dataType?cap_first}(i).equals(previousEventValue);
+      if(!isNull) condition = !controlColumn.get${dataType.dataType?cap_first}(i).equals(previousValue);
           <#else>
-      if(!isNull) condition = controlColumn.get${dataType.dataType?cap_first}(i) != previousEventValue;
+      if(!isNull) condition = controlColumn.get${dataType.dataType?cap_first}(i) != previousValue;
           </#if>
       <#else>
-      if(!isNull) condition = Math.abs(controlColumn.get${dataType.dataType?cap_first}(i) - previousEventValue)
-          > eventWindowParameter.getDelta();
+      if(!isNull) condition = Math.abs(controlColumn.get${dataType.dataType?cap_first}(i) - previousValue)
+          > variationWindow.getDelta();
       </#if>
       if(isIgnoringNull()){
         if (!isNull && condition) {
@@ -90,19 +90,19 @@ public class ${className} extends Event${dataType.dataType?cap_first}WindowManag
             continue;
         }
       }else{
-        if((isNull&&!previousEventValueIsNull)||!isNull&&previousEventValueIsNull||(!isNull&&condition)){
+        if((isNull&&!previousValueIsNull)||!isNull&&previousValueIsNull||(!isNull&&condition)){
             break;
         }
       }
 
       long currentTime = timeColumn.getLong(i);
       // judge whether we need update startTime
-      if (eventWindow.getStartTime() > currentTime) {
-        eventWindow.setStartTime(currentTime);
+      if (variationWindow.getStartTime() > currentTime) {
+        variationWindow.setStartTime(currentTime);
       }
       // judge whether we need update endTime
-      if (eventWindow.getEndTime() < currentTime) {
-        eventWindow.setEndTime(currentTime);
+      if (variationWindow.getEndTime() < currentTime) {
+        variationWindow.setEndTime(currentTime);
       }
     }
     // we can create a new window beginning at index i of inputTsBlock
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java
index bbf571d39b..a268c96fb6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Accumulator.java
@@ -18,26 +18,26 @@
  */
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 public interface Accumulator {
 
   /**
-   * Column should be like: | ControlColumn | Time | Value |
+   * Column should be like: |Time | Value |
    *
    * <p>IgnoringNull is required when considering the row where the value of controlColumn is null
    *
-   * <p>Return the last read row index of current timeColumn
+   * <p>bitMap is required for group-by framework. When needed(eq. controlColumn is null), bitMap
+   * can guide accumulator to skip some rows
+   *
+   * <p>lastIndex is required for group-by framework indicating the row to return to leave in
+   * advance for various reasons(eq. current row doesn't satisfy the window)
    */
-  default int addInput(Column[] column, IWindow window) {
-    return addInput(column, window, true);
-  }
-
-  int addInput(Column[] column, IWindow window, boolean ignoringNull);
+  void addInput(Column[] column, BitMap bitMap, int lastIndex);
 
   /**
    * For aggregation function like COUNT, SUM, partialResult should be single; But for AVG,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index 2295636d0a..e359ce884a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -19,17 +19,15 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
-import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindow;
 import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 import java.util.Collections;
 import java.util.List;
@@ -45,8 +43,6 @@ public class Aggregator {
   protected List<InputLocation[]> inputLocationList;
   protected final AggregationStep step;
 
-  protected IWindow curWindow;
-
   protected final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
 
   // Used for SeriesAggregateScanOperator
@@ -66,27 +62,21 @@ public class Aggregator {
   }
 
   // Used for SeriesAggregateScanOperator and RawDataAggregateOperator
-  public int processTsBlock(TsBlock tsBlock, boolean ignoringNull) {
+  public void processTsBlock(TsBlock tsBlock, BitMap bitMap, int lastIndex) {
     long startTime = System.nanoTime();
     try {
       checkArgument(
           step.isInputRaw(),
           "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
-      int lastReadReadIndex = 0;
       for (InputLocation[] inputLocations : inputLocationList) {
         checkArgument(
             inputLocations[0].getTsBlockIndex() == 0,
             "RawDataAggregateOperator can only process one tsBlock input.");
-        Column[] controlTimeAndValueColumn = new Column[3];
-        controlTimeAndValueColumn[0] = curWindow.getControlColumn(tsBlock);
-        controlTimeAndValueColumn[1] = tsBlock.getTimeColumn();
-        controlTimeAndValueColumn[2] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
-        lastReadReadIndex =
-            Math.max(
-                lastReadReadIndex,
-                accumulator.addInput(controlTimeAndValueColumn, curWindow, ignoringNull));
+        Column[] timeAndValueColumn = new Column[2];
+        timeAndValueColumn[0] = tsBlock.getTimeColumn();
+        timeAndValueColumn[1] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex());
+        accumulator.addInput(timeAndValueColumn, bitMap, lastIndex);
       }
-      return lastReadReadIndex;
     } finally {
       QUERY_METRICS.recordExecutionCost(AGGREGATION_FROM_RAW_DATA, System.nanoTime() - startTime);
     }
@@ -149,21 +139,10 @@ public class Aggregator {
   }
 
   public void reset() {
-    curWindow = null;
     accumulator.reset();
   }
 
   public boolean hasFinalResult() {
-    return curWindow.hasFinalResult(accumulator);
-  }
-
-  public void updateTimeRange(TimeRange curTimeRange) {
-    reset();
-    this.curWindow = new TimeWindow(curTimeRange);
-  }
-
-  public void updateWindow(IWindow curWindow) {
-    reset();
-    this.curWindow = curWindow;
+    return accumulator.hasFinalResult();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
index 89b946207e..eb3ba80523 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/AvgAccumulator.java
@@ -19,19 +19,19 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
 public class AvgAccumulator implements Accumulator {
 
-  private TSDataType seriesDataType;
+  private final TSDataType seriesDataType;
   private long countValue;
   private double sumValue;
   private boolean initResult = false;
@@ -41,16 +41,20 @@ public class AvgAccumulator implements Accumulator {
   }
 
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
     switch (seriesDataType) {
       case INT32:
-        return addIntInput(column, curWindow, ignoringNull);
+        addIntInput(column, bitMap, lastIndex);
+        return;
       case INT64:
-        return addLongInput(column, curWindow, ignoringNull);
+        addLongInput(column, bitMap, lastIndex);
+        return;
       case FLOAT:
-        return addFloatInput(column, curWindow, ignoringNull);
+        addFloatInput(column, bitMap, lastIndex);
+        return;
       case DOUBLE:
-        return addDoubleInput(column, curWindow, ignoringNull);
+        addDoubleInput(column, bitMap, lastIndex);
+        return;
       case TEXT:
       case BOOLEAN:
       default:
@@ -146,87 +150,55 @@ public class AvgAccumulator implements Accumulator {
     return TSDataType.DOUBLE;
   }
 
-  private int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
+      if (!column[1].isNull(i)) {
         initResult = true;
         countValue++;
-        sumValue += column[2].getInt(i);
+        sumValue += column[1].getInt(i);
       }
     }
-    return curPositionCount;
   }
 
-  private int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
+      if (!column[1].isNull(i)) {
         initResult = true;
         countValue++;
-        sumValue += column[2].getLong(i);
+        sumValue += column[1].getLong(i);
       }
     }
-    return curPositionCount;
   }
 
-  private int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
+      if (!column[1].isNull(i)) {
         initResult = true;
         countValue++;
-        sumValue += column[2].getFloat(i);
+        sumValue += column[1].getFloat(i);
       }
     }
-    return curPositionCount;
   }
 
-  private int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
+      if (!column[1].isNull(i)) {
         initResult = true;
         countValue++;
-        sumValue += column[2].getDouble(i);
+        sumValue += column[1].getDouble(i);
       }
     }
-    return curPositionCount;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
index 6f7f19c1fa..538c21d0fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java
@@ -19,11 +19,11 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -33,31 +33,26 @@ public class CountAccumulator implements Accumulator {
 
   public CountAccumulator() {}
 
-  // Column should be like: | ControlColumn | Time | Value |
+  // Column should be like: | Time | Value |
 
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
     int curPositionCount = column[0].getPositionCount();
 
-    if (!column[2].mayHaveNull() && curWindow.contains(column[0])) {
+    if (!column[1].mayHaveNull()
+        && lastIndex == curPositionCount - 1
+        && ((bitMap == null) || bitMap.isAllMarked())) {
       countValue += curPositionCount;
     } else {
-      for (int i = 0; i < curPositionCount; i++) {
-        // skip null value in control column
-        if (ignoringNull && column[0].isNull(i)) {
+      for (int i = 0; i <= lastIndex; i++) {
+        if (bitMap != null && !bitMap.isMarked(i)) {
           continue;
         }
-        if (!curWindow.satisfy(column[0], i)) {
-          return i;
-        }
-        curWindow.mergeOnePoint(column, i);
-        if (!column[2].isNull(i)) {
+        if (!column[1].isNull(i)) {
           countValue++;
         }
       }
     }
-
-    return curPositionCount;
   }
 
   // partialResult should be like: | partialCountValue1 |
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountIfAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountIfAccumulator.java
index 8d3e2e03e8..cb33e44f50 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountIfAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountIfAccumulator.java
@@ -20,11 +20,11 @@
 package org.apache.iotdb.db.mpp.aggregation;
 
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory.KeepEvaluator;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -47,23 +47,16 @@ public class CountIfAccumulator implements Accumulator {
     this.ignoreNull = ignoreNull;
   }
 
-  // Column should be like: | ControlColumn | Time | Value |
+  // Column should be like: | Time | Value |
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-    for (int i = 0; i < curPositionCount; i++) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
       // skip null value in control column
-      // the input parameter 'ignoringNull' effects on ControlColumn
-      if (ignoringNull && column[0].isNull(i)) {
+      // the input parameter 'bitMap' and 'lastIndex' effects on ControlColumn
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-
-      if (column[2].isNull(i)) {
-        // the member variable 'ignoreNull' effects on calculation of ValueColumn
+      if (column[1].isNull(i)) {
         if (!this.ignoreNull) {
           // data point segment was over, judge whether to count
           if (lastPointIsSatisfy && keepEvaluator.apply(keep)) {
@@ -73,7 +66,7 @@ public class CountIfAccumulator implements Accumulator {
           lastPointIsSatisfy = false;
         }
       } else {
-        if (column[2].getBoolean(i)) {
+        if (column[1].getBoolean(i)) {
           keep++;
           lastPointIsSatisfy = true;
         } else {
@@ -86,8 +79,6 @@ public class CountIfAccumulator implements Accumulator {
         }
       }
     }
-
-    return curPositionCount;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
index 44a2c90283..87643d97bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/ExtremeAccumulator.java
@@ -19,12 +19,12 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -32,7 +32,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 public class ExtremeAccumulator implements Accumulator {
 
   private final TSDataType seriesDataType;
-  private TsPrimitiveType extremeResult;
+  private final TsPrimitiveType extremeResult;
   private boolean initResult;
 
   public ExtremeAccumulator(TSDataType seriesDataType) {
@@ -41,16 +41,20 @@ public class ExtremeAccumulator implements Accumulator {
   }
 
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
     switch (seriesDataType) {
       case INT32:
-        return addIntInput(column, curWindow, ignoringNull);
+        addIntInput(column, bitMap, lastIndex);
+        return;
       case INT64:
-        return addLongInput(column, curWindow, ignoringNull);
+        addLongInput(column, bitMap, lastIndex);
+        return;
       case FLOAT:
-        return addFloatInput(column, curWindow, ignoringNull);
+        addFloatInput(column, bitMap, lastIndex);
+        return;
       case DOUBLE:
-        return addDoubleInput(column, curWindow, ignoringNull);
+        addDoubleInput(column, bitMap, lastIndex);
+        return;
       case TEXT:
       case BOOLEAN:
       default:
@@ -221,23 +225,15 @@ public class ExtremeAccumulator implements Accumulator {
     return extremeResult.getDataType();
   }
 
-  private int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateIntResult(column[2].getInt(i));
+      if (!column[1].isNull(i)) {
+        updateIntResult(column[1].getInt(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateIntResult(int extVal) {
@@ -253,23 +249,15 @@ public class ExtremeAccumulator implements Accumulator {
     }
   }
 
-  private int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateLongResult(column[2].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateLongResult(column[1].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateLongResult(long extVal) {
@@ -285,23 +273,15 @@ public class ExtremeAccumulator implements Accumulator {
     }
   }
 
-  private int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateFloatResult(column[2].getFloat(i));
+      if (!column[1].isNull(i)) {
+        updateFloatResult(column[1].getFloat(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateFloatResult(float extVal) {
@@ -317,23 +297,15 @@ public class ExtremeAccumulator implements Accumulator {
     }
   }
 
-  private int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateDoubleResult(column[2].getDouble(i));
+      if (!column[1].isNull(i)) {
+        updateDoubleResult(column[1].getDouble(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateDoubleResult(double extVal) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
index 86d2b34f5e..5aff5e4e7d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
@@ -19,13 +19,13 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -42,22 +42,28 @@ public class FirstValueAccumulator implements Accumulator {
     firstValue = TsPrimitiveType.getByType(seriesDataType);
   }
 
-  // Column should be like: | ControlColumn | Time | Value |
+  // Column should be like: | Time | Value |
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
     switch (seriesDataType) {
       case INT32:
-        return addIntInput(column, curWindow, ignoringNull);
+        addIntInput(column, bitMap, lastIndex);
+        return;
       case INT64:
-        return addLongInput(column, curWindow, ignoringNull);
+        addLongInput(column, bitMap, lastIndex);
+        return;
       case FLOAT:
-        return addFloatInput(column, curWindow, ignoringNull);
+        addFloatInput(column, bitMap, lastIndex);
+        return;
       case DOUBLE:
-        return addDoubleInput(column, curWindow, ignoringNull);
+        addDoubleInput(column, bitMap, lastIndex);
+        return;
       case TEXT:
-        return addBinaryInput(column, curWindow, ignoringNull);
+        addBinaryInput(column, bitMap, lastIndex);
+        return;
       case BOOLEAN:
-        return addBooleanInput(column, curWindow, ignoringNull);
+        addBooleanInput(column, bitMap, lastIndex);
+        return;
       default:
         throw new UnSupportedDataTypeException(
             String.format("Unsupported data type in FirstValue: %s", seriesDataType));
@@ -246,25 +252,16 @@ public class FirstValueAccumulator implements Accumulator {
     return firstValue.getDataType();
   }
 
-  protected int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateIntFirstValue(column[2].getInt(i), column[1].getLong(i));
-        return i + 1;
+      if (!column[1].isNull(i)) {
+        updateIntFirstValue(column[1].getInt(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   protected void updateIntFirstValue(int value, long curTime) {
@@ -275,25 +272,16 @@ public class FirstValueAccumulator implements Accumulator {
     }
   }
 
-  protected int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateLongFirstValue(column[2].getLong(i), column[1].getLong(i));
-        return i + 1;
+      if (!column[1].isNull(i)) {
+        updateLongFirstValue(column[1].getLong(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   protected void updateLongFirstValue(long value, long curTime) {
@@ -304,25 +292,16 @@ public class FirstValueAccumulator implements Accumulator {
     }
   }
 
-  protected int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateFloatFirstValue(column[2].getFloat(i), column[1].getLong(i));
-        return i + 1;
+      if (!column[1].isNull(i)) {
+        updateFloatFirstValue(column[1].getFloat(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   protected void updateFloatFirstValue(float value, long curTime) {
@@ -333,25 +312,16 @@ public class FirstValueAccumulator implements Accumulator {
     }
   }
 
-  protected int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateDoubleFirstValue(column[2].getDouble(i), column[1].getLong(i));
-        return i + 1;
+      if (!column[1].isNull(i)) {
+        updateDoubleFirstValue(column[1].getDouble(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   protected void updateDoubleFirstValue(double value, long curTime) {
@@ -362,25 +332,16 @@ public class FirstValueAccumulator implements Accumulator {
     }
   }
 
-  protected int addBooleanInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addBooleanInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateBooleanFirstValue(column[2].getBoolean(i), column[1].getLong(i));
-        return i + 1;
+      if (!column[1].isNull(i)) {
+        updateBooleanFirstValue(column[1].getBoolean(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   protected void updateBooleanFirstValue(boolean value, long curTime) {
@@ -391,25 +352,16 @@ public class FirstValueAccumulator implements Accumulator {
     }
   }
 
-  protected int addBinaryInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addBinaryInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateBinaryFirstValue(column[2].getBinary(i), column[1].getLong(i));
-        return i + 1;
+      if (!column[1].isNull(i)) {
+        updateBinaryFirstValue(column[1].getBinary(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   protected void updateBinaryFirstValue(Binary value, long curTime) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
index 23c40d8de3..f885a3e7fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueDescAccumulator.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 public class FirstValueDescAccumulator extends FirstValueAccumulator {
 
@@ -36,122 +36,74 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
 
   // Don't break in advance
   @Override
-  protected int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateIntFirstValue(column[2].getInt(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateIntFirstValue(column[1].getInt(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   @Override
-  protected int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateLongFirstValue(column[2].getLong(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateLongFirstValue(column[1].getLong(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   @Override
-  protected int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateFloatFirstValue(column[2].getFloat(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateFloatFirstValue(column[1].getFloat(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   @Override
-  protected int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateDoubleFirstValue(column[2].getDouble(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateDoubleFirstValue(column[1].getDouble(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   @Override
-  protected int addBooleanInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addBooleanInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateBooleanFirstValue(column[2].getBoolean(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateBooleanFirstValue(column[1].getBoolean(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   @Override
-  protected int addBinaryInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addBinaryInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateBinaryFirstValue(column[2].getBinary(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateBinaryFirstValue(column[1].getBinary(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
index 8441ed197f..d2f71a370a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
@@ -19,13 +19,13 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -42,22 +42,28 @@ public class LastValueAccumulator implements Accumulator {
     lastValue = TsPrimitiveType.getByType(seriesDataType);
   }
 
-  // Column should be like: | ControlColumn | Time | Value |
+  // Column should be like: | Time | Value |
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
     switch (seriesDataType) {
       case INT32:
-        return addIntInput(column, curWindow, ignoringNull);
+        addIntInput(column, bitMap, lastIndex);
+        return;
       case INT64:
-        return addLongInput(column, curWindow, ignoringNull);
+        addLongInput(column, bitMap, lastIndex);
+        return;
       case FLOAT:
-        return addFloatInput(column, curWindow, ignoringNull);
+        addFloatInput(column, bitMap, lastIndex);
+        return;
       case DOUBLE:
-        return addDoubleInput(column, curWindow, ignoringNull);
+        addDoubleInput(column, bitMap, lastIndex);
+        return;
       case TEXT:
-        return addBinaryInput(column, curWindow, ignoringNull);
+        addBinaryInput(column, bitMap, lastIndex);
+        return;
       case BOOLEAN:
-        return addBooleanInput(column, curWindow, ignoringNull);
+        addBooleanInput(column, bitMap, lastIndex);
+        return;
       default:
         throw new UnSupportedDataTypeException(
             String.format("Unsupported data type in LastValue: %s", seriesDataType));
@@ -246,23 +252,15 @@ public class LastValueAccumulator implements Accumulator {
     return lastValue.getDataType();
   }
 
-  protected int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateIntLastValue(column[2].getInt(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateIntLastValue(column[1].getInt(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   protected void updateIntLastValue(int value, long curTime) {
@@ -273,23 +271,15 @@ public class LastValueAccumulator implements Accumulator {
     }
   }
 
-  protected int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateLongLastValue(column[2].getLong(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateLongLastValue(column[1].getLong(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   protected void updateLongLastValue(long value, long curTime) {
@@ -300,23 +290,15 @@ public class LastValueAccumulator implements Accumulator {
     }
   }
 
-  protected int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateFloatLastValue(column[2].getFloat(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateFloatLastValue(column[1].getFloat(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   protected void updateFloatLastValue(float value, long curTime) {
@@ -327,23 +309,15 @@ public class LastValueAccumulator implements Accumulator {
     }
   }
 
-  protected int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateDoubleLastValue(column[2].getDouble(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateDoubleLastValue(column[1].getDouble(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   protected void updateDoubleLastValue(double value, long curTime) {
@@ -354,23 +328,15 @@ public class LastValueAccumulator implements Accumulator {
     }
   }
 
-  protected int addBooleanInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addBooleanInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateBooleanLastValue(column[2].getBoolean(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateBooleanLastValue(column[1].getBoolean(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   protected void updateBooleanLastValue(boolean value, long curTime) {
@@ -381,23 +347,15 @@ public class LastValueAccumulator implements Accumulator {
     }
   }
 
-  protected int addBinaryInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  protected void addBinaryInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateBinaryLastValue(column[2].getBinary(i), column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateBinaryLastValue(column[1].getBinary(i), column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   protected void updateBinaryLastValue(Binary value, long curTime) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
index 92c984950c..15607bba15 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 public class LastValueDescAccumulator extends LastValueAccumulator {
 
@@ -40,134 +40,86 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
   }
 
   @Override
-  protected int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
+  protected void addIntInput(Column[] column, BitMap needSkip, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
       // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+      if (needSkip != null && needSkip.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateIntLastValue(column[2].getInt(i), column[1].getLong(i));
-        return i;
+      if (!column[1].isNull(i)) {
+        updateIntLastValue(column[1].getInt(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   @Override
-  protected int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
+  protected void addLongInput(Column[] column, BitMap needSkip, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
       // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+      if (needSkip != null && needSkip.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateLongLastValue(column[2].getLong(i), column[1].getLong(i));
-        return i;
+      if (!column[1].isNull(i)) {
+        updateLongLastValue(column[1].getLong(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   @Override
-  protected int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
+  protected void addFloatInput(Column[] column, BitMap needSkip, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
       // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+      if (needSkip != null && needSkip.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateFloatLastValue(column[2].getFloat(i), column[1].getLong(i));
-        return i;
+      if (!column[1].isNull(i)) {
+        updateFloatLastValue(column[1].getFloat(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   @Override
-  protected int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
+  protected void addDoubleInput(Column[] column, BitMap needSkip, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
       // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+      if (needSkip != null && needSkip.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateDoubleLastValue(column[2].getDouble(i), column[1].getLong(i));
-        return i;
+      if (!column[1].isNull(i)) {
+        updateDoubleLastValue(column[1].getDouble(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   @Override
-  protected int addBooleanInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
+  protected void addBooleanInput(Column[] column, BitMap needSkip, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
       // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+      if (needSkip != null && needSkip.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateBooleanLastValue(column[2].getBoolean(i), column[1].getLong(i));
-        return i;
+      if (!column[1].isNull(i)) {
+        updateBooleanLastValue(column[1].getBoolean(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   @Override
-  protected int addBinaryInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
+  protected void addBinaryInput(Column[] column, BitMap needSkip, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
       // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+      if (needSkip != null && needSkip.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateBinaryLastValue(column[2].getBinary(i), column[1].getLong(i));
-        return i;
+      if (!column[1].isNull(i)) {
+        updateBinaryLastValue(column[1].getBinary(i), column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
index 63e5347759..c715285163 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeAccumulator.java
@@ -19,11 +19,11 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -34,26 +34,18 @@ public class MaxTimeAccumulator implements Accumulator {
 
   public MaxTimeAccumulator() {}
 
-  // Column should be like: | ControlColumn | Time | Value |
+  // Column should be like: | Time | Value |
   // Value is used to judge isNull()
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateMaxTime(column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateMaxTime(column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   // partialResult should be like: | partialMaxTimeValue |
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
index 050be6461f..65c3707c27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java
@@ -19,33 +19,24 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 public class MaxTimeDescAccumulator extends MaxTimeAccumulator {
 
-  // Column should be like: | ControlColumn | Time | Value |
+  // Column should be like: | Time | Value |
   // Value is used to judge isNull()
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateMaxTime(column[1].getLong(i));
-        return i;
+      if (!column[1].isNull(i)) {
+        updateMaxTime(column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
index 45a21adbe8..5c960f3b30 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxValueAccumulator.java
@@ -19,20 +19,20 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
 public class MaxValueAccumulator implements Accumulator {
 
-  private TSDataType seriesDataType;
-  private TsPrimitiveType maxResult;
+  private final TSDataType seriesDataType;
+  private final TsPrimitiveType maxResult;
   private boolean initResult;
 
   public MaxValueAccumulator(TSDataType seriesDataType) {
@@ -40,18 +40,22 @@ public class MaxValueAccumulator implements Accumulator {
     this.maxResult = TsPrimitiveType.getByType(seriesDataType);
   }
 
-  // Column should be like: | ControlColumn | Time | Value |
+  // Column should be like: | Time | Value |
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
     switch (seriesDataType) {
       case INT32:
-        return addIntInput(column, curWindow, ignoringNull);
+        addIntInput(column, bitMap, lastIndex);
+        return;
       case INT64:
-        return addLongInput(column, curWindow, ignoringNull);
+        addLongInput(column, bitMap, lastIndex);
+        return;
       case FLOAT:
-        return addFloatInput(column, curWindow, ignoringNull);
+        addFloatInput(column, bitMap, lastIndex);
+        return;
       case DOUBLE:
-        return addDoubleInput(column, curWindow, ignoringNull);
+        addDoubleInput(column, bitMap, lastIndex);
+        return;
       case TEXT:
       case BOOLEAN:
       default:
@@ -219,23 +223,15 @@ public class MaxValueAccumulator implements Accumulator {
     return maxResult.getDataType();
   }
 
-  private int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateIntResult(column[2].getInt(i));
+      if (!column[1].isNull(i)) {
+        updateIntResult(column[1].getInt(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateIntResult(int maxVal) {
@@ -245,23 +241,15 @@ public class MaxValueAccumulator implements Accumulator {
     }
   }
 
-  private int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateLongResult(column[2].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateLongResult(column[1].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateLongResult(long maxVal) {
@@ -271,23 +259,15 @@ public class MaxValueAccumulator implements Accumulator {
     }
   }
 
-  private int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateFloatResult(column[2].getFloat(i));
+      if (!column[1].isNull(i)) {
+        updateFloatResult(column[1].getFloat(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateFloatResult(float maxVal) {
@@ -297,23 +277,15 @@ public class MaxValueAccumulator implements Accumulator {
     }
   }
 
-  private int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateDoubleResult(column[2].getDouble(i));
+      if (!column[1].isNull(i)) {
+        updateDoubleResult(column[1].getDouble(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateDoubleResult(double maxVal) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
index 9ab1c3d20b..4725265ec6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeAccumulator.java
@@ -19,11 +19,11 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -34,28 +34,19 @@ public class MinTimeAccumulator implements Accumulator {
 
   public MinTimeAccumulator() {}
 
-  // Column should be like: | ControlColumn | Time | Value |
+  // Column should be like: | Time | Value |
   // Value is used to judge isNull()
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
-        return i;
+      if (!column[1].isNull(i)) {
+        updateMinTime(column[0].getLong(i));
+        return;
       }
     }
-
-    return curPositionCount;
   }
 
   // partialResult should be like: | partialMinTimeValue |
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
index 9640d9332b..dab7c847b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinTimeDescAccumulator.java
@@ -19,29 +19,21 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 public class MinTimeDescAccumulator extends MinTimeAccumulator {
 
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateMinTime(column[1].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateMinTime(column[0].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
index 493fe64ab7..fdc96e063c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MinValueAccumulator.java
@@ -19,20 +19,20 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
 public class MinValueAccumulator implements Accumulator {
 
-  private TSDataType seriesDataType;
-  private TsPrimitiveType minResult;
+  private final TSDataType seriesDataType;
+  private final TsPrimitiveType minResult;
   private boolean initResult = false;
 
   public MinValueAccumulator(TSDataType seriesDataType) {
@@ -40,18 +40,22 @@ public class MinValueAccumulator implements Accumulator {
     this.minResult = TsPrimitiveType.getByType(seriesDataType);
   }
 
-  // Column should be like: | ControlColumn | Time | Value |
+  // Column should be like: | Time | Value |
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
     switch (seriesDataType) {
       case INT32:
-        return addIntInput(column, curWindow, ignoringNull);
+        addIntInput(column, bitMap, lastIndex);
+        return;
       case INT64:
-        return addLongInput(column, curWindow, ignoringNull);
+        addLongInput(column, bitMap, lastIndex);
+        return;
       case FLOAT:
-        return addFloatInput(column, curWindow, ignoringNull);
+        addFloatInput(column, bitMap, lastIndex);
+        return;
       case DOUBLE:
-        return addDoubleInput(column, curWindow, ignoringNull);
+        addDoubleInput(column, bitMap, lastIndex);
+        return;
       case TEXT:
       case BOOLEAN:
       default:
@@ -219,23 +223,15 @@ public class MinValueAccumulator implements Accumulator {
     return minResult.getDataType();
   }
 
-  private int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateIntResult(column[2].getInt(i));
+      if (!column[1].isNull(i)) {
+        updateIntResult(column[1].getInt(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateIntResult(int minVal) {
@@ -245,23 +241,16 @@ public class MinValueAccumulator implements Accumulator {
     }
   }
 
-  private int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
+  private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
       // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateLongResult(column[2].getLong(i));
+      if (!column[1].isNull(i)) {
+        updateLongResult(column[1].getLong(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateLongResult(long minVal) {
@@ -271,23 +260,15 @@ public class MinValueAccumulator implements Accumulator {
     }
   }
 
-  private int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateFloatResult(column[2].getFloat(i));
+      if (!column[1].isNull(i)) {
+        updateFloatResult(column[1].getFloat(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateFloatResult(float minVal) {
@@ -297,23 +278,15 @@ public class MinValueAccumulator implements Accumulator {
     }
   }
 
-  private int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
-        updateDoubleResult(column[2].getDouble(i));
+      if (!column[1].isNull(i)) {
+        updateDoubleResult(column[1].getDouble(i));
       }
     }
-    return curPositionCount;
   }
 
   private void updateDoubleResult(double minVal) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
index 060cb8b4c9..3c0b75a22d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/SumAccumulator.java
@@ -19,19 +19,19 @@
 
 package org.apache.iotdb.db.mpp.aggregation;
 
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
 public class SumAccumulator implements Accumulator {
 
-  private TSDataType seriesDataType;
+  private final TSDataType seriesDataType;
   private double sumValue = 0;
   private boolean initResult = false;
 
@@ -39,18 +39,22 @@ public class SumAccumulator implements Accumulator {
     this.seriesDataType = seriesDataType;
   }
 
-  // Column should be like: | ControlColumn | Time | Value |
+  // Column should be like: | Time | Value |
   @Override
-  public int addInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
+  public void addInput(Column[] column, BitMap bitMap, int lastIndex) {
     switch (seriesDataType) {
       case INT32:
-        return addIntInput(column, curWindow, ignoringNull);
+        addIntInput(column, bitMap, lastIndex);
+        return;
       case INT64:
-        return addLongInput(column, curWindow, ignoringNull);
+        addLongInput(column, bitMap, lastIndex);
+        return;
       case FLOAT:
-        return addFloatInput(column, curWindow, ignoringNull);
+        addFloatInput(column, bitMap, lastIndex);
+        return;
       case DOUBLE:
-        return addDoubleInput(column, curWindow, ignoringNull);
+        addDoubleInput(column, bitMap, lastIndex);
+        return;
       case TEXT:
       case BOOLEAN:
       default:
@@ -135,83 +139,51 @@ public class SumAccumulator implements Accumulator {
     return TSDataType.DOUBLE;
   }
 
-  private int addIntInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
+      if (!column[1].isNull(i)) {
         initResult = true;
-        sumValue += column[2].getInt(i);
+        sumValue += column[1].getInt(i);
       }
     }
-    return curPositionCount;
   }
 
-  private int addLongInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addLongInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
+      if (!column[1].isNull(i)) {
         initResult = true;
-        sumValue += column[2].getLong(i);
+        sumValue += column[1].getLong(i);
       }
     }
-    return curPositionCount;
   }
 
-  private int addFloatInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addFloatInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
+      if (!column[1].isNull(i)) {
         initResult = true;
-        sumValue += column[2].getFloat(i);
+        sumValue += column[1].getFloat(i);
       }
     }
-    return curPositionCount;
   }
 
-  private int addDoubleInput(Column[] column, IWindow curWindow, boolean ignoringNull) {
-    int curPositionCount = column[0].getPositionCount();
-
-    for (int i = 0; i < curPositionCount; i++) {
-      // skip null value in control column
-      if (ignoringNull && column[0].isNull(i)) {
+  private void addDoubleInput(Column[] column, BitMap bitMap, int lastIndex) {
+    for (int i = 0; i <= lastIndex; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
         continue;
       }
-      if (!curWindow.satisfy(column[0], i)) {
-        return i;
-      }
-      curWindow.mergeOnePoint(column, i);
-      if (!column[2].isNull(i)) {
+      if (!column[1].isNull(i)) {
         initResult = true;
-        sumValue += column[2].getDouble(i);
+        sumValue += column[1].getDouble(i);
       }
     }
-    return curPositionCount;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
index 6eb6012b2f..9ce9f803c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
@@ -53,8 +53,7 @@ public abstract class SlidingWindowAggregator extends Aggregator {
     this.deque = new LinkedList<>();
   }
 
-  @Override
-  public int processTsBlock(TsBlock tsBlock, boolean ignoringNull) {
+  public void processTsBlock(TsBlock tsBlock) {
     checkArgument(
         step.isInputPartial(),
         "Step in SlidingWindowAggregationOperator can only process partial result");
@@ -68,10 +67,8 @@ public abstract class SlidingWindowAggregator extends Aggregator {
       valueColumn[i] = tsBlock.getColumn(inputLocation.getValueColumnIndex());
     }
     processPartialResult(new PartialAggregationResult(timeColumn, valueColumn));
-    return 1;
   }
 
-  @Override
   public void updateTimeRange(TimeRange curTimeRange) {
     this.curTimeRange = curTimeRange;
     evictingExpiredValue();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
index 5458e92482..586ff48be3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.SingleTimeWindowIterator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
+import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindow;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
@@ -104,16 +106,26 @@ public class AggregationUtil {
         inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, ascending);
       }
 
-      int lastReadRowIndex = 0;
+      // Get the row which need to be processed by aggregator
+      IWindow curWindow = new TimeWindow(curTimeRange);
+      TimeColumn timeColumn = inputTsBlock.getTimeColumn();
+      int lastIndexToProcess = 0;
+      for (int i = 0; i < inputTsBlock.getPositionCount(); i++) {
+        if (!curWindow.satisfy(timeColumn, i)) {
+          break;
+        }
+        lastIndexToProcess = i;
+      }
+
       for (Aggregator aggregator : aggregators) {
         // current agg method has been calculated
         if (aggregator.hasFinalResult()) {
           continue;
         }
 
-        lastReadRowIndex =
-            Math.max(lastReadRowIndex, aggregator.processTsBlock(inputTsBlock, true));
+        aggregator.processTsBlock(inputTsBlock, null, lastIndexToProcess);
       }
+      int lastReadRowIndex = lastIndexToProcess + 1;
       if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
         inputTsBlock = null;
       } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 62b5e7a53c..364086c4fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -117,7 +117,7 @@ public class AggregationOperator extends AbstractConsumeAllOperator {
 
         // clear previous aggregation result
         for (Aggregator aggregator : aggregators) {
-          aggregator.updateTimeRange(curTimeRange);
+          aggregator.reset();
         }
       }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index 33b9c8dd99..97e6f6d93c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
 import org.apache.iotdb.db.mpp.execution.operator.window.IWindowManager;
 import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 import java.util.List;
 
@@ -102,7 +104,7 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
       } else {
         // If there are no points belong to last time window, the last time window will not
         // initialize window and aggregators. Specially for time window.
-        if (windowManager.notInitedLastTimeWindow()) {
+        if (windowManager.notInitializedLastTimeWindow()) {
           initWindowAndAggregators();
         }
         // If the window is not initialized, it just returns to avoid invoking updateResultTsBlock()
@@ -148,25 +150,48 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
 
     if (windowManager.satisfiedCurWindow(inputTsBlock)) {
 
-      int lastReadRowIndex = 0;
+      // Get the indexes in tsBlock which needs to be processed by aggregator, and the last row
+      // needed to be processed.
+      int tsBlockSize = inputTsBlock.getPositionCount();
+      IWindow curWindow = windowManager.getCurWindow();
+
+      Column[] controlAndTimeColumn = new Column[2];
+      controlAndTimeColumn[0] = curWindow.getControlColumn(inputTsBlock);
+      controlAndTimeColumn[1] = inputTsBlock.getTimeColumn();
+
+      BitMap needProcess = new BitMap(tsBlockSize);
+      int lastIndexToProcess = -1;
+      boolean hasSkip = false;
+
+      for (int i = 0; i < tsBlockSize; i++) {
+        if (windowManager.isIgnoringNull() && controlAndTimeColumn[0].isNull(i)) {
+          lastIndexToProcess = i;
+          hasSkip = true;
+          continue;
+        }
+        if (!curWindow.satisfy(controlAndTimeColumn[0], i)) {
+          break;
+        }
+        needProcess.mark(i);
+        curWindow.mergeOnePoint(controlAndTimeColumn, i);
+        lastIndexToProcess = i;
+      }
+
+      // if no row needs to skip, just send a null parameter.
+      if (!hasSkip) needProcess = null;
+
       for (Aggregator aggregator : aggregators) {
         // Current agg method has been calculated
         if (aggregator.hasFinalResult()) {
           continue;
         }
 
-        lastReadRowIndex =
-            Math.max(
-                lastReadRowIndex,
-                aggregator.processTsBlock(inputTsBlock, windowManager.isIgnoringNull()));
+        aggregator.processTsBlock(inputTsBlock, needProcess, lastIndexToProcess);
       }
-
+      int lastReadRowIndex = lastIndexToProcess + 1;
       // If lastReadRowIndex is not zero, some of tsBlock is consumed and result is cached in
       // aggregators.
       if (lastReadRowIndex != 0) {
-        // todo update the keep value in group by series, it will be removed in the future
-        windowManager.setKeep(lastReadRowIndex);
-        windowManager.setLastTsBlockTime();
         hasCachedDataInAggregator = true;
       }
       if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
@@ -207,9 +232,8 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
 
   private void initWindowAndAggregators() {
     windowManager.initCurWindow();
-    IWindow curWindow = windowManager.getCurWindow();
     for (Aggregator aggregator : aggregators) {
-      aggregator.updateWindow(curWindow);
+      aggregator.reset();
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index 95df36c2e7..3fb0dca85a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.slidingwindow.SlidingWindowAggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
@@ -82,7 +83,7 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
 
       // clear previous aggregation result
       for (Aggregator aggregator : aggregators) {
-        aggregator.updateTimeRange(curTimeRange);
+        ((SlidingWindowAggregator) aggregator).updateTimeRange(curTimeRange);
       }
     }
 
@@ -129,7 +130,7 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
     }
 
     for (Aggregator aggregator : aggregators) {
-      aggregator.processTsBlock(inputTsBlock, true);
+      ((SlidingWindowAggregator) aggregator).processTsBlock(inputTsBlock);
     }
 
     inputTsBlock = inputTsBlock.skipFirst();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index ee7c0a0b64..afb2957f93 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -131,7 +131,7 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData
 
       // clear previous aggregation result
       for (Aggregator aggregator : aggregators) {
-        aggregator.updateTimeRange(curTimeRange);
+        aggregator.reset();
       }
 
       // calculate aggregation result on current time window
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java
similarity index 63%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java
index 63356361f4..5ce44233b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/AbstractVariationWindow.java
@@ -23,25 +23,28 @@ import org.apache.iotdb.db.mpp.aggregation.Accumulator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public abstract class EventWindow implements IWindow {
+public abstract class AbstractVariationWindow implements IWindow {
 
-  protected EventWindowParameter eventWindowParameter;
+  private final double delta;
+  private final int controlColumnIndex;
+  private final boolean outputEndTime;
+  private final boolean ignoreNull;
 
   protected long startTime;
-
   protected long endTime;
-
-  protected boolean initializedEventValue;
-
+  protected boolean initializedHeadValue;
   protected boolean valueIsNull = false;
 
-  protected EventWindow(EventWindowParameter eventWindowParameter) {
-    this.eventWindowParameter = eventWindowParameter;
+  protected AbstractVariationWindow(VariationWindowParameter variationWindowParameter) {
+    this.controlColumnIndex = variationWindowParameter.getControlColumnIndex();
+    this.ignoreNull = variationWindowParameter.isIgnoringNull();
+    this.outputEndTime = variationWindowParameter.isNeedOutputEndTime();
+    this.delta = variationWindowParameter.getDelta();
   }
 
   @Override
   public Column getControlColumn(TsBlock tsBlock) {
-    return tsBlock.getColumn(eventWindowParameter.getControlColumnIndex());
+    return tsBlock.getColumn(controlColumnIndex);
   }
 
   @Override
@@ -49,13 +52,12 @@ public abstract class EventWindow implements IWindow {
     return accumulator.hasFinalResult();
   }
 
-  // TODO
   @Override
   public boolean contains(Column column) {
     return false;
   }
 
-  public abstract void updatePreviousEventValue();
+  public abstract void updatePreviousValue();
 
   public long getStartTime() {
     return startTime;
@@ -73,15 +75,23 @@ public abstract class EventWindow implements IWindow {
     this.endTime = endTime;
   }
 
-  public void setInitializedEventValue(boolean initializedEventValue) {
-    this.initializedEventValue = initializedEventValue;
+  public void setInitializedHeadValue(boolean initializedHeadValue) {
+    this.initializedHeadValue = initializedHeadValue;
+  }
+
+  public boolean ignoreNull() {
+    return ignoreNull;
   }
 
   public boolean valueIsNull() {
     return valueIsNull;
   }
 
-  public boolean ignoringNull() {
-    return eventWindowParameter.isIgnoringNull();
+  public boolean isOutputEndTime() {
+    return outputEndTime;
+  }
+
+  public double getDelta() {
+    return delta;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindow.java
similarity index 88%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindow.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindow.java
index b32238e196..5ed29a80d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindow.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.mpp.aggregation.Accumulator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 
-public class SeriesWindow implements IWindow {
+public class ConditionWindow implements IWindow {
 
   private final int controlColumnIndex;
   private final boolean outputEndTime;
@@ -33,10 +33,10 @@ public class SeriesWindow implements IWindow {
   private long keep;
   private boolean timeInitialized;
 
-  public SeriesWindow(SeriesWindowParameter seriesWindowParameter) {
-    this.ignoringNull = seriesWindowParameter.isIgnoringNull();
-    this.controlColumnIndex = seriesWindowParameter.getControlColumnIndex();
-    this.outputEndTime = seriesWindowParameter.isNeedOutputEndTime();
+  public ConditionWindow(ConditionWindowParameter conditionWindowParameter) {
+    this.ignoringNull = conditionWindowParameter.isIgnoringNull();
+    this.controlColumnIndex = conditionWindowParameter.getControlColumnIndex();
+    this.outputEndTime = conditionWindowParameter.isNeedOutputEndTime();
   }
 
   @Override
@@ -54,6 +54,7 @@ public class SeriesWindow implements IWindow {
 
   @Override
   public void mergeOnePoint(Column[] controlTimeAndValueColumn, int index) {
+    keep++;
     long currentTime = controlTimeAndValueColumn[1].getLong(index);
     if (!timeInitialized) {
       startTime = currentTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowManager.java
similarity index 68%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowManager.java
index d8c9c128aa..f1551fedda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowManager.java
@@ -26,15 +26,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
-public class SeriesWindowManager implements IWindowManager {
+public class ConditionWindowManager implements IWindowManager {
 
-  private final SeriesWindow seriesWindow;
+  private final ConditionWindow conditionWindow;
   private boolean initialized;
   private boolean needSkip;
 
@@ -47,13 +45,13 @@ public class SeriesWindowManager implements IWindowManager {
   private boolean isFirstSkip;
   private final KeepEvaluator keepEvaluator;
 
-  public SeriesWindowManager(SeriesWindowParameter seriesWindowParameter) {
-    this.seriesWindow = new SeriesWindow(seriesWindowParameter);
+  public ConditionWindowManager(ConditionWindowParameter conditionWindowParameter) {
+    this.conditionWindow = new ConditionWindow(conditionWindowParameter);
     // In group by condition, the first data point cannot be guaranteed to be true in controlColumn,
     // so there is going to be a skipPointsOutOfBounds() in the beginning.
     this.needSkip = true;
     this.keepEvaluator =
-        AccumulatorFactory.initKeepEvaluator(seriesWindowParameter.getKeepExpression());
+        AccumulatorFactory.initKeepEvaluator(conditionWindowParameter.getKeepExpression());
   }
 
   @Override
@@ -64,8 +62,8 @@ public class SeriesWindowManager implements IWindowManager {
   @Override
   public void initCurWindow() {
     this.initialized = true;
-    this.seriesWindow.setTimeInitialized(false);
-    this.seriesWindow.setKeep(0);
+    this.conditionWindow.setTimeInitialized(false);
+    this.conditionWindow.setKeep(0);
   }
 
   @Override
@@ -82,7 +80,7 @@ public class SeriesWindowManager implements IWindowManager {
 
   @Override
   public IWindow getCurWindow() {
-    return seriesWindow;
+    return conditionWindow;
   }
 
   @Override
@@ -95,7 +93,7 @@ public class SeriesWindowManager implements IWindowManager {
       return inputTsBlock;
     }
 
-    Column controlColumn = seriesWindow.getControlColumn(inputTsBlock);
+    Column controlColumn = conditionWindow.getControlColumn(inputTsBlock);
     TimeColumn timeColumn = inputTsBlock.getTimeColumn();
     int i = 0, size = inputTsBlock.getPositionCount();
     int k = 0;
@@ -117,11 +115,11 @@ public class SeriesWindowManager implements IWindowManager {
       if (isFirstSkip) {
         k++;
         long currentTime = timeColumn.getLong(i);
-        if (seriesWindow.getStartTime() > currentTime) {
-          seriesWindow.setStartTime(currentTime);
+        if (conditionWindow.getStartTime() > currentTime) {
+          conditionWindow.setStartTime(currentTime);
         }
-        if (seriesWindow.getEndTime() < currentTime) {
-          seriesWindow.setEndTime(currentTime);
+        if (conditionWindow.getEndTime() < currentTime) {
+          conditionWindow.setEndTime(currentTime);
         }
       }
     }
@@ -130,7 +128,7 @@ public class SeriesWindowManager implements IWindowManager {
     // not finish.
     if (isFirstSkip) {
       if (i != size) isFirstSkip = false;
-      seriesWindow.setKeep(seriesWindow.getKeep() + k);
+      conditionWindow.setKeep(conditionWindow.getKeep() + k);
       return inputTsBlock.subTsBlock(i);
     }
 
@@ -145,7 +143,7 @@ public class SeriesWindowManager implements IWindowManager {
   public TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators) {
     List<TSDataType> dataTypes = getResultDataTypes(aggregators);
     // Judge whether we need output endTime column.
-    if (seriesWindow.isOutputEndTime()) {
+    if (conditionWindow.isOutputEndTime()) {
       dataTypes.add(0, TSDataType.INT64);
     }
     return new TsBlockBuilder(dataTypes);
@@ -154,29 +152,11 @@ public class SeriesWindowManager implements IWindowManager {
   @Override
   public void appendAggregationResult(
       TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
-    if (!keepEvaluator.apply(seriesWindow.getKeep())) {
-      for (Aggregator aggregator : aggregators) aggregator.reset();
+    if (!keepEvaluator.apply(conditionWindow.getKeep())) {
       return;
     }
-    // Use the start time of eventWindow as default output time.
-    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
-    timeColumnBuilder.writeLong(seriesWindow.getStartTime());
-
-    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
-    int columnIndex = 0;
-    if (seriesWindow.isOutputEndTime()) {
-      columnBuilders[0].writeLong(seriesWindow.getEndTime());
-      columnIndex = 1;
-    }
-    for (Aggregator aggregator : aggregators) {
-      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
-      columnBuilder[0] = columnBuilders[columnIndex++];
-      if (columnBuilder.length > 1) {
-        columnBuilder[1] = columnBuilders[columnIndex++];
-      }
-      aggregator.outputResult(columnBuilder);
-    }
-    resultTsBlockBuilder.declarePosition();
+    long endTime = conditionWindow.isOutputEndTime() ? conditionWindow.getEndTime() : -1;
+    outputAggregators(aggregators, resultTsBlockBuilder, conditionWindow.getStartTime(), endTime);
   }
 
   @Override
@@ -186,11 +166,6 @@ public class SeriesWindowManager implements IWindowManager {
 
   @Override
   public boolean isIgnoringNull() {
-    return seriesWindow.ignoringNull();
-  }
-
-  @Override
-  public void setKeep(long keep) {
-    seriesWindow.setKeep(seriesWindow.getKeep() + keep);
+    return conditionWindow.ignoringNull();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowParameter.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowParameter.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowParameter.java
index cde53be91f..83a3149965 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SeriesWindowParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/ConditionWindowParameter.java
@@ -21,19 +21,19 @@ package org.apache.iotdb.db.mpp.execution.operator.window;
 
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 
-public class SeriesWindowParameter extends WindowParameter {
+public class ConditionWindowParameter extends WindowParameter {
 
   private final boolean ignoringNull;
   private final int controlColumnIndex;
   private final Expression keepExpression;
 
-  public SeriesWindowParameter(
+  public ConditionWindowParameter(
       boolean needOutputEndTime,
       boolean ignoringNull,
       int controlColumnIndex,
       Expression keepExpression) {
     super(needOutputEndTime);
-    this.windowType = WindowType.SERIES_WINDOW;
+    this.windowType = WindowType.CONDITION_WINDOW;
     this.ignoringNull = ignoringNull;
     this.controlColumnIndex = controlColumnIndex;
     this.keepExpression = keepExpression;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
index f6003c0120..ee39f197c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java
@@ -46,7 +46,7 @@ public interface IWindow {
    * When we merge a point into window, at this time, we can use this method to change the status in
    * this window
    */
-  void mergeOnePoint(Column[] controlTimeAndValueColumn, int index);
+  void mergeOnePoint(Column[] timeAndValueColumn, int index);
 
   /**
    * Used to customize whether the window has final aggregation result
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
index 50c94da4fc..ef0f493a1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -77,7 +79,7 @@ public interface IWindowManager {
    */
   default boolean satisfiedCurWindow(TsBlock inputTsBlock) {
     return true;
-  };
+  }
 
   /**
    * Used to determine whether there are extra points for the next window
@@ -87,12 +89,12 @@ public interface IWindowManager {
    */
   default boolean isTsBlockOutOfBound(TsBlock inputTsBlock) {
     return false;
-  };
+  }
 
   /**
    * According to the Aggregator list, we could obtain all the aggregation result column type list.
    *
-   * @param aggregators
+   * @param aggregators the list of aggregators
    * @return Aggregation result column type list.
    */
   default List<TSDataType> getResultDataTypes(List<Aggregator> aggregators) {
@@ -109,7 +111,7 @@ public interface IWindowManager {
    * <p>For the implementation, we should consider whether we need to add endTime column and event
    * column in the resultSet besides the aggregation columns.
    *
-   * @param aggregators
+   * @param aggregators the list of aggregators
    * @return TsBlockBuilder of resultSet
    */
   TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators);
@@ -121,8 +123,8 @@ public interface IWindowManager {
    * whether we need to add endTime column and event column in the resultSet besides the aggregation
    * columns.
    *
-   * @param resultTsBlockBuilder
-   * @param aggregators
+   * @param resultTsBlockBuilder tsBlockBuilder for resultSet
+   * @param aggregators the list of aggregators
    */
   void appendAggregationResult(TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators);
 
@@ -132,9 +134,9 @@ public interface IWindowManager {
    *
    * @return whether the window is TimeWindow and the last TimeWindow has not been initialized
    */
-  default boolean notInitedLastTimeWindow() {
+  default boolean notInitializedLastTimeWindow() {
     return false;
-  };
+  }
 
   /**
    * When endTime is required in resultSet, operator should skip the points in last window directly
@@ -151,9 +153,33 @@ public interface IWindowManager {
    */
   boolean isIgnoringNull();
 
-  // TODO: "group by series" used for keep value temporarily, it will be removed in the future.
-  default void setKeep(long keep) {}
-
-  // TODO: "group by session" used for keeping lastTsBlockTime, it will be removed in the future.
-  default void setLastTsBlockTime() {}
+  /**
+   * output the result in aggregators to columnBuilders
+   *
+   * @param endTime if the window doesn't need to output endTime, just assign -1 to endTime.
+   */
+  default void outputAggregators(
+      List<Aggregator> aggregators,
+      TsBlockBuilder resultTsBlockBuilder,
+      long startTime,
+      long endTime) {
+    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+    timeColumnBuilder.writeLong(startTime);
+
+    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+    int columnIndex = 0;
+    if (endTime != -1) {
+      columnBuilders[0].writeLong(endTime);
+      columnIndex = 1;
+    }
+    for (Aggregator aggregator : aggregators) {
+      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+      columnBuilder[0] = columnBuilders[columnIndex++];
+      if (columnBuilder.length > 1) {
+        columnBuilder[1] = columnBuilders[columnIndex++];
+      }
+      aggregator.outputResult(columnBuilder);
+    }
+    resultTsBlockBuilder.declarePosition();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java
index 68c4aa30af..79b22dd858 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindow.java
@@ -83,6 +83,7 @@ public class SessionWindow implements IWindow {
     }
     // update the last time of session window
     timeValue = ascending ? Math.max(timeValue, currentTime) : Math.min(timeValue, currentTime);
+    setLastTsBlockTime(timeValue);
   }
 
   @Override
@@ -147,10 +148,6 @@ public class SessionWindow implements IWindow {
     this.initializedTimeValue = initializedTimeValue;
   }
 
-  public long getLastTsBlockTime() {
-    return lastTsBlockTime;
-  }
-
   public void setLastTsBlockTime(long lastTsBlockTime) {
     this.lastTsBlockTime = lastTsBlockTime;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowManager.java
index d8976445b4..d76496aa92 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/SessionWindowManager.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
@@ -138,30 +136,8 @@ public class SessionWindowManager implements IWindowManager {
   @Override
   public void appendAggregationResult(
       TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
-    // Use the start time of sessionWindow as default output time.
-    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
-    timeColumnBuilder.writeLong(sessionWindow.getStartTime());
-
-    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
-    int columnIndex = 0;
-    if (isNeedOutputEndTime) {
-      columnBuilders[0].writeLong(sessionWindow.getEndTime());
-      columnIndex = 1;
-    }
-    for (Aggregator aggregator : aggregators) {
-      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
-      columnBuilder[0] = columnBuilders[columnIndex++];
-      if (columnBuilder.length > 1) {
-        columnBuilder[1] = columnBuilders[columnIndex++];
-      }
-      aggregator.outputResult(columnBuilder);
-    }
-    resultTsBlockBuilder.declarePosition();
-  }
-
-  @Override
-  public boolean notInitedLastTimeWindow() {
-    return false;
+    long endTime = isNeedOutputEndTime ? sessionWindow.getEndTime() : -1;
+    outputAggregators(aggregators, resultTsBlockBuilder, sessionWindow.getStartTime(), endTime);
   }
 
   @Override
@@ -173,9 +149,4 @@ public class SessionWindowManager implements IWindowManager {
   public boolean isIgnoringNull() {
     return false;
   }
-
-  @Override
-  public void setLastTsBlockTime() {
-    this.sessionWindow.setLastTsBlockTime(this.sessionWindow.getTimeValue());
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
index 8a0e2f3f2a..5321e7d14b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindowManager.java
@@ -26,8 +26,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
@@ -150,28 +148,12 @@ public class TimeWindowManager implements IWindowManager {
   @Override
   public void appendAggregationResult(
       TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
-    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
-    // Use start time of current time range as time column
-    timeColumnBuilder.writeLong(startTime);
-    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
-    int columnIndex = 0;
-    if (this.needOutputEndTime) {
-      columnBuilders[0].writeLong(endTime);
-      columnIndex = 1;
-    }
-    for (Aggregator aggregator : aggregators) {
-      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
-      columnBuilder[0] = columnBuilders[columnIndex++];
-      if (columnBuilder.length > 1) {
-        columnBuilder[1] = columnBuilders[columnIndex++];
-      }
-      aggregator.outputResult(columnBuilder);
-    }
-    resultTsBlockBuilder.declarePosition();
+    long endTime = this.needOutputEndTime ? this.endTime : -1;
+    outputAggregators(aggregators, resultTsBlockBuilder, this.startTime, endTime);
   }
 
   @Override
-  public boolean notInitedLastTimeWindow() {
+  public boolean notInitializedLastTimeWindow() {
     return !this.initialized;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowManager.java
similarity index 58%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowManager.java
index ca1acf0ba7..b4d162b68a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowManager.java
@@ -22,12 +22,10 @@ package org.apache.iotdb.db.mpp.execution.operator.window;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.util.List;
 
-public abstract class EventWindowManager implements IWindowManager {
+public abstract class VariationWindowManager implements IWindowManager {
 
   protected boolean initialized;
 
@@ -35,12 +33,9 @@ public abstract class EventWindowManager implements IWindowManager {
 
   protected boolean needSkip;
 
-  protected EventWindowParameter eventWindowParameter;
+  protected AbstractVariationWindow variationWindow;
 
-  protected EventWindow eventWindow;
-
-  protected EventWindowManager(EventWindowParameter eventWindowParameter, boolean ascending) {
-    this.eventWindowParameter = eventWindowParameter;
+  protected VariationWindowManager(boolean ascending) {
     this.initialized = false;
     this.ascending = ascending;
     // At beginning, we do not need to skip inputTsBlock
@@ -48,7 +43,7 @@ public abstract class EventWindowManager implements IWindowManager {
   }
 
   public boolean isIgnoringNull() {
-    return eventWindowParameter.isIgnoringNull();
+    return variationWindow.ignoreNull();
   }
 
   @Override
@@ -59,7 +54,7 @@ public abstract class EventWindowManager implements IWindowManager {
   @Override
   public void initCurWindow() {
     this.initialized = true;
-    this.eventWindow.setInitializedEventValue(false);
+    this.variationWindow.setInitializedHeadValue(false);
   }
 
   @Override
@@ -73,50 +68,33 @@ public abstract class EventWindowManager implements IWindowManager {
     // belong to previous window have been consumed. If not, we need skip these points.
     this.needSkip = true;
     this.initialized = false;
-    this.eventWindow.updatePreviousEventValue();
+    this.variationWindow.updatePreviousValue();
   }
 
   @Override
   public IWindow getCurWindow() {
-    return eventWindow;
+    return variationWindow;
   }
 
   @Override
   public TsBlockBuilder createResultTsBlockBuilder(List<Aggregator> aggregators) {
     List<TSDataType> dataTypes = getResultDataTypes(aggregators);
     // Judge whether we need output endTime column.
-    if (eventWindowParameter.isNeedOutputEndTime()) {
+    if (variationWindow.isOutputEndTime()) {
       dataTypes.add(0, TSDataType.INT64);
     }
     return new TsBlockBuilder(dataTypes);
   }
 
-  protected ColumnBuilder[] appendOriginAggregationResult(
+  public void appendAggregationResult(
       TsBlockBuilder resultTsBlockBuilder, List<Aggregator> aggregators) {
-    // Use the start time of eventWindow as default output time.
-    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
-    timeColumnBuilder.writeLong(eventWindow.getStartTime());
-
-    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
-    int columnIndex = 0;
-    if (eventWindowParameter.isNeedOutputEndTime()) {
-      columnBuilders[0].writeLong(eventWindow.getEndTime());
-      columnIndex = 1;
-    }
-    for (Aggregator aggregator : aggregators) {
-      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
-      columnBuilder[0] = columnBuilders[columnIndex++];
-      if (columnBuilder.length > 1) {
-        columnBuilder[1] = columnBuilders[columnIndex++];
-      }
-      aggregator.outputResult(columnBuilder);
-    }
-    resultTsBlockBuilder.declarePosition();
-    return columnBuilders;
+
+    long endTime = variationWindow.isOutputEndTime() ? variationWindow.getEndTime() : -1;
+    outputAggregators(aggregators, resultTsBlockBuilder, variationWindow.getStartTime(), endTime);
   }
 
   @Override
   public boolean needSkipInAdvance() {
-    return eventWindowParameter.isNeedOutputEndTime();
+    return variationWindow.isOutputEndTime();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowParameter.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowParameter.java
index 2bfcb7d4d9..7fdcb0b621 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/EventWindowParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/VariationWindowParameter.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.execution.operator.window;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-public class EventWindowParameter extends WindowParameter {
+public class VariationWindowParameter extends WindowParameter {
 
   private final boolean ignoringNull;
   private final double delta;
@@ -29,7 +29,7 @@ public class EventWindowParameter extends WindowParameter {
 
   private final int controlColumnIndex;
 
-  public EventWindowParameter(
+  public VariationWindowParameter(
       TSDataType dataType,
       int controlColumnIndex,
       boolean needOutputEndTime,
@@ -40,7 +40,7 @@ public class EventWindowParameter extends WindowParameter {
     this.dataType = dataType;
     this.ignoringNull = ignoringNull;
     this.delta = delta;
-    this.windowType = WindowType.EVENT_WINDOW;
+    this.windowType = WindowType.VARIATION_WINDOW;
   }
 
   public boolean isIgnoringNull() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
index eaeba4d246..8e5df9a2d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowManagerFactory.java
@@ -29,12 +29,12 @@ public class WindowManagerFactory {
     switch (windowParameter.getWindowType()) {
       case TIME_WINDOW:
         return new TimeWindowManager(timeRangeIterator, (TimeWindowParameter) windowParameter);
-      case EVENT_WINDOW:
-        return ((EventWindowParameter) windowParameter).getDelta() == 0
-            ? genEqualEventWindowManager((EventWindowParameter) windowParameter, ascending)
-            : genVariationEventWindowManager((EventWindowParameter) windowParameter, ascending);
-      case SERIES_WINDOW:
-        return new SeriesWindowManager((SeriesWindowParameter) windowParameter);
+      case VARIATION_WINDOW:
+        return ((VariationWindowParameter) windowParameter).getDelta() == 0
+            ? genEqualEventWindowManager((VariationWindowParameter) windowParameter, ascending)
+            : genVariationEventWindowManager((VariationWindowParameter) windowParameter, ascending);
+      case CONDITION_WINDOW:
+        return new ConditionWindowManager((ConditionWindowParameter) windowParameter);
       case SESSION_WINDOW:
         return new SessionWindowManager(
             windowParameter.isNeedOutputEndTime(),
@@ -47,21 +47,21 @@ public class WindowManagerFactory {
     }
   }
 
-  private static EventWindowManager genEqualEventWindowManager(
-      EventWindowParameter eventWindowParameter, boolean ascending) {
+  private static VariationWindowManager genEqualEventWindowManager(
+      VariationWindowParameter eventWindowParameter, boolean ascending) {
     switch (eventWindowParameter.getDataType()) {
       case INT32:
-        return new EqualEventIntWindowManager(eventWindowParameter, ascending);
+        return new EqualIntWindowManager(eventWindowParameter, ascending);
       case INT64:
-        return new EqualEventLongWindowManager(eventWindowParameter, ascending);
+        return new EqualLongWindowManager(eventWindowParameter, ascending);
       case FLOAT:
-        return new EqualEventFloatWindowManager(eventWindowParameter, ascending);
+        return new EqualFloatWindowManager(eventWindowParameter, ascending);
       case DOUBLE:
-        return new EqualEventDoubleWindowManager(eventWindowParameter, ascending);
+        return new EqualDoubleWindowManager(eventWindowParameter, ascending);
       case TEXT:
-        return new EqualEventBinaryWindowManager(eventWindowParameter, ascending);
+        return new EqualBinaryWindowManager(eventWindowParameter, ascending);
       case BOOLEAN:
-        return new EqualEventBooleanWindowManager(eventWindowParameter, ascending);
+        return new EqualBooleanWindowManager(eventWindowParameter, ascending);
       default:
         throw new UnSupportedDataTypeException(
             String.format(
@@ -70,17 +70,17 @@ public class WindowManagerFactory {
     }
   }
 
-  private static EventWindowManager genVariationEventWindowManager(
-      EventWindowParameter eventWindowParameter, boolean ascending) {
+  private static VariationWindowManager genVariationEventWindowManager(
+      VariationWindowParameter eventWindowParameter, boolean ascending) {
     switch (eventWindowParameter.getDataType()) {
       case INT32:
-        return new VariationEventIntWindowManager(eventWindowParameter, ascending);
+        return new VariationIntWindowManager(eventWindowParameter, ascending);
       case INT64:
-        return new VariationEventLongWindowManager(eventWindowParameter, ascending);
+        return new VariationLongWindowManager(eventWindowParameter, ascending);
       case FLOAT:
-        return new VariationEventFloatWindowManager(eventWindowParameter, ascending);
+        return new VariationFloatWindowManager(eventWindowParameter, ascending);
       case DOUBLE:
-        return new VariationEventDoubleWindowManager(eventWindowParameter, ascending);
+        return new VariationDoubleWindowManager(eventWindowParameter, ascending);
       default:
         throw new UnSupportedDataTypeException(
             String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
index e2ff0f8241..acc12791da 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/WindowType.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.mpp.execution.operator.window;
 
 public enum WindowType {
   TIME_WINDOW((byte) 0),
-  EVENT_WINDOW((byte) 1),
-  SERIES_WINDOW((byte) 2),
+  VARIATION_WINDOW((byte) 1),
+  CONDITION_WINDOW((byte) 2),
   SESSION_WINDOW((byte) 3);
 
   private final byte type;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index ce1488490b..a0c4650db9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -67,8 +67,8 @@ import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.mpp.plan.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByConditionParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySeriesParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySessionParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByVariationParameter;
@@ -79,7 +79,7 @@ import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByComponent;
-import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySeriesComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupByConditionComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySessionComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByVariationComponent;
@@ -1189,7 +1189,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       }
     }
 
-    if (windowType == WindowType.EVENT_WINDOW) {
+    if (windowType == WindowType.VARIATION_WINDOW) {
       double delta = ((GroupByVariationComponent) groupByComponent).getDelta();
       for (Expression expression : deviceToGroupByExpression.values()) {
         checkGroupByVariationExpressionType(analysis, expression, delta);
@@ -1198,13 +1198,14 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
           new GroupByVariationParameter(groupByComponent.isIgnoringNull(), delta);
       analysis.setGroupByParameter(groupByParameter);
       analysis.setDeviceToGroupByExpression(deviceToGroupByExpression);
-    } else if (windowType == WindowType.SERIES_WINDOW) {
-      Expression keepExpression = ((GroupBySeriesComponent) groupByComponent).getKeepExpression();
+    } else if (windowType == WindowType.CONDITION_WINDOW) {
+      Expression keepExpression =
+          ((GroupByConditionComponent) groupByComponent).getKeepExpression();
       for (Expression expression : deviceToGroupByExpression.values()) {
-        checkGroupBySeriesExpressionType(analysis, expression, keepExpression);
+        checkGroupByConditionExpressionType(analysis, expression, keepExpression);
       }
       GroupByParameter groupByParameter =
-          new GroupBySeriesParameter(groupByComponent.isIgnoringNull(), keepExpression);
+          new GroupByConditionParameter(groupByComponent.isIgnoringNull(), keepExpression);
       analysis.setGroupByParameter(groupByParameter);
       analysis.setDeviceToGroupByExpression(deviceToGroupByExpression);
     } else if (windowType == WindowType.SESSION_WINDOW) {
@@ -1244,18 +1245,19 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       groupByExpression = expressions.get(0);
     }
 
-    if (windowType == WindowType.EVENT_WINDOW) {
+    if (windowType == WindowType.VARIATION_WINDOW) {
       double delta = ((GroupByVariationComponent) groupByComponent).getDelta();
       checkGroupByVariationExpressionType(analysis, groupByExpression, delta);
       GroupByParameter groupByParameter =
           new GroupByVariationParameter(groupByComponent.isIgnoringNull(), delta);
       analysis.setGroupByExpression(groupByExpression);
       analysis.setGroupByParameter(groupByParameter);
-    } else if (windowType == WindowType.SERIES_WINDOW) {
-      Expression keepExpression = ((GroupBySeriesComponent) groupByComponent).getKeepExpression();
-      checkGroupBySeriesExpressionType(analysis, groupByExpression, keepExpression);
+    } else if (windowType == WindowType.CONDITION_WINDOW) {
+      Expression keepExpression =
+          ((GroupByConditionComponent) groupByComponent).getKeepExpression();
+      checkGroupByConditionExpressionType(analysis, groupByExpression, keepExpression);
       GroupByParameter groupByParameter =
-          new GroupBySeriesParameter(groupByComponent.isIgnoringNull(), keepExpression);
+          new GroupByConditionParameter(groupByComponent.isIgnoringNull(), keepExpression);
       analysis.setGroupByExpression(groupByExpression);
       analysis.setGroupByParameter(groupByParameter);
     } else if (windowType == WindowType.SESSION_WINDOW) {
@@ -1277,7 +1279,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
     }
   }
 
-  private void checkGroupBySeriesExpressionType(
+  private void checkGroupByConditionExpressionType(
       Analysis analysis, Expression groupByExpression, Expression keepExpression) {
     TSDataType type = analyzeExpression(analysis, groupByExpression);
     if (type != TSDataType.BOOLEAN) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index dcb9d0d173..c5ac196776 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -71,8 +71,8 @@ import org.apache.iotdb.db.mpp.plan.statement.component.FillComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.FillPolicy;
 import org.apache.iotdb.db.mpp.plan.statement.component.FromComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByComponent;
+import org.apache.iotdb.db.mpp.plan.statement.component.GroupByConditionComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByLevelComponent;
-import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySeriesComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupBySessionComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTagComponent;
 import org.apache.iotdb.db.mpp.plan.statement.component.GroupByTimeComponent;
@@ -965,7 +965,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
 
           groupByKeys.add("COMMON");
           queryStatement.setGroupByComponent(
-              parseGroupByClause(groupByAttribute, WindowType.EVENT_WINDOW));
+              parseGroupByClause(groupByAttribute, WindowType.VARIATION_WINDOW));
         } else if (groupByAttribute.CONDITION() != null) {
           if (groupByKeys.contains("COMMON")) {
             throw new SemanticException(GROUP_BY_COMMON_ONLY_ONE_MSG);
@@ -973,7 +973,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
 
           groupByKeys.add("COMMON");
           queryStatement.setGroupByComponent(
-              parseGroupByClause(groupByAttribute, WindowType.SERIES_WINDOW));
+              parseGroupByClause(groupByAttribute, WindowType.CONDITION_WINDOW));
         } else if (groupByAttribute.SESSION() != null) {
           if (groupByKeys.contains("COMMON")) {
             throw new SemanticException(GROUP_BY_COMMON_ONLY_ONE_MSG);
@@ -1209,7 +1209,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
       }
     }
     List<ExpressionContext> expressions = ctx.expression();
-    if (windowType == WindowType.EVENT_WINDOW) {
+    if (windowType == WindowType.VARIATION_WINDOW) {
       ExpressionContext expressionContext = expressions.get(0);
       GroupByVariationComponent groupByVariationComponent = new GroupByVariationComponent();
       groupByVariationComponent.setControlColumnExpression(
@@ -1218,16 +1218,16 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
           ctx.delta == null ? 0 : Double.parseDouble(ctx.delta.getText()));
       groupByVariationComponent.setIgnoringNull(ignoringNull);
       return groupByVariationComponent;
-    } else if (windowType == WindowType.SERIES_WINDOW) {
+    } else if (windowType == WindowType.CONDITION_WINDOW) {
       ExpressionContext conditionExpressionContext = expressions.get(0);
-      GroupBySeriesComponent groupBySeriesComponent = new GroupBySeriesComponent();
-      groupBySeriesComponent.setControlColumnExpression(
+      GroupByConditionComponent groupByConditionComponent = new GroupByConditionComponent();
+      groupByConditionComponent.setControlColumnExpression(
           parseExpression(conditionExpressionContext, true));
       if (expressions.size() == 2) {
-        groupBySeriesComponent.setKeepExpression(parseExpression(expressions.get(1), true));
+        groupByConditionComponent.setKeepExpression(parseExpression(expressions.get(1), true));
       }
-      groupBySeriesComponent.setIgnoringNull(ignoringNull);
-      return groupBySeriesComponent;
+      groupByConditionComponent.setIgnoringNull(ignoringNull);
+      return groupByConditionComponent;
     } else if (windowType == WindowType.SESSION_WINDOW) {
       long interval = DateTimeUtils.convertDurationStrToLong(ctx.timeInterval.getText());
       return new GroupBySessionComponent(interval);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index efbb423fbf..06031866fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -123,10 +123,10 @@ import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.execution.operator.source.ShowQueriesOperator;
-import org.apache.iotdb.db.mpp.execution.operator.window.EventWindowParameter;
-import org.apache.iotdb.db.mpp.execution.operator.window.SeriesWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.ConditionWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.SessionWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.VariationWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.WindowType;
 import org.apache.iotdb.db.mpp.plan.Coordinator;
@@ -189,8 +189,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.FillDescriptor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByConditionParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter;
-import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySeriesParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupBySessionParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByVariationParameter;
@@ -773,6 +773,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         operatorContext, node.getDevices(), children, deviceColumnIndex, outputColumnTypes);
   }
 
+  @Deprecated
   @Override
   public Operator visitDeviceMerge(DeviceMergeNode node, LocalExecutionPlanContext context) {
     OperatorContext operatorContext =
@@ -1464,27 +1465,35 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
 
         WindowParameter windowParameter;
         switch (windowType) {
-          case EVENT_WINDOW:
-            String controlColumn = node.getGroupByExpression().getExpressionString();
+          case VARIATION_WINDOW:
+            Expression groupByVariationExpression = node.getGroupByExpression();
+            if (groupByVariationExpression == null) {
+              throw new IllegalArgumentException("groupByVariationExpression can't be null");
+            }
+            String controlColumn = groupByVariationExpression.getExpressionString();
             TSDataType controlColumnType = context.getTypeProvider().getType(controlColumn);
             windowParameter =
-                new EventWindowParameter(
+                new VariationWindowParameter(
                     controlColumnType,
                     layout.get(controlColumn).get(0).getValueColumnIndex(),
                     node.isOutputEndTime(),
                     ((GroupByVariationParameter) groupByParameter).isIgnoringNull(),
                     ((GroupByVariationParameter) groupByParameter).getDelta());
             break;
-          case SERIES_WINDOW:
+          case CONDITION_WINDOW:
+            Expression groupByConditionExpression = node.getGroupByExpression();
+            if (groupByConditionExpression == null) {
+              throw new IllegalArgumentException("groupByConditionExpression can't be null");
+            }
             windowParameter =
-                new SeriesWindowParameter(
+                new ConditionWindowParameter(
                     node.isOutputEndTime(),
-                    ((GroupBySeriesParameter) groupByParameter).isIgnoringNull(),
+                    ((GroupByConditionParameter) groupByParameter).isIgnoringNull(),
                     layout
-                        .get(node.getGroupByExpression().getExpressionString())
+                        .get(groupByConditionExpression.getExpressionString())
                         .get(0)
                         .getValueColumnIndex(),
-                    ((GroupBySeriesParameter) groupByParameter).getKeepExpression());
+                    ((GroupByConditionParameter) groupByParameter).getKeepExpression());
             break;
           case SESSION_WINDOW:
             windowParameter =
@@ -1746,6 +1755,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     }
   }
 
+  @Deprecated
   @Override
   public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) {
     List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, context);
@@ -2420,7 +2430,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
           LocalExecutionPlanContext subContext = context.createSubContext();
           subContext.setDegreeOfParallelism(1);
           // Create partial parent operator for children
-          PlanNode partialParentNode = null;
+          PlanNode partialParentNode;
           if (endIndex - startIndex == 1) {
             partialParentNode = node.getChildren().get(startIndex);
           } else {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySeriesParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByConditionParameter.java
similarity index 83%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySeriesParameter.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByConditionParameter.java
index f0ed735779..5d5bc09d15 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupBySeriesParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByConditionParameter.java
@@ -28,13 +28,13 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
-public class GroupBySeriesParameter extends GroupByParameter {
+public class GroupByConditionParameter extends GroupByParameter {
 
   private final Expression keepExpression;
-  private boolean ignoringNull;
+  private final boolean ignoringNull;
 
-  public GroupBySeriesParameter(boolean ignoringNull, Expression keepExpression) {
-    super(WindowType.SERIES_WINDOW);
+  public GroupByConditionParameter(boolean ignoringNull, Expression keepExpression) {
+    super(WindowType.CONDITION_WINDOW);
     this.keepExpression = keepExpression;
     this.ignoringNull = ignoringNull;
   }
@@ -54,7 +54,7 @@ public class GroupBySeriesParameter extends GroupByParameter {
   public static GroupByParameter deserialize(ByteBuffer buffer) {
     boolean ignoringNull = ReadWriteIOUtils.readBool(buffer);
     Expression keepExpression = Expression.deserialize(buffer);
-    return new GroupBySeriesParameter(ignoringNull, keepExpression);
+    return new GroupByConditionParameter(ignoringNull, keepExpression);
   }
 
   public Expression getKeepExpression() {
@@ -76,8 +76,8 @@ public class GroupBySeriesParameter extends GroupByParameter {
     if (!super.equals(obj)) {
       return false;
     }
-    return this.keepExpression == ((GroupBySeriesParameter) obj).getKeepExpression()
-        && this.ignoringNull == ((GroupBySeriesParameter) obj).ignoringNull;
+    return this.keepExpression == ((GroupByConditionParameter) obj).getKeepExpression()
+        && this.ignoringNull == ((GroupByConditionParameter) obj).ignoringNull;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java
index 3380ea6a0d..586188fc4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByParameter.java
@@ -71,10 +71,10 @@ public abstract class GroupByParameter {
 
   public static GroupByParameter deserialize(ByteBuffer byteBuffer) {
     byte type = ReadWriteIOUtils.readByte(byteBuffer);
-    if (type == WindowType.EVENT_WINDOW.getType()) {
+    if (type == WindowType.VARIATION_WINDOW.getType()) {
       return GroupByVariationParameter.deserialize(byteBuffer);
-    } else if (type == WindowType.SERIES_WINDOW.getType()) {
-      return GroupBySeriesParameter.deserialize(byteBuffer);
+    } else if (type == WindowType.CONDITION_WINDOW.getType()) {
+      return GroupByConditionParameter.deserialize(byteBuffer);
     } else if (type == WindowType.SESSION_WINDOW.getType()) {
       return GroupBySessionParameter.deserialize(byteBuffer);
     } else throw new SemanticException("Unsupported window type");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java
index 6a397e5e33..7e30251f74 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/parameter/GroupByVariationParameter.java
@@ -34,7 +34,7 @@ public class GroupByVariationParameter extends GroupByParameter {
   boolean ignoringNull;
 
   public GroupByVariationParameter(boolean ignoringNull, double delta) {
-    super(WindowType.EVENT_WINDOW);
+    super(WindowType.VARIATION_WINDOW);
     this.delta = delta;
     this.ignoringNull = ignoringNull;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupBySeriesComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByConditionComponent.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupBySeriesComponent.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByConditionComponent.java
index 31f44f4389..86589b2ccb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupBySeriesComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByConditionComponent.java
@@ -22,13 +22,13 @@ package org.apache.iotdb.db.mpp.plan.statement.component;
 import org.apache.iotdb.db.mpp.execution.operator.window.WindowType;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
 
-public class GroupBySeriesComponent extends GroupByComponent {
+public class GroupByConditionComponent extends GroupByComponent {
 
   // use to filter continuous value (keep>=/>/=/</<=threshold)
   private Expression keepExpression;
 
-  public GroupBySeriesComponent() {
-    super(WindowType.SERIES_WINDOW);
+  public GroupByConditionComponent() {
+    super(WindowType.CONDITION_WINDOW);
   }
 
   public void setKeepExpression(Expression keepExpression) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByVariationComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByVariationComponent.java
index 90c7b3d606..001607635a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByVariationComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/GroupByVariationComponent.java
@@ -25,7 +25,7 @@ public class GroupByVariationComponent extends GroupByComponent {
   private double delta = 0;
 
   public GroupByVariationComponent() {
-    super(WindowType.EVENT_WINDOW);
+    super(WindowType.VARIATION_WINDOW);
   }
 
   public double getDelta() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index 480ab2a026..810586dc29 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -294,15 +294,17 @@ public class QueryStatement extends Statement {
   }
 
   private boolean isGroupByVariation() {
-    return groupByComponent != null && groupByComponent.getWindowType() == WindowType.EVENT_WINDOW;
+    return groupByComponent != null
+        && groupByComponent.getWindowType() == WindowType.VARIATION_WINDOW;
   }
 
-  private boolean isGroupBySeries() {
-    return groupByComponent != null && groupByComponent.getWindowType() == WindowType.SERIES_WINDOW;
+  private boolean isGroupByCondition() {
+    return groupByComponent != null
+        && groupByComponent.getWindowType() == WindowType.CONDITION_WINDOW;
   }
 
   public boolean hasGroupByExpression() {
-    return isGroupByVariation() || isGroupBySeries();
+    return isGroupByVariation() || isGroupByCondition();
   }
 
   public boolean isAlignByTime() {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java
index 8e79b3fb3c..5d0729fe5b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/aggregation/AccumulatorTest.java
@@ -20,11 +20,8 @@
 package org.apache.iotdb.db.mpp.aggregation;
 
 import org.apache.iotdb.common.rpc.thrift.TAggregationType;
-import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
-import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindow;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -45,9 +42,6 @@ public class AccumulatorTest {
 
   private TsBlock rawData;
   private Statistics statistics;
-  private TimeRange defaultTimeRange = new TimeRange(0, Long.MAX_VALUE);
-
-  private TimeWindow timeWindow = new TimeWindow(defaultTimeRange);
 
   @Before
   public void setUp() {
@@ -71,11 +65,10 @@ public class AccumulatorTest {
     statistics.update(100L, 100d);
   }
 
-  public Column[] getControlTimeAndValueColumn(IWindow curWindow, int columnIndex) {
-    Column[] columns = new Column[3];
-    columns[0] = curWindow.getControlColumn(rawData);
-    columns[1] = rawData.getTimeColumn();
-    columns[2] = rawData.getColumn(columnIndex);
+  public Column[] getTimeAndValueColumn(int columnIndex) {
+    Column[] columns = new Column[2];
+    columns[0] = rawData.getTimeColumn();
+    columns[1] = rawData.getColumn(columnIndex);
     return columns;
   }
 
@@ -102,8 +95,8 @@ public class AccumulatorTest {
     avgAccumulator.outputFinal(finalResult);
     Assert.assertTrue(finalResult.build().isNull(0));
 
-    Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
-    avgAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+    Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+    avgAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
     Assert.assertFalse(avgAccumulator.hasFinalResult());
     intermediateResult[0] = new LongColumnBuilder(null, 1);
     intermediateResult[1] = new DoubleColumnBuilder(null, 1);
@@ -145,8 +138,8 @@ public class AccumulatorTest {
     countAccumulator.outputFinal(finalResult);
     Assert.assertEquals(0, finalResult.build().getLong(0));
 
-    Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
-    countAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+    Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+    countAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
     Assert.assertFalse(countAccumulator.hasFinalResult());
     intermediateResult[0] = new LongColumnBuilder(null, 1);
     countAccumulator.outputIntermediate(intermediateResult);
@@ -185,8 +178,8 @@ public class AccumulatorTest {
     extremeAccumulator.outputFinal(finalResult);
     Assert.assertTrue(finalResult.build().isNull(0));
 
-    Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
-    extremeAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+    Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+    extremeAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
     Assert.assertFalse(extremeAccumulator.hasFinalResult());
     intermediateResult[0] = new DoubleColumnBuilder(null, 1);
     extremeAccumulator.outputIntermediate(intermediateResult);
@@ -228,8 +221,8 @@ public class AccumulatorTest {
     firstValueAccumulator.outputFinal(finalResult);
     Assert.assertTrue(finalResult.build().isNull(0));
 
-    Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
-    firstValueAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+    Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+    firstValueAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
     Assert.assertTrue(firstValueAccumulator.hasFinalResult());
     intermediateResult[0] = new DoubleColumnBuilder(null, 1);
     intermediateResult[1] = new LongColumnBuilder(null, 1);
@@ -274,8 +267,8 @@ public class AccumulatorTest {
     lastValueAccumulator.outputFinal(finalResult);
     Assert.assertTrue(finalResult.build().isNull(0));
 
-    Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
-    lastValueAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+    Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+    lastValueAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
     intermediateResult[0] = new DoubleColumnBuilder(null, 1);
     intermediateResult[1] = new LongColumnBuilder(null, 1);
     lastValueAccumulator.outputIntermediate(intermediateResult);
@@ -316,8 +309,8 @@ public class AccumulatorTest {
     maxTimeAccumulator.outputFinal(finalResult);
     Assert.assertTrue(finalResult.build().isNull(0));
 
-    Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
-    maxTimeAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+    Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+    maxTimeAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
     Assert.assertFalse(maxTimeAccumulator.hasFinalResult());
     intermediateResult[0] = new LongColumnBuilder(null, 1);
     maxTimeAccumulator.outputIntermediate(intermediateResult);
@@ -356,8 +349,8 @@ public class AccumulatorTest {
     minTimeAccumulator.outputFinal(finalResult);
     Assert.assertTrue(finalResult.build().isNull(0));
 
-    Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
-    minTimeAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+    Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+    minTimeAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
     Assert.assertTrue(minTimeAccumulator.hasFinalResult());
     intermediateResult[0] = new LongColumnBuilder(null, 1);
     minTimeAccumulator.outputIntermediate(intermediateResult);
@@ -396,8 +389,8 @@ public class AccumulatorTest {
     extremeAccumulator.outputFinal(finalResult);
     Assert.assertTrue(finalResult.build().isNull(0));
 
-    Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
-    extremeAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+    Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+    extremeAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
     Assert.assertFalse(extremeAccumulator.hasFinalResult());
     intermediateResult[0] = new DoubleColumnBuilder(null, 1);
     extremeAccumulator.outputIntermediate(intermediateResult);
@@ -436,8 +429,8 @@ public class AccumulatorTest {
     extremeAccumulator.outputFinal(finalResult);
     Assert.assertTrue(finalResult.build().isNull(0));
 
-    Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
-    extremeAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+    Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+    extremeAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
     Assert.assertFalse(extremeAccumulator.hasFinalResult());
     intermediateResult[0] = new DoubleColumnBuilder(null, 1);
     extremeAccumulator.outputIntermediate(intermediateResult);
@@ -476,8 +469,8 @@ public class AccumulatorTest {
     sumAccumulator.outputFinal(finalResult);
     Assert.assertTrue(finalResult.build().isNull(0));
 
-    Column[] controlTimeAndValueColumn = getControlTimeAndValueColumn(timeWindow, 0);
-    sumAccumulator.addInput(controlTimeAndValueColumn, timeWindow);
+    Column[] timeAndValueColumn = getTimeAndValueColumn(0);
+    sumAccumulator.addInput(timeAndValueColumn, null, rawData.getPositionCount() - 1);
     Assert.assertFalse(sumAccumulator.hasFinalResult());
     intermediateResult[0] = new DoubleColumnBuilder(null, 1);
     sumAccumulator.outputIntermediate(intermediateResult);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
index 76aa21237e..aa333914a9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/RawDataAggregationOperatorTest.java
@@ -40,9 +40,9 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.RowBasedTimeJoinO
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
 import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
-import org.apache.iotdb.db.mpp.execution.operator.window.EventWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.SessionWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.TimeWindowParameter;
+import org.apache.iotdb.db.mpp.execution.operator.window.VariationWindowParameter;
 import org.apache.iotdb.db.mpp.execution.operator.window.WindowParameter;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
@@ -562,7 +562,7 @@ public class RawDataAggregationOperatorTest {
     }
 
     WindowParameter windowParameter =
-        new EventWindowParameter(TSDataType.INT32, 0, false, true, 10000);
+        new VariationWindowParameter(TSDataType.INT32, 0, false, true, 10000);
 
     RawDataAggregationOperator rawDataAggregationOperator =
         initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
@@ -626,7 +626,7 @@ public class RawDataAggregationOperatorTest {
     }
 
     WindowParameter windowParameter =
-        new EventWindowParameter(TSDataType.INT32, 0, true, true, 5000);
+        new VariationWindowParameter(TSDataType.INT32, 0, true, true, 5000);
 
     RawDataAggregationOperator rawDataAggregationOperator =
         initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
@@ -685,7 +685,7 @@ public class RawDataAggregationOperatorTest {
     }
 
     WindowParameter windowParameter =
-        new EventWindowParameter(TSDataType.INT32, 0, false, true, 5000);
+        new VariationWindowParameter(TSDataType.INT32, 0, false, true, 5000);
 
     RawDataAggregationOperator rawDataAggregationOperator =
         initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
@@ -734,7 +734,7 @@ public class RawDataAggregationOperatorTest {
     }
 
     WindowParameter windowParameter =
-        new EventWindowParameter(TSDataType.INT32, 0, true, true, 5000);
+        new VariationWindowParameter(TSDataType.INT32, 0, true, true, 5000);
 
     RawDataAggregationOperator rawDataAggregationOperator =
         initRawDataAggregationOperator(aggregationTypes, null, inputLocations, windowParameter);
@@ -758,14 +758,15 @@ public class RawDataAggregationOperatorTest {
 
   @Test
   public void onePointInOneEqualEventWindowTest() throws IllegalPathException {
-    WindowParameter windowParameter = new EventWindowParameter(TSDataType.INT32, 0, false, true, 0);
+    WindowParameter windowParameter =
+        new VariationWindowParameter(TSDataType.INT32, 0, false, true, 0);
     onePointInOneWindowTest(windowParameter);
   }
 
   @Test
   public void onePointInOneVariationEventWindowTest() throws IllegalPathException {
     WindowParameter windowParameter =
-        new EventWindowParameter(TSDataType.INT32, 0, false, true, 0.5);
+        new VariationWindowParameter(TSDataType.INT32, 0, false, true, 0.5);
     onePointInOneWindowTest(windowParameter);
   }