You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/04/20 02:54:02 UTC

[iotdb] branch master updated: [IOTDB-2844] Implementation of SeriesAggregateScanOperator and AggregateOperator - Part I (#5502)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e0ecf5354e [IOTDB-2844] Implementation of SeriesAggregateScanOperator and AggregateOperator - Part I (#5502)
e0ecf5354e is described below

commit e0ecf5354e099b214d2f2e863050b51e2c3f40cc
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Apr 20 10:53:57 2022 +0800

    [IOTDB-2844] Implementation of SeriesAggregateScanOperator and AggregateOperator - Part I (#5502)
---
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |   4 +
 .../iotdb/db/mpp/execution/DataDriverContext.java  |   1 +
 .../iotdb/db/mpp/execution/SchemaDriver.java       |   1 +
 .../db/mpp/execution/SchemaDriverContext.java      |   1 +
 .../source/SeriesAggregateScanOperator.java        | 420 ++++++++++++++++++++-
 .../db/mpp/operator/source/SeriesScanUtil.java     |  10 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |  28 +-
 .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java |   5 +
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |   1 +
 .../plan/node/source/SeriesAggregateScanNode.java  |  45 ++-
 .../statement/component/GroupByTimeComponent.java  |  73 ++++
 .../query/dataset/groupby/GroupByFillDataSet.java  |   8 +-
 .../query/dataset/groupby/GroupByTimeDataSet.java  |  30 +-
 .../dataset/groupby/GroupByTimeEngineDataSet.java  |  24 +-
 .../groupby/GroupByWithValueFilterDataSet.java     |   6 +-
 .../groupby/GroupByWithoutValueFilterDataSet.java  |   9 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |   3 +
 .../timerangeiterator/AggrWindowIterator.java      |  38 +-
 .../timerangeiterator/ITimeRangeIterator.java      |   8 +-
 .../timerangeiterator/PreAggrWindowIterator.java   |  38 +-
 .../PreAggrWindowWithNaturalMonthIterator.java     |  50 ++-
 .../SingleTimeWindowIterator.java                  |  65 ++++
 .../operator/SeriesAggregateScanOperatorTest.java  | 373 ++++++++++++++++++
 .../db/mpp/operator/SeriesScanOperatorTest.java    |   6 +-
 .../db/mpp/sql/plan/QueryLogicalPlanUtil.java      |   8 +
 .../source/SeriesAggregateScanNodeSerdeTest.java   |   7 +-
 .../dataset/groupby/GroupByTimeDataSetTest.java    |  74 ++--
 .../query/reader/series/SeriesReaderTestUtil.java  |   8 +
 .../iotdb/db/utils/TimeRangeIteratorTest.java      | 230 +++++++----
 .../iotdb/tsfile/read/common/block/TsBlock.java    |  22 +-
 .../tsfile/read/common/block/TsBlockBuilder.java   |   5 +
 .../common/block/column/BinaryColumnBuilder.java   |  11 +
 .../common/block/column/BooleanColumnBuilder.java  |  11 +
 .../read/common/block/column/ColumnBuilder.java    |   5 +
 .../common/block/column/DoubleColumnBuilder.java   |  11 +
 .../common/block/column/FloatColumnBuilder.java    |  11 +
 .../read/common/block/column/IntColumnBuilder.java |  11 +
 .../common/block/column/LongColumnBuilder.java     |  11 +
 .../read/common/block/column/TimeColumn.java       |   4 +
 .../common/block/column/TimeColumnBuilder.java     |  11 +
 40 files changed, 1473 insertions(+), 214 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
index 21aebe0214..5ae66b14d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriver.java
@@ -53,6 +53,10 @@ import java.util.stream.Collectors;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 
+/**
+ * One DataDriver is responsible for one FragmentInstance that serves a data query, which may
+ * contain several series.
+ */
 @NotThreadSafe
 public class DataDriver implements Driver {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
index abd6ea4b90..dc08957a7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/DataDriverContext.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.util.List;
 
+/** TODO Add javadoc for context */
 public class DataDriverContext extends DriverContext {
   private final List<PartialPath> paths;
   private final Filter timeFilter;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
index 4020db4593..844fe417ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriver.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static org.apache.iotdb.db.mpp.operator.Operator.NOT_BLOCKED;
 
+/** One SchemaDriver is used to execute one FragmentInstance which is for metadata query. */
 @NotThreadSafe
 public class SchemaDriver implements Driver {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
index a859df2b77..8781fa3137 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/SchemaDriverContext.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 
+/** TODO Add javadoc for context */
 public class SchemaDriverContext extends DriverContext {
 
   private final ISchemaRegion schemaRegion;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index c4b1015529..79586e3c4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -19,18 +19,130 @@
 package org.apache.iotdb.db.mpp.operator.source;
 
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.factory.AggregateResultFactory;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
+import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator;
+import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This operator is responsible for the aggregation calculation of one series based on the global
+ * time range and the time split parameter.
+ *
+ * <p>Every time next() is invoked, one tsBlock which contains current time window will be returned.
+ * In sliding window situation, current time window is a pre-aggregation window. If there is no time
+ * split parameter, i.e. aggregation without groupBy, just one tsBlock will be returned.
+ */
 public class SeriesAggregateScanOperator implements DataSourceOperator {
+
+  private final OperatorContext operatorContext;
+  private final PlanNodeId sourceId;
+  private final SeriesScanUtil seriesScanUtil;
+  private final boolean ascending;
+  private List<AggregateResult> aggregateResultList;
+
+  private ITimeRangeIterator timeRangeIterator;
+  // current interval of aggregation window [curStartTime, curEndTime)
+  private TimeRange curTimeRange;
+
+  private TsBlockSingleColumnIterator preCachedData;
+  // used for resetting the preCachedData to the last read index
+  private int lastReadIndex;
+
+  private TsBlockBuilder tsBlockBuilder;
+  private TsBlock resultTsBlock;
+  private boolean hasCachedTsBlock = false;
+  private boolean finished = false;
+
+  public SeriesAggregateScanOperator(
+      PlanNodeId sourceId,
+      PartialPath seriesPath,
+      Set<String> allSensors,
+      OperatorContext context,
+      List<AggregationType> aggregateFuncList,
+      Filter timeFilter,
+      boolean ascending,
+      GroupByTimeComponent groupByTimeParameter) {
+    this.sourceId = sourceId;
+    this.operatorContext = context;
+    this.ascending = ascending;
+    this.seriesScanUtil =
+        new SeriesScanUtil(
+            seriesPath,
+            allSensors,
+            seriesPath.getSeriesType(),
+            context.getInstanceContext(),
+            timeFilter,
+            null,
+            ascending);
+    aggregateResultList = new ArrayList<>(aggregateFuncList.size());
+    for (AggregationType aggregationType : aggregateFuncList) {
+      aggregateResultList.add(
+          AggregateResultFactory.getAggrResultByType(
+              aggregationType,
+              seriesPath.getSeriesType(),
+              seriesScanUtil.getOrderUtils().getAscending()));
+    }
+    tsBlockBuilder =
+        new TsBlockBuilder(
+            aggregateFuncList.stream()
+                .map(
+                    functionType ->
+                        SchemaUtils.getSeriesTypeByPath(seriesPath, functionType.name()))
+                .collect(Collectors.toList()));
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
+  }
+
+  /**
+   * If groupByTimeParameter is null, it's an aggregation query without down sampling. Such a
+   * query has only one time window and its result set does not contain a timestamp, so it
+   * doesn't matter what the time range returns.
+   */
+  public ITimeRangeIterator initTimeRangeIterator(GroupByTimeComponent groupByTimeParameter) {
+    if (groupByTimeParameter == null) {
+      return new SingleTimeWindowIterator(0, Long.MAX_VALUE);
+    } else {
+      return TimeRangeIteratorFactory.getTimeRangeIterator(
+          groupByTimeParameter.getStartTime(),
+          groupByTimeParameter.getEndTime(),
+          groupByTimeParameter.getInterval(),
+          groupByTimeParameter.getSlidingStep(),
+          ascending,
+          groupByTimeParameter.isIntervalByMonth(),
+          groupByTimeParameter.isSlidingStepByMonth(),
+          groupByTimeParameter.getInterval() > groupByTimeParameter.getSlidingStep());
+    }
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
+  // TODO
   @Override
   public ListenableFuture<Void> isBlocked() {
     return DataSourceOperator.super.isBlocked();
@@ -38,14 +150,97 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
 
   @Override
   public TsBlock next() {
+    if (hasCachedTsBlock || hasNext()) {
+      hasCachedTsBlock = false;
+      return resultTsBlock;
+    }
     return null;
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    if (hasCachedTsBlock) {
+      return true;
+    }
+    try {
+      if (!timeRangeIterator.hasNextTimeRange()) {
+        return false;
+      }
+      curTimeRange = timeRangeIterator.nextTimeRange();
+
+      // 1. Clear previous aggregation result
+      for (AggregateResult result : aggregateResultList) {
+        result.reset();
+      }
+
+      // 2. Calculate aggregation result based on current time window
+      if (calcFromCacheData(curTimeRange)) {
+        updateResultTsBlockUsingAggregateResult();
+        return true;
+      }
+
+      // read page data firstly
+      if (readAndCalcFromPage(curTimeRange)) {
+        updateResultTsBlockUsingAggregateResult();
+        return true;
+      }
+
+      // read chunk data secondly
+      if (readAndCalcFromChunk(curTimeRange)) {
+        updateResultTsBlockUsingAggregateResult();
+        return true;
+      }
+
+      // read from file last
+      while (seriesScanUtil.hasNextFile()) {
+        Statistics fileStatistics = seriesScanUtil.currentFileStatistics();
+        if (fileStatistics.getStartTime() >= curTimeRange.getMax()) {
+          if (ascending) {
+            updateResultTsBlockUsingAggregateResult();
+            return true;
+          } else {
+            seriesScanUtil.skipCurrentFile();
+            continue;
+          }
+        }
+        // calc from fileMetaData
+        if (canUseCurrentFileStatistics()
+            && curTimeRange.contains(fileStatistics.getStartTime(), fileStatistics.getEndTime())) {
+          calcFromStatistics(fileStatistics);
+          seriesScanUtil.skipCurrentFile();
+          continue;
+        }
+
+        // read chunk
+        if (readAndCalcFromChunk(curTimeRange)) {
+          updateResultTsBlockUsingAggregateResult();
+          return true;
+        }
+      }
+
+      updateResultTsBlockUsingAggregateResult();
+      return true;
+    } catch (IOException e) {
+      throw new RuntimeException("Error while scanning the file", e);
+    }
+  }
+
+  private void updateResultTsBlockUsingAggregateResult() {
+    // TODO AVG
+    tsBlockBuilder.reset();
+    TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    // Use start time of current time range as time column
+    timeColumnBuilder.writeLong(curTimeRange.getMin());
+    ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
+    for (int i = 0; i < aggregateResultList.size(); i++) {
+      columnBuilders[i].writeObject(aggregateResultList.get(i).getResult());
+    }
+    tsBlockBuilder.declarePosition();
+    resultTsBlock = tsBlockBuilder.build();
+    hasCachedTsBlock = true;
   }
 
+  // TODO Implement it later?
   @Override
   public void close() throws Exception {
     DataSourceOperator.super.close();
@@ -53,14 +248,229 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
 
   @Override
   public boolean isFinished() {
-    return false;
+    return finished || (finished = !hasNext()); // finished only once no time window remains
   }
 
   @Override
   public PlanNodeId getSourceId() {
-    return null;
+    return sourceId;
   }
 
   @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {}
+  public void initQueryDataSource(QueryDataSource dataSource) {
+    seriesScanUtil.initQueryDataSource(dataSource);
+  }
+
+  /** @return true if the result has already been obtained */
+  private boolean calcFromCacheData(TimeRange curTimeRange) throws IOException {
+    calcFromBatch(preCachedData, curTimeRange);
+    // The result is calculated from the cache
+    return (preCachedData != null
+            && (ascending
+                ? preCachedData.getEndTime() >= curTimeRange.getMax()
+                : preCachedData.getStartTime() < curTimeRange.getMin()))
+        || isEndCalc();
+  }
+
+  @SuppressWarnings("squid:S3776")
+  private void calcFromBatch(TsBlockSingleColumnIterator blockIterator, TimeRange curTimeRange)
+      throws IOException {
+    // check if the batchData does not contain points in current interval
+    if (!satisfied(blockIterator, curTimeRange)) {
+      return;
+    }
+
+    for (AggregateResult result : aggregateResultList) {
+      // current agg method has been calculated
+      if (result.hasFinalResult()) {
+        continue;
+      }
+      // lazy reset batch data for calculation
+      blockIterator.setRowIndex(lastReadIndex);
+      // skip points that cannot be calculated
+      skipOutOfTimeRangePoints(blockIterator, curTimeRange);
+
+      if (blockIterator.hasNext()) {
+        result.updateResultFromPageData(
+            blockIterator, curTimeRange.getMin(), curTimeRange.getMax());
+      }
+    }
+
+    // reset the last position to current Index
+    lastReadIndex = blockIterator.getRowIndex();
+
+    // can calc for next interval
+    if (blockIterator.hasNext()) {
+      preCachedData = blockIterator;
+    }
+  }
+
+  // skip points that cannot be calculated
+  private void skipOutOfTimeRangePoints(
+      TsBlockSingleColumnIterator tsBlockIterator, TimeRange curTimeRange) {
+    if (ascending) {
+      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) {
+        tsBlockIterator.next();
+      }
+    } else {
+      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
+        tsBlockIterator.next();
+      }
+    }
+  }
+
+  private boolean satisfied(TsBlockSingleColumnIterator tsBlockIterator, TimeRange timeRange) {
+    if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
+      return false;
+    }
+
+    if (ascending
+        && (tsBlockIterator.getEndTime() < timeRange.getMin()
+            || tsBlockIterator.currentTime() >= timeRange.getMax())) {
+      return false;
+    }
+    if (!ascending
+        && (tsBlockIterator.getStartTime() >= timeRange.getMax()
+            || tsBlockIterator.currentTime() < timeRange.getMin())) {
+      preCachedData = tsBlockIterator;
+      return false;
+    }
+    return true;
+  }
+
+  private boolean isEndCalc() {
+    for (AggregateResult result : aggregateResultList) {
+      if (!result.hasFinalResult()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+  private boolean readAndCalcFromPage(TimeRange curTimeRange) throws IOException {
+    while (seriesScanUtil.hasNextPage()) {
+      Statistics pageStatistics = seriesScanUtil.currentPageStatistics();
+      // must be non overlapped page
+      if (pageStatistics != null) {
+        // There are no more eligible points in the current time range
+        if (pageStatistics.getStartTime() >= curTimeRange.getMax()) {
+          if (ascending) {
+            return true;
+          } else {
+            seriesScanUtil.skipCurrentPage();
+            continue;
+          }
+        }
+        // can use pageHeader
+        if (canUseCurrentPageStatistics()
+            && curTimeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) {
+          calcFromStatistics(pageStatistics);
+          seriesScanUtil.skipCurrentPage();
+          if (isEndCalc()) {
+            return true;
+          }
+          continue;
+        }
+      }
+
+      // calc from page data
+      TsBlockSingleColumnIterator tsBlockIterator =
+          seriesScanUtil.nextPage().getTsBlockSingleColumnIterator();
+      if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
+        continue;
+      }
+
+      // reset the last position to current Index
+      lastReadIndex = tsBlockIterator.getRowIndex();
+
+      // stop calc and cached current batchData
+      if (ascending && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
+        preCachedData = tsBlockIterator;
+        return true;
+      }
+
+      // calc from batch data
+      calcFromBatch(tsBlockIterator, curTimeRange);
+
+      // judge whether the calculation finished
+      if (isEndCalc()
+          || (tsBlockIterator.hasNext()
+              && (ascending
+                  ? tsBlockIterator.currentTime() >= curTimeRange.getMax()
+                  : tsBlockIterator.currentTime() < curTimeRange.getMin()))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readAndCalcFromChunk(TimeRange curTimeRange) throws IOException {
+    while (seriesScanUtil.hasNextChunk()) {
+      Statistics chunkStatistics = seriesScanUtil.currentChunkStatistics();
+      if (chunkStatistics.getStartTime() >= curTimeRange.getMax()) {
+        if (ascending) {
+          return true;
+        } else {
+          seriesScanUtil.skipCurrentChunk();
+          continue;
+        }
+      }
+      // calc from chunkMetaData
+      if (canUseCurrentChunkStatistics()
+          && curTimeRange.contains(chunkStatistics.getStartTime(), chunkStatistics.getEndTime())) {
+        calcFromStatistics(chunkStatistics);
+        seriesScanUtil.skipCurrentChunk();
+        continue;
+      }
+      // read page
+      if (readAndCalcFromPage(curTimeRange)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void calcFromStatistics(Statistics statistics) {
+    try {
+      for (AggregateResult result : aggregateResultList) {
+        if (result.hasFinalResult()) {
+          continue;
+        }
+        result.updateResultFromStatistics(statistics);
+      }
+    } catch (QueryProcessException e) {
+      throw new RuntimeException("Error while updating result using statistics", e);
+    }
+  }
+
+  public boolean canUseCurrentFileStatistics() throws IOException {
+    Statistics fileStatistics = seriesScanUtil.currentFileStatistics();
+    return !seriesScanUtil.isFileOverlapped()
+        && containedByTimeFilter(fileStatistics)
+        && !seriesScanUtil.currentFileModified();
+  }
+
+  public boolean canUseCurrentChunkStatistics() throws IOException {
+    Statistics chunkStatistics = seriesScanUtil.currentChunkStatistics();
+    return !seriesScanUtil.isChunkOverlapped()
+        && containedByTimeFilter(chunkStatistics)
+        && !seriesScanUtil.currentChunkModified();
+  }
+
+  public boolean canUseCurrentPageStatistics() throws IOException {
+    Statistics currentPageStatistics = seriesScanUtil.currentPageStatistics();
+    if (currentPageStatistics == null) {
+      return false;
+    }
+    return !seriesScanUtil.isPageOverlapped()
+        && containedByTimeFilter(currentPageStatistics)
+        && !seriesScanUtil.currentPageModified();
+  }
+
+  private boolean containedByTimeFilter(Statistics statistics) {
+    Filter timeFilter = seriesScanUtil.getTimeFilter();
+    return timeFilter == null
+        || timeFilter.containStartEndTime(statistics.getStartTime(), statistics.getEndTime());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index c6493bedf4..fad08b703d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -136,12 +136,11 @@ public class SeriesScanUtil {
     if (ascending) {
       this.orderUtils = new AscTimeOrderUtils();
       mergeReader = getPriorityMergeReader();
-      this.curUnseqFileIndex = 0;
     } else {
       this.orderUtils = new DescTimeOrderUtils();
       mergeReader = getDescPriorityMergeReader();
-      this.curUnseqFileIndex = 0;
     }
+    this.curUnseqFileIndex = 0;
 
     unSeqTimeSeriesMetadata =
         new PriorityQueue<>(
@@ -271,6 +270,9 @@ public class SeriesScanUtil {
 
     if (firstChunkMetadata != null) {
       return true;
+      // hasNextFile() has not been invoked
+    } else if (firstTimeSeriesMetadata == null && cachedChunkMetadata.isEmpty()) {
+      return false;
     }
 
     while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
@@ -1085,6 +1087,10 @@ public class SeriesScanUtil {
     return timeFilter;
   }
 
+  public TimeOrderUtils getOrderUtils() {
+    return orderUtils;
+  }
+
   private class VersionPageReader {
 
     protected PriorityMergeReader.MergeReaderPriority version;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index e02becf770..e5d60439bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.mpp.operator.schema.SchemaMergeOperator;
 import org.apache.iotdb.db.mpp.operator.schema.TimeSeriesSchemaScanOperator;
 import org.apache.iotdb.db.mpp.operator.source.DataSourceOperator;
 import org.apache.iotdb.db.mpp.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
@@ -76,7 +77,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
 /**
- * used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
+ * Used to plan a fragment instance. Currently, we simply change it from PlanNode to executable
  * Operator tree, but in the future, we may split one fragment instance into multiple pipeline to
  * run a fragment instance parallel and take full advantage of multi-cores
  */
@@ -214,7 +215,29 @@ public class LocalExecutionPlanner {
     @Override
     public Operator visitSeriesAggregate(
         SeriesAggregateScanNode node, LocalExecutionPlanContext context) {
-      return super.visitSeriesAggregate(node, context);
+      PartialPath seriesPath = node.getSeriesPath();
+      boolean ascending = node.getScanOrder() == OrderBy.TIMESTAMP_ASC;
+      OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              SeriesAggregateScanNode.class.getSimpleName());
+
+      SeriesAggregateScanOperator aggregateScanOperator =
+          new SeriesAggregateScanOperator(
+              node.getPlanNodeId(),
+              seriesPath,
+              node.getAllSensors(),
+              operatorContext,
+              node.getAggregateFuncList(),
+              node.getTimeFilter(),
+              ascending,
+              node.getGroupByTimeParameter());
+
+      context.addSourceOperator(aggregateScanOperator);
+      context.addPath(seriesPath);
+
+      return aggregateScanOperator;
     }
 
     @Override
@@ -355,6 +378,7 @@ public class LocalExecutionPlanner {
   private static class LocalExecutionPlanContext {
     private final FragmentInstanceContext instanceContext;
     private final List<PartialPath> paths;
+    // Used to lock corresponding query resources
     private final List<DataSourceOperator> sourceOperators;
     private ISinkHandle sinkHandle;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
index 4b0ee9c1ff..db7497ea61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/QueryPlanBuilder.java
@@ -109,6 +109,10 @@ public class QueryPlanBuilder {
     for (Map.Entry<String, Map<PartialPath, Set<AggregationType>>> entry :
         deviceNameToAggregationsMap.entrySet()) {
       String deviceName = entry.getKey();
+      Set<String> allSensors =
+          entry.getValue().keySet().stream()
+              .map(PartialPath::getMeasurement)
+              .collect(Collectors.toSet());
 
       for (PartialPath path : entry.getValue().keySet()) {
         deviceNameToSourceNodesMap
@@ -117,6 +121,7 @@ public class QueryPlanBuilder {
                 new SeriesAggregateScanNode(
                     context.getQueryId().genPlanNodeId(),
                     path,
+                    allSensors,
                     new ArrayList<>(entry.getValue().get(path)),
                     scanOrder,
                     timeFilter,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index a3065c8c03..7058301ce8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -34,6 +34,7 @@ import java.nio.ByteBuffer;
 import java.util.Objects;
 
 public class FragmentInstance implements IConsensusRequest {
+
   private final FragmentInstanceId id;
   private final QueryType type;
   // The reference of PlanFragment which this instance is generated from
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index dcde49fd9a..fde2b903a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -45,8 +45,10 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -71,6 +73,9 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
   private final PartialPath seriesPath;
   private final List<AggregationType> aggregateFuncList;
 
+  // all the sensors in seriesPath's device of current query
+  private Set<String> allSensors;
+
   // The order to traverse the data.
   // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here.
   // The default order is TIMESTAMP_ASC, which means "order by timestamp asc"
@@ -90,12 +95,14 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
   public SeriesAggregateScanNode(
       PlanNodeId id,
       PartialPath seriesPath,
+      Set<String> allSensors,
       List<AggregationType> aggregateFuncList,
       OrderBy scanOrder,
       Filter timeFilter,
       GroupByTimeComponent groupByTimeParameter) {
     super(id);
     this.seriesPath = seriesPath;
+    this.allSensors = allSensors;
     this.aggregateFuncList = aggregateFuncList;
     this.scanOrder = scanOrder;
     this.timeFilter = timeFilter;
@@ -109,6 +116,22 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
             .collect(Collectors.toList());
   }
 
+  public OrderBy getScanOrder() {
+    return scanOrder;
+  }
+
+  public Set<String> getAllSensors() {
+    return allSensors;
+  }
+
+  public Filter getTimeFilter() {
+    return timeFilter;
+  }
+
+  public GroupByTimeComponent getGroupByTimeParameter() {
+    return groupByTimeParameter;
+  }
+
   @Override
   public List<PlanNode> getChildren() {
     return ImmutableList.of();
@@ -167,6 +190,10 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.SERIES_AGGREGATE_SCAN.serialize(byteBuffer);
     seriesPath.serialize(byteBuffer);
+    ReadWriteIOUtils.write(allSensors.size(), byteBuffer);
+    for (String sensor : allSensors) {
+      ReadWriteIOUtils.write(sensor, byteBuffer);
+    }
     ReadWriteIOUtils.write(aggregateFuncList.size(), byteBuffer);
     for (AggregationType aggregationType : aggregateFuncList) {
       ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@@ -180,12 +207,17 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
     }
     ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
     timeFilter.serialize(byteBuffer);
-    // TODO serialize groupByTimeParameter
+    groupByTimeParameter.serialize(byteBuffer);
     regionReplicaSet.serializeImpl(byteBuffer);
   }
 
   public static SeriesAggregateScanNode deserialize(ByteBuffer byteBuffer) {
     PartialPath partialPath = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer);
+    int allSensorsSize = ReadWriteIOUtils.readInt(byteBuffer);
+    Set<String> allSensors = new HashSet<>();
+    for (int i = 0; i < allSensorsSize; i++) {
+      allSensors.add(ReadWriteIOUtils.readString(byteBuffer));
+    }
     int aggregateFuncSize = ReadWriteIOUtils.readInt(byteBuffer);
     List<AggregationType> aggregateFuncList = new ArrayList<>();
     for (int i = 0; i < aggregateFuncSize; i++) {
@@ -194,12 +226,18 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
     OrderBy scanOrder = OrderBy.values()[ReadWriteIOUtils.readInt(byteBuffer)];
     Filter timeFilter = FilterFactory.deserialize(byteBuffer);
 
-    // TODO serialize groupByTimeParameter
+    GroupByTimeComponent groupByTimeComponent = GroupByTimeComponent.deserialize(byteBuffer);
     RegionReplicaSet regionReplicaSet = RegionReplicaSet.deserializeImpl(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     SeriesAggregateScanNode seriesAggregateScanNode =
         new SeriesAggregateScanNode(
-            planNodeId, partialPath, aggregateFuncList, scanOrder, timeFilter, null);
+            planNodeId,
+            partialPath,
+            allSensors,
+            aggregateFuncList,
+            scanOrder,
+            timeFilter,
+            groupByTimeComponent);
     seriesAggregateScanNode.regionReplicaSet = regionReplicaSet;
     return seriesAggregateScanNode;
   }
@@ -233,6 +271,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
     SeriesAggregateScanNode that = (SeriesAggregateScanNode) o;
     return Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
         && Objects.equals(seriesPath, that.seriesPath)
+        && Objects.equals(allSensors, that.allSensors)
         && Objects.equals(
             aggregateFuncList.stream().sorted().collect(Collectors.toList()),
             that.aggregateFuncList.stream().sorted().collect(Collectors.toList()))
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
index dbb2daa415..026297a331 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByTimeComponent.java
@@ -20,6 +20,10 @@
 package org.apache.iotdb.db.mpp.sql.statement.component;
 
 import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
 
 /** This class maintains information of {@code GROUP BY} clause. */
 public class GroupByTimeComponent extends StatementNode {
@@ -43,6 +47,28 @@ public class GroupByTimeComponent extends StatementNode {
 
   public GroupByTimeComponent() {}
 
+  public GroupByTimeComponent(
+      long startTime, long endTime, long interval, long slidingStep, boolean leftCRightO) {
+    this(startTime, endTime, interval, slidingStep, false, false, leftCRightO);
+  }
+
+  public GroupByTimeComponent(
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep,
+      boolean isIntervalByMonth,
+      boolean isSlidingStepByMonth,
+      boolean leftCRightO) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.interval = interval;
+    this.slidingStep = slidingStep;
+    this.isIntervalByMonth = isIntervalByMonth;
+    this.isSlidingStepByMonth = isSlidingStepByMonth;
+    this.leftCRightO = leftCRightO;
+  }
+
   public boolean isLeftCRightO() {
     return leftCRightO;
   }
@@ -98,4 +124,51 @@ public class GroupByTimeComponent extends StatementNode {
   public void setIntervalByMonth(boolean isIntervalByMonth) {
     this.isIntervalByMonth = isIntervalByMonth;
   }
+
+  public void serialize(ByteBuffer buffer) {
+    ReadWriteIOUtils.write(startTime, buffer);
+    ReadWriteIOUtils.write(endTime, buffer);
+    ReadWriteIOUtils.write(interval, buffer);
+    ReadWriteIOUtils.write(slidingStep, buffer);
+    ReadWriteIOUtils.write(isIntervalByMonth, buffer);
+    ReadWriteIOUtils.write(isSlidingStepByMonth, buffer);
+    ReadWriteIOUtils.write(leftCRightO, buffer);
+  }
+
+  public static GroupByTimeComponent deserialize(ByteBuffer buffer) {
+    GroupByTimeComponent groupByTimeComponent = new GroupByTimeComponent();
+    groupByTimeComponent.setStartTime(ReadWriteIOUtils.readLong(buffer));
+    groupByTimeComponent.setEndTime(ReadWriteIOUtils.readLong(buffer));
+    groupByTimeComponent.setInterval(ReadWriteIOUtils.readLong(buffer));
+    groupByTimeComponent.setSlidingStep(ReadWriteIOUtils.readLong(buffer));
+    groupByTimeComponent.setIntervalByMonth(ReadWriteIOUtils.readBool(buffer));
+    groupByTimeComponent.setSlidingStepByMonth(ReadWriteIOUtils.readBool(buffer));
+    groupByTimeComponent.setLeftCRightO(ReadWriteIOUtils.readBool(buffer));
+    return groupByTimeComponent;
+  }
+
+  public boolean equals(Object obj) {
+    if (!(obj instanceof GroupByTimeComponent)) {
+      return false;
+    }
+    GroupByTimeComponent other = (GroupByTimeComponent) obj;
+    return this.startTime == other.startTime
+        && this.endTime == other.endTime
+        && this.interval == other.interval
+        && this.slidingStep == other.slidingStep
+        && this.isSlidingStepByMonth == other.isSlidingStepByMonth
+        && this.isIntervalByMonth == other.isIntervalByMonth
+        && this.leftCRightO == other.leftCRightO;
+  }
+
+  public int hashCode() {
+    return Objects.hash(
+        startTime,
+        endTime,
+        interval,
+        slidingStep,
+        isIntervalByMonth,
+        isSlidingStepByMonth,
+        leftCRightO);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index aa34aa3677..5b0c9a7607 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -153,11 +153,11 @@ public class GroupByFillDataSet extends GroupByTimeDataSet {
     RowRecord record;
     long curTimestamp;
     if (leftCRightO) {
-      curTimestamp = curStartTime;
-      record = new RowRecord(curStartTime);
+      curTimestamp = curAggrTimeRange.getMin();
+      record = new RowRecord(curAggrTimeRange.getMin());
     } else {
-      curTimestamp = curEndTime - 1;
-      record = new RowRecord(curEndTime - 1);
+      curTimestamp = curAggrTimeRange.getMax() - 1;
+      record = new RowRecord(curAggrTimeRange.getMax() - 1);
     }
 
     for (int i = 0; i < aggregations.size(); i++) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
index 8c269f7fd7..16d039e370 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.db.query.executor.groupby.SlidingWindowGroupByExecutor;
 import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -45,13 +45,11 @@ public abstract class GroupByTimeDataSet extends QueryDataSet {
   protected long endTime;
 
   // current interval of aggregation window [curStartTime, curEndTime)
-  protected long curStartTime;
-  protected long curEndTime;
+  protected TimeRange curAggrTimeRange;
   protected boolean hasCachedTimeInterval;
 
   // current interval of pre-aggregation window [curStartTime, curEndTime)
-  protected long curPreAggrStartTime;
-  protected long curPreAggrEndTime;
+  protected TimeRange curPreAggrTimeRange;
 
   protected boolean leftCRightO;
   protected boolean isIntervalByMonth = false;
@@ -125,18 +123,10 @@ public abstract class GroupByTimeDataSet extends QueryDataSet {
             true);
 
     // find the first aggregation interval
-    Pair<Long, Long> retTimeRange;
-    retTimeRange = aggrWindowIterator.getFirstTimeRange();
-
-    curStartTime = retTimeRange.left;
-    curEndTime = retTimeRange.right;
+    curAggrTimeRange = aggrWindowIterator.nextTimeRange();
 
     // find the first pre-aggregation interval
-    Pair<Long, Long> retPerAggrTimeRange;
-    retPerAggrTimeRange = preAggrWindowIterator.getFirstTimeRange();
-
-    curPreAggrStartTime = retPerAggrTimeRange.left;
-    curPreAggrEndTime = retPerAggrTimeRange.right;
+    curPreAggrTimeRange = preAggrWindowIterator.nextTimeRange();
 
     this.hasCachedTimeInterval = true;
 
@@ -151,12 +141,10 @@ public abstract class GroupByTimeDataSet extends QueryDataSet {
     }
 
     // find the next aggregation interval
-    Pair<Long, Long> nextTimeRange = aggrWindowIterator.getNextTimeRange(curStartTime);
-    if (nextTimeRange == null) {
+    if (!aggrWindowIterator.hasNextTimeRange()) {
       return false;
     }
-    curStartTime = nextTimeRange.left;
-    curEndTime = nextTimeRange.right;
+    curAggrTimeRange = aggrWindowIterator.nextTimeRange();
 
     hasCachedTimeInterval = true;
     return true;
@@ -170,8 +158,8 @@ public abstract class GroupByTimeDataSet extends QueryDataSet {
   }
 
   @TestOnly
-  public Pair<Long, Long> nextTimePartition() {
+  public TimeRange nextTimePartition() {
     hasCachedTimeInterval = false;
-    return new Pair<>(curStartTime, curEndTime);
+    return curAggrTimeRange;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeEngineDataSet.java
index 944f5896bc..d822f08780 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeEngineDataSet.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 
 import java.io.IOException;
 
@@ -51,9 +51,9 @@ public abstract class GroupByTimeEngineDataSet extends GroupByTimeDataSet {
   protected RowRecord constructRowRecord(AggregateResult[] aggregateResultList) {
     RowRecord record;
     if (leftCRightO) {
-      record = new RowRecord(curStartTime);
+      record = new RowRecord(curAggrTimeRange.getMin());
     } else {
-      record = new RowRecord(curEndTime - 1);
+      record = new RowRecord(curAggrTimeRange.getMax() - 1);
     }
     for (AggregateResult res : aggregateResultList) {
       if (res == null) {
@@ -66,22 +66,24 @@ public abstract class GroupByTimeEngineDataSet extends GroupByTimeDataSet {
   }
 
   protected boolean isEndCal() {
-    if (curPreAggrStartTime == -1) {
+    if (curPreAggrTimeRange.getMin() == -1) {
       return true;
     }
-    return ascending ? curPreAggrStartTime >= curEndTime : curPreAggrEndTime <= curStartTime;
+    return ascending
+        ? curPreAggrTimeRange.getMin() >= curAggrTimeRange.getMax()
+        : curPreAggrTimeRange.getMax() <= curAggrTimeRange.getMin();
   }
 
   // find the next pre-aggregation interval
   protected void updatePreAggrInterval() {
-    Pair<Long, Long> retPerAggrTimeRange;
-    retPerAggrTimeRange = preAggrWindowIterator.getNextTimeRange(curPreAggrStartTime);
+    TimeRange retPerAggrTimeRange = null;
+    if (preAggrWindowIterator.hasNextTimeRange()) {
+      retPerAggrTimeRange = preAggrWindowIterator.nextTimeRange();
+    }
     if (retPerAggrTimeRange != null) {
-      curPreAggrStartTime = retPerAggrTimeRange.left;
-      curPreAggrEndTime = retPerAggrTimeRange.right;
+      curPreAggrTimeRange = retPerAggrTimeRange;
     } else {
-      curPreAggrStartTime = -1;
-      curPreAggrEndTime = -1;
+      curPreAggrTimeRange = new TimeRange(-1, -1);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
index 6034f22540..a5ab69bd91 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithValueFilterDataSet.java
@@ -194,10 +194,12 @@ public class GroupByWithValueFilterDataSet extends GroupByTimeEngineDataSet {
     curAggregateResults = new AggregateResult[paths.size()];
     for (SlidingWindowGroupByExecutor slidingWindowGroupByExecutor :
         slidingWindowGroupByExecutors) {
-      slidingWindowGroupByExecutor.setTimeRange(curStartTime, curEndTime);
+      slidingWindowGroupByExecutor.setTimeRange(
+          curAggrTimeRange.getMin(), curAggrTimeRange.getMax());
     }
     while (!isEndCal()) {
-      AggregateResult[] aggregations = calcResult(curPreAggrStartTime, curPreAggrEndTime);
+      AggregateResult[] aggregations =
+          calcResult(curPreAggrTimeRange.getMin(), curPreAggrTimeRange.getMax());
       for (int i = 0; i < aggregations.length; i++) {
         slidingWindowGroupByExecutors[i].update(aggregations[i].clone());
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
index 1292e2c90f..2561c8c412 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
@@ -194,7 +194,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByTimeEngineDataSet {
     curAggregateResults = new AggregateResult[paths.size()];
     for (SlidingWindowGroupByExecutor slidingWindowGroupByExecutor :
         slidingWindowGroupByExecutors) {
-      slidingWindowGroupByExecutor.setTimeRange(curStartTime, curEndTime);
+      slidingWindowGroupByExecutor.setTimeRange(
+          curAggrTimeRange.getMin(), curAggrTimeRange.getMax());
     }
     try {
       while (!isEndCal()) {
@@ -204,7 +205,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByTimeEngineDataSet {
           List<Integer> indexes = entry.getValue();
           GroupByExecutor groupByExecutor = pathExecutors.get(path);
           List<AggregateResult> aggregations =
-              groupByExecutor.calcResult(curPreAggrStartTime, curPreAggrEndTime);
+              groupByExecutor.calcResult(
+                  curPreAggrTimeRange.getMin(), curPreAggrTimeRange.getMax());
           for (int i = 0; i < aggregations.size(); i++) {
             int resultIndex = indexes.get(i);
             slidingWindowGroupByExecutors[resultIndex].update(aggregations.get(i).clone());
@@ -217,7 +219,8 @@ public class GroupByWithoutValueFilterDataSet extends GroupByTimeEngineDataSet {
           List<List<Integer>> indexesList = entry.getValue();
           AlignedGroupByExecutor groupByExecutor = alignedPathExecutors.get(path);
           List<List<AggregateResult>> aggregationsList =
-              groupByExecutor.calcAlignedResult(curPreAggrStartTime, curPreAggrEndTime);
+              groupByExecutor.calcAlignedResult(
+                  curPreAggrTimeRange.getMin(), curPreAggrTimeRange.getMax());
           for (int i = 0; i < path.getMeasurementList().size(); i++) {
             List<AggregateResult> aggregations = aggregationsList.get(i);
             List<Integer> indexes = indexesList.get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 136ce73968..87a9741a4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -331,6 +331,9 @@ public class SeriesReader {
 
     if (firstChunkMetadata != null) {
       return true;
+      // hasNextFile() has not been invoked
+    } else if (firstTimeSeriesMetadata == null && cachedChunkMetadata.isEmpty()) {
+      return false;
     }
 
     while (firstChunkMetadata == null && (!cachedChunkMetadata.isEmpty() || hasNextFile())) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/AggrWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/AggrWindowIterator.java
index d4e17d1c32..a86383bdfe 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/AggrWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/AggrWindowIterator.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.utils.timerangeiterator;
 
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 
 import static org.apache.iotdb.db.qp.utils.DatetimeUtils.MS_TO_MONTH;
 
@@ -35,7 +35,6 @@ public class AggrWindowIterator implements ITimeRangeIterator {
   // total query [startTime, endTime)
   private final long startTime;
   private final long endTime;
-
   private final long interval;
   private final long slidingStep;
 
@@ -43,6 +42,8 @@ public class AggrWindowIterator implements ITimeRangeIterator {
   private final boolean isSlidingStepByMonth;
   private final boolean isIntervalByMonth;
 
+  private TimeRange curTimeRange;
+
   public AggrWindowIterator(
       long startTime,
       long endTime,
@@ -61,7 +62,7 @@ public class AggrWindowIterator implements ITimeRangeIterator {
   }
 
   @Override
-  public Pair<Long, Long> getFirstTimeRange() {
+  public TimeRange getFirstTimeRange() {
     if (isAscending) {
       return getLeftmostTimeRange();
     } else {
@@ -69,7 +70,7 @@ public class AggrWindowIterator implements ITimeRangeIterator {
     }
   }
 
-  private Pair<Long, Long> getLeftmostTimeRange() {
+  private TimeRange getLeftmostTimeRange() {
     long retEndTime;
     if (isIntervalByMonth) {
       // calculate interval length by natural month based on startTime
@@ -78,10 +79,10 @@ public class AggrWindowIterator implements ITimeRangeIterator {
     } else {
       retEndTime = Math.min(startTime + interval, endTime);
     }
-    return new Pair<>(startTime, retEndTime);
+    return new TimeRange(startTime, retEndTime);
   }
 
-  private Pair<Long, Long> getRightmostTimeRange() {
+  private TimeRange getRightmostTimeRange() {
     long retStartTime;
     long retEndTime;
     long queryRange = endTime - startTime;
@@ -106,12 +107,18 @@ public class AggrWindowIterator implements ITimeRangeIterator {
     } else {
       retEndTime = Math.min(retStartTime + interval, endTime);
     }
-    return new Pair<>(retStartTime, retEndTime);
+    return new TimeRange(retStartTime, retEndTime);
   }
 
   @Override
-  public Pair<Long, Long> getNextTimeRange(long curStartTime) {
+  public boolean hasNextTimeRange() {
+    if (curTimeRange == null) {
+      curTimeRange = getFirstTimeRange();
+      return true;
+    }
+
     long retStartTime, retEndTime;
+    long curStartTime = curTimeRange.getMin();
     if (isAscending) {
       if (isSlidingStepByMonth) {
         retStartTime = DatetimeUtils.calcIntervalByMonth(curStartTime, (int) (slidingStep));
@@ -120,7 +127,7 @@ public class AggrWindowIterator implements ITimeRangeIterator {
       }
       // This is an open interval , [0-100)
       if (retStartTime >= endTime) {
-        return null;
+        return false;
       }
     } else {
       if (isSlidingStepByMonth) {
@@ -129,7 +136,7 @@ public class AggrWindowIterator implements ITimeRangeIterator {
         retStartTime = curStartTime - slidingStep;
       }
       if (retStartTime < startTime) {
-        return null;
+        return false;
       }
     }
 
@@ -139,7 +146,16 @@ public class AggrWindowIterator implements ITimeRangeIterator {
       retEndTime = retStartTime + interval;
     }
     retEndTime = Math.min(retEndTime, endTime);
-    return new Pair<>(retStartTime, retEndTime);
+    curTimeRange = new TimeRange(retStartTime, retEndTime);
+    return true;
+  }
+
+  @Override
+  public TimeRange nextTimeRange() {
+    // Returns the window cached by hasNextTimeRange(), priming it on first use.
+    // Calling this repeatedly yields the same window; only hasNextTimeRange() moves forward.
+    boolean available = curTimeRange != null || hasNextTimeRange();
+    return available ? curTimeRange : null;
+  }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/ITimeRangeIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/ITimeRangeIterator.java
index 91c5a337db..2a809557e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/ITimeRangeIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/ITimeRangeIterator.java
@@ -19,19 +19,21 @@
 
 package org.apache.iotdb.db.utils.timerangeiterator;
 
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 
 /** This interface used for iteratively generating aggregated time windows in GROUP BY query. */
 public interface ITimeRangeIterator {
 
   /** return the first time range by sorting order */
-  Pair<Long, Long> getFirstTimeRange();
+  TimeRange getFirstTimeRange();
 
+  /** @return whether current iterator has next time range */
+  boolean hasNextTimeRange();
   /**
    * return the next time range according to curStartTime (the start time of the last returned time
    * range)
    */
-  Pair<Long, Long> getNextTimeRange(long curStartTime);
+  TimeRange nextTimeRange();
 
   boolean isAscending();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowIterator.java
index 62cc7b0297..87e520d030 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowIterator.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.utils.timerangeiterator;
 
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 
 /**
  * This class iteratively generates pre-aggregated time windows.
@@ -32,7 +32,6 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
   // total query [startTime, endTime)
   private final long startTime;
   private final long endTime;
-
   private final long interval;
   private final long slidingStep;
 
@@ -43,6 +42,8 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
   private boolean isIntervalCyclicChange = false;
   private int intervalCnt = 0;
 
+  private TimeRange curTimeRange;
+
   public PreAggrWindowIterator(
       long startTime, long endTime, long interval, long slidingStep, boolean isAscending) {
     this.startTime = startTime;
@@ -54,7 +55,7 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
   }
 
   @Override
-  public Pair<Long, Long> getFirstTimeRange() {
+  public TimeRange getFirstTimeRange() {
     if (isAscending) {
       return getLeftmostTimeRange();
     } else {
@@ -62,13 +63,13 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
     }
   }
 
-  private Pair<Long, Long> getLeftmostTimeRange() {
+  private TimeRange getLeftmostTimeRange() {
     long retEndTime = Math.min(startTime + curInterval, endTime);
     updateIntervalAndStep();
-    return new Pair<>(startTime, retEndTime);
+    return new TimeRange(startTime, retEndTime);
   }
 
-  private Pair<Long, Long> getRightmostTimeRange() {
+  private TimeRange getRightmostTimeRange() {
     long retStartTime;
     long retEndTime;
     long intervalNum = (long) Math.ceil((endTime - startTime) / (double) slidingStep);
@@ -79,27 +80,42 @@ public class PreAggrWindowIterator implements ITimeRangeIterator {
     }
     retEndTime = Math.min(retStartTime + curInterval, endTime);
     updateIntervalAndStep();
-    return new Pair<>(retStartTime, retEndTime);
+    return new TimeRange(retStartTime, retEndTime);
   }
 
   @Override
-  public Pair<Long, Long> getNextTimeRange(long curStartTime) {
+  public boolean hasNextTimeRange() {
+    if (curTimeRange == null) {
+      curTimeRange = getFirstTimeRange();
+      return true;
+    }
+
     long retStartTime, retEndTime;
+    long curStartTime = curTimeRange.getMin();
     if (isAscending) {
       retStartTime = curStartTime + curSlidingStep;
       // This is an open interval , [0-100)
       if (retStartTime >= endTime) {
-        return null;
+        return false;
       }
     } else {
       retStartTime = curStartTime - curSlidingStep;
       if (retStartTime < startTime) {
-        return null;
+        return false;
       }
     }
     retEndTime = Math.min(retStartTime + curInterval, endTime);
     updateIntervalAndStep();
-    return new Pair<>(retStartTime, retEndTime);
+    curTimeRange = new TimeRange(retStartTime, retEndTime);
+    return true;
+  }
+
+  @Override
+  public TimeRange nextTimeRange() {
+    // Hands out the window cached by hasNextTimeRange(), computing the first one if needed.
+    // The iterator is not advanced here; a fresh hasNextTimeRange() call is what moves it on.
+    boolean available = curTimeRange != null || hasNextTimeRange();
+    return available ? curTimeRange : null;
+  }
 
   private void initIntervalAndStep() {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
index a5426bcf23..f56d288119 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.utils.timerangeiterator;
 
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 
 public class PreAggrWindowWithNaturalMonthIterator implements ITimeRangeIterator {
 
@@ -33,6 +33,7 @@ public class PreAggrWindowWithNaturalMonthIterator implements ITimeRangeIterator
   private long curStartTimeForIterator;
 
   private long lastEndTime;
+  private TimeRange curTimeRange;
 
   public PreAggrWindowWithNaturalMonthIterator(
       long startTime,
@@ -57,48 +58,61 @@ public class PreAggrWindowWithNaturalMonthIterator implements ITimeRangeIterator
   }
 
   @Override
-  public Pair<Long, Long> getFirstTimeRange() {
+  public TimeRange getFirstTimeRange() {
     long retStartTime = timeBoundaryHeap.pollFirst();
     lastEndTime = timeBoundaryHeap.first();
-    return new Pair<>(retStartTime, lastEndTime);
+    return new TimeRange(retStartTime, lastEndTime);
   }
 
   @Override
-  public Pair<Long, Long> getNextTimeRange(long curStartTime) {
+  public boolean hasNextTimeRange() {
+    if (curTimeRange == null) {
+      curTimeRange = getFirstTimeRange();
+      return true;
+    }
+
     if (lastEndTime >= curStartTimeForIterator) {
       tryToExpandHeap();
     }
     if (timeBoundaryHeap.isEmpty()) {
-      return null;
+      return false;
     }
     long retStartTime = timeBoundaryHeap.pollFirst();
     if (retStartTime >= curStartTimeForIterator) {
       tryToExpandHeap();
     }
     if (timeBoundaryHeap.isEmpty()) {
-      return null;
+      return false;
     }
     lastEndTime = timeBoundaryHeap.first();
-    return new Pair<>(retStartTime, lastEndTime);
+    curTimeRange = new TimeRange(retStartTime, lastEndTime);
+    return true;
+  }
+
+  @Override
+  public TimeRange nextTimeRange() {
+    // Serves the window cached by hasNextTimeRange(), fetching the first one on initial use.
+    // No boundary is polled from the heap here; only hasNextTimeRange() consumes boundaries.
+    boolean available = curTimeRange != null || hasNextTimeRange();
+    return available ? curTimeRange : null;
+  }
 
   private void initHeap() {
-    Pair<Long, Long> firstTimeRange = aggrWindowIterator.getFirstTimeRange();
-    timeBoundaryHeap.add(firstTimeRange.left);
-    timeBoundaryHeap.add(firstTimeRange.right);
-    curStartTimeForIterator = firstTimeRange.left;
+    TimeRange firstTimeRange = aggrWindowIterator.nextTimeRange();
+    timeBoundaryHeap.add(firstTimeRange.getMin());
+    timeBoundaryHeap.add(firstTimeRange.getMax());
+    curStartTimeForIterator = firstTimeRange.getMin();
 
     tryToExpandHeap();
   }
 
   private void tryToExpandHeap() {
-    Pair<Long, Long> curTimeRange = aggrWindowIterator.getNextTimeRange(curStartTimeForIterator);
-    while (curTimeRange != null && timeBoundaryHeap.size() < HEAP_MAX_SIZE) {
-      timeBoundaryHeap.add(curTimeRange.left);
-      timeBoundaryHeap.add(curTimeRange.right);
-      curStartTimeForIterator = curTimeRange.left;
-
-      curTimeRange = aggrWindowIterator.getNextTimeRange(curStartTimeForIterator);
+    // Check capacity first: hasNextTimeRange() advances the iterator and must not run when full.
+    while (timeBoundaryHeap.size() < HEAP_MAX_SIZE && aggrWindowIterator.hasNextTimeRange()) {
+      TimeRange timeRangeToExpand = aggrWindowIterator.nextTimeRange();
+      timeBoundaryHeap.add(timeRangeToExpand.getMin());
+      timeBoundaryHeap.add(timeRangeToExpand.getMax());
+      curStartTimeForIterator = timeRangeToExpand.getMin();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/SingleTimeWindowIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/SingleTimeWindowIterator.java
new file mode 100644
index 0000000000..3470f98834
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/SingleTimeWindowIterator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.timerangeiterator;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+
+/** Iterator over exactly one time window, used for aggregation without group by. */
+public class SingleTimeWindowIterator implements ITimeRangeIterator {
+
+  // bounds of the only window: the total query range [startTime, endTime)
+  private final long startTime;
+  private final long endTime;
+
+  private TimeRange curTimeRange;
+
+  public SingleTimeWindowIterator(long startTime, long endTime) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+  }
+
+  @Override
+  public TimeRange getFirstTimeRange() {
+    return new TimeRange(startTime, endTime);
+  }
+
+  @Override
+  public boolean hasNextTimeRange() {
+    // true only while the window has not been cached yet; caching happens right here
+    if (curTimeRange != null) {
+      return false;
+    }
+    curTimeRange = getFirstTimeRange();
+    return true;
+  }
+
+  @Override
+  public TimeRange nextTimeRange() {
+    // return the cached window, creating it through hasNextTimeRange() when none is cached;
+    // repeated calls therefore keep returning the same window
+    boolean available = curTimeRange != null || hasNextTimeRange();
+    return available ? curTimeRange : null;
+  }
+
+  @Override
+  public boolean isAscending() {
+    return false;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
new file mode 100644
index 0000000000..d1c71f2eb4
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.operator;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
+import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
+import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.filter.TimeFilter;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+
+public class SeriesAggregateScanOperatorTest {
+
+  private static final String SERIES_SCAN_OPERATOR_TEST_SG = "root.SeriesScanOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas, deviceIds, seqResources, unSeqResources, SERIES_SCAN_OPERATOR_TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  @Test
+  public void testAggregationWithoutTimeFilter() throws IllegalPathException {
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), null, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testMultiAggregationFuncWithoutTimeFilter1() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.COUNT);
+    aggregationTypes.add(AggregationType.SUM);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
+      assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testMultiAggregationFuncWithoutTimeFilter2() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MIN_TIME);
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
+      assertEquals(499, resultTsBlock.getColumn(3).getLong(0));
+      assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
+      assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testAggregationWithTimeFilter1() throws IllegalPathException {
+    Filter timeFilter = TimeFilter.gtEq(120); // only timestamps >= 120 are counted
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(380, resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testAggregationWithTimeFilter2() throws IllegalPathException {
+    Filter timeFilter = TimeFilter.ltEq(379); // only timestamps <= 379 are counted
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(380, resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testAggregationWithTimeFilter3() throws IllegalPathException {
+    Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(300, resultTsBlock.getColumn(0).getLong(0)); // timestamps 100..399 only
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testMultiAggregationWithTimeFilter() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MIN_TIME);
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregationTypes, timeFilter, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(20100, resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(399, resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(100, resultTsBlock.getColumn(2).getLong(0));
+      assertEquals(399, resultTsBlock.getColumn(3).getLong(0));
+      assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
+      assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  @Test
+  public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
+    int[] result = new int[] {100, 100, 100, 100};
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[count], resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(4, count);
+  }
+
+  @Test
+  public void testGroupByWithGlobalTimeFilter() throws IllegalPathException {
+    int[] result = new int[] {0, 80, 100, 80};
+    Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT),
+            timeFilter,
+            true,
+            groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[count], resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(4, count);
+  }
+
+  @Test
+  public void testGroupByWithMultiFunction() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {20000, 20100, 10200, 10300},
+          {20099, 20199, 299, 399},
+          {20099, 20199, 10259, 10379},
+          {20000, 20100, 260, 380}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 100, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[0][count], resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(result[1][count], resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(result[2][count], resultTsBlock.getColumn(2).getInt(0));
+      assertEquals(result[3][count], resultTsBlock.getColumn(3).getInt(0));
+      count++;
+    }
+    assertEquals(4, count);
+  }
+
+  @Test
+  public void testGroupBySlidingTimeWindow() throws IllegalPathException {
+    int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50};
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 399, 100, 50, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(50 * count, resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[count], resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(result.length, count);
+  }
+
+  @Test
+  public void testGroupBySlidingTimeWindow2() throws IllegalPathException {
+    int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
+    int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 149, 50, 30, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(
+            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[count], resultTsBlock.getColumn(0).getLong(0));
+      count++;
+    }
+    assertEquals(timeColumn.length, count);
+  }
+
+  @Test
+  public void testGroupBySlidingWindowWithMultiFunction() throws IllegalPathException {
+    int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
+    int[][] result =
+        new int[][] {
+          {20000, 20020, 20030, 20050, 20060, 20080, 20090, 20110, 20120, 20140},
+          {20019, 20029, 20049, 20059, 20079, 20089, 20109, 20119, 20139, 20148},
+          {20019, 20029, 20049, 20059, 20079, 20089, 20109, 20119, 20139, 20148},
+          {20000, 20020, 20030, 20050, 20060, 20080, 20090, 20110, 20120, 20140}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    GroupByTimeComponent groupByTimeParameter = new GroupByTimeComponent(0, 149, 50, 30, true);
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(timeColumn[count], resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[0][count], resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(result[1][count], resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(result[2][count], resultTsBlock.getColumn(2).getInt(0));
+      assertEquals(result[3][count], resultTsBlock.getColumn(3).getInt(0));
+      count++;
+    }
+    assertEquals(timeColumn.length, count);
+  }
+
+  public SeriesAggregateScanOperator initSeriesAggregateScanOperator(
+      List<AggregationType> aggregateFuncList,
+      Filter timeFilter,
+      boolean ascending,
+      GroupByTimeComponent groupByTimeParameter)
+      throws IllegalPathException {
+    MeasurementPath measurementPath =
+        new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+    Set<String> allSensors = Sets.newHashSet("sensor0");
+    QueryId queryId = new QueryId("stub_query");
+    AtomicReference<FragmentInstanceState> state =
+        new AtomicReference<>(FragmentInstanceState.RUNNING);
+    FragmentInstanceContext fragmentInstanceContext =
+        new FragmentInstanceContext(
+            new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"), state);
+    PlanNodeId planNodeId = new PlanNodeId("1");
+    fragmentInstanceContext.addOperatorContext(
+        1, planNodeId, SeriesScanOperator.class.getSimpleName());
+
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        new SeriesAggregateScanOperator(
+            planNodeId,
+            measurementPath,
+            allSensors,
+            fragmentInstanceContext.getOperatorContexts().get(0),
+            aggregateFuncList,
+            timeFilter,
+            ascending,
+            groupByTimeParameter);
+    seriesAggregateScanOperator.initQueryDataSource(
+        new QueryDataSource(seqResources, unSeqResources));
+    return seriesAggregateScanOperator;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
index 3793504970..78e6821bcd 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperatorTest.java
@@ -37,13 +37,13 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
@@ -76,8 +76,7 @@ public class SeriesScanOperatorTest {
     try {
       MeasurementPath measurementPath =
           new MeasurementPath(SERIES_SCAN_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
-      Set<String> allSensors = new HashSet<>();
-      allSensors.add("sensor0");
+      Set<String> allSensors = Sets.newHashSet("sensor0");
       QueryId queryId = new QueryId("stub_query");
       AtomicReference<FragmentInstanceState> state =
           new AtomicReference<>(FragmentInstanceState.RUNNING);
@@ -87,6 +86,7 @@ public class SeriesScanOperatorTest {
       PlanNodeId planNodeId = new PlanNodeId("1");
       fragmentInstanceContext.addOperatorContext(
           1, planNodeId, SeriesScanOperator.class.getSimpleName());
+
       SeriesScanOperator seriesScanOperator =
           new SeriesScanOperator(
               planNodeId,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java
index 57792799ae..e902078fa6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/QueryLogicalPlanUtil.java
@@ -231,6 +231,7 @@ public class QueryLogicalPlanUtil {
         new SeriesAggregateScanNode(
             new PlanNodeId("test_query_0"),
             schemaMap.get("root.sg.d1.s1"),
+            Sets.newHashSet("s1"),
             aggregationTypeList1,
             OrderBy.TIMESTAMP_DESC,
             timeFilter,
@@ -239,6 +240,7 @@ public class QueryLogicalPlanUtil {
         new SeriesAggregateScanNode(
             new PlanNodeId("test_query_1"),
             schemaMap.get("root.sg.d1.s2"),
+            Sets.newHashSet("s2"),
             aggregationTypeList2,
             OrderBy.TIMESTAMP_DESC,
             timeFilter,
@@ -247,6 +249,7 @@ public class QueryLogicalPlanUtil {
         new SeriesAggregateScanNode(
             new PlanNodeId("test_query_2"),
             schemaMap.get("root.sg.d2.s1"),
+            Sets.newHashSet("s1"),
             aggregationTypeList1,
             OrderBy.TIMESTAMP_DESC,
             timeFilter,
@@ -255,6 +258,7 @@ public class QueryLogicalPlanUtil {
         new SeriesAggregateScanNode(
             new PlanNodeId("test_query_3"),
             schemaMap.get("root.sg.d2.s2"),
+            Sets.newHashSet("s2"),
             aggregationTypeList2,
             OrderBy.TIMESTAMP_DESC,
             timeFilter,
@@ -318,6 +322,7 @@ public class QueryLogicalPlanUtil {
         new SeriesAggregateScanNode(
             new PlanNodeId("test_query_0"),
             schemaMap.get("root.sg.d1.s1"),
+            Sets.newHashSet("s1"),
             aggregationTypeList1,
             OrderBy.TIMESTAMP_DESC,
             timeFilter,
@@ -326,6 +331,7 @@ public class QueryLogicalPlanUtil {
         new SeriesAggregateScanNode(
             new PlanNodeId("test_query_1"),
             schemaMap.get("root.sg.d1.s2"),
+            Sets.newHashSet("s2"),
             aggregationTypeList2,
             OrderBy.TIMESTAMP_DESC,
             timeFilter,
@@ -334,6 +340,7 @@ public class QueryLogicalPlanUtil {
         new SeriesAggregateScanNode(
             new PlanNodeId("test_query_2"),
             schemaMap.get("root.sg.d2.s1"),
+            Sets.newHashSet("s1"),
             aggregationTypeList1,
             OrderBy.TIMESTAMP_DESC,
             timeFilter,
@@ -342,6 +349,7 @@ public class QueryLogicalPlanUtil {
         new SeriesAggregateScanNode(
             new PlanNodeId("test_query_3"),
             schemaMap.get("root.sg.d2.s2"),
+            Sets.newHashSet("s2"),
             aggregationTypeList2,
             OrderBy.TIMESTAMP_DESC,
             timeFilter,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
index b66c3e4e9f..67fb28feab 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/node/source/SeriesAggregateScanNodeSerdeTest.java
@@ -26,11 +26,13 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.sql.plan.node.PlanNodeDeserializeHelper;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.operator.In;
 
+import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -50,14 +52,17 @@ public class SeriesAggregateScanNodeSerdeTest {
     st.add("s2");
     List<AggregationType> aggregateFuncList = new ArrayList<>();
     aggregateFuncList.add(AggregationType.MAX_TIME);
+    GroupByTimeComponent groupByTimeComponent =
+        new GroupByTimeComponent(1, 100, 1, 1, true, true, true);
     SeriesAggregateScanNode seriesAggregateScanNode =
         new SeriesAggregateScanNode(
             new PlanNodeId("TestSeriesAggregateScanNode"),
             new MeasurementPath("root.sg.d1.s1", TSDataType.BOOLEAN),
+            Sets.newHashSet("s1"),
             aggregateFuncList,
             OrderBy.TIMESTAMP_ASC,
             new In<String>(st, VALUE_FILTER, true),
-            null);
+            groupByTimeComponent);
     seriesAggregateScanNode.setRegionReplicaSet(
         new RegionReplicaSet(new DataRegionId(1), new ArrayList<>()));
 
diff --git a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
index 8dfe2245bf..bde500b3f1 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSetTest.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.db.query.dataset.groupby;
 
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
 import org.apache.iotdb.db.query.aggregation.impl.CountAggrResult;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -55,9 +55,9 @@ public class GroupByTimeDataSetTest {
     GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
     int cnt = 0;
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
-      Assert.assertEquals(startTimeArray[cnt], pair.left);
-      Assert.assertEquals(endTimeArray[cnt], pair.right);
+      TimeRange timeRange = groupByEngine.nextTimePartition();
+      Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+      Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
       cnt++;
     }
     Assert.assertEquals(startTimeArray.length, cnt);
@@ -83,9 +83,9 @@ public class GroupByTimeDataSetTest {
     GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
     int cnt = 0;
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
-      Assert.assertEquals(startTimeArray[cnt], pair.left);
-      Assert.assertEquals(endTimeArray[cnt], pair.right);
+      TimeRange timeRange = groupByEngine.nextTimePartition();
+      Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+      Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
       cnt++;
     }
     Assert.assertEquals(startTimeArray.length, cnt);
@@ -111,9 +111,9 @@ public class GroupByTimeDataSetTest {
     GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
     int cnt = 0;
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
-      Assert.assertEquals(startTimeArray[cnt], pair.left);
-      Assert.assertEquals(endTimeArray[cnt], pair.right);
+      TimeRange timeRange = groupByEngine.nextTimePartition();
+      Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+      Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
       cnt++;
     }
     Assert.assertEquals(startTimeArray.length, cnt);
@@ -141,9 +141,9 @@ public class GroupByTimeDataSetTest {
     GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
     int cnt = 0;
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
-      Assert.assertEquals(startTimeArray[cnt], pair.left);
-      Assert.assertEquals(endTimeArray[cnt], pair.right);
+      TimeRange timeRange = groupByEngine.nextTimePartition();
+      Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+      Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
       cnt++;
     }
     Assert.assertEquals(startTimeArray.length, cnt);
@@ -170,9 +170,9 @@ public class GroupByTimeDataSetTest {
     GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
     int cnt = 0;
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
-      Assert.assertEquals(startTimeArray[cnt], pair.left);
-      Assert.assertEquals(endTimeArray[cnt], pair.right);
+      TimeRange timeRange = groupByEngine.nextTimePartition();
+      Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+      Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
       cnt++;
     }
     Assert.assertEquals(startTimeArray.length, cnt);
@@ -202,10 +202,10 @@ public class GroupByTimeDataSetTest {
     GroupByTimeDataSet groupByEngine = new GroupByWithValueFilterDataSet(queryId, groupByTimePlan);
     int cnt = 0;
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
+      TimeRange timeRange = groupByEngine.nextTimePartition();
       Assert.assertTrue(cnt < startTimeArray.length);
-      Assert.assertEquals(startTimeArray[cnt], pair.left);
-      Assert.assertEquals(endTimeArray[cnt], pair.right);
+      Assert.assertEquals(startTimeArray[cnt], timeRange.getMin());
+      Assert.assertEquals(endTimeArray[cnt], timeRange.getMax());
       cnt++;
     }
     Assert.assertEquals(startTimeArray.length, cnt);
@@ -240,10 +240,10 @@ public class GroupByTimeDataSetTest {
     int cnt = 0;
 
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
+      TimeRange timeRange = groupByEngine.nextTimePartition();
       Assert.assertTrue(cnt < startTimeArray.length);
-      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
-      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
       cnt++;
     }
 
@@ -294,10 +294,10 @@ public class GroupByTimeDataSetTest {
     int cnt = 0;
 
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
+      TimeRange timeRange = groupByEngine.nextTimePartition();
       Assert.assertTrue(cnt < startTimeArray.length);
-      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
-      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
       cnt++;
     }
 
@@ -334,10 +334,10 @@ public class GroupByTimeDataSetTest {
     int cnt = 0;
 
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
+      TimeRange timeRange = groupByEngine.nextTimePartition();
       Assert.assertTrue(cnt < startTimeArray.length);
-      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
-      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
       cnt++;
     }
     Assert.assertEquals(startTimeArray.length, cnt);
@@ -388,10 +388,10 @@ public class GroupByTimeDataSetTest {
     int cnt = 0;
 
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
+      TimeRange timeRange = groupByEngine.nextTimePartition();
       Assert.assertTrue(cnt < startTimeArray.length);
-      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
-      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
       cnt++;
     }
     Assert.assertEquals(startTimeArray.length, cnt);
@@ -442,10 +442,10 @@ public class GroupByTimeDataSetTest {
     int cnt = 0;
 
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
+      TimeRange timeRange = groupByEngine.nextTimePartition();
       Assert.assertTrue(cnt < startTimeArray.length);
-      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
-      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
       cnt++;
     }
     Assert.assertEquals(startTimeArray.length, cnt);
@@ -481,10 +481,10 @@ public class GroupByTimeDataSetTest {
     int cnt = 0;
 
     while (groupByEngine.hasNext()) {
-      Pair pair = groupByEngine.nextTimePartition();
+      TimeRange timeRange = groupByEngine.nextTimePartition();
       Assert.assertTrue(cnt < startTimeArray.length);
-      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) pair.left)));
-      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) pair.right)));
+      Assert.assertEquals(startTimeArray[cnt], df.format(new Date((long) timeRange.getMin())));
+      Assert.assertEquals(endTimeArray[cnt], df.format(new Date((long) timeRange.getMax())));
       cnt++;
     }
     Assert.assertEquals(startTimeArray.length, cnt);
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
index aa61459632..15266f5852 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderTestUtil.java
@@ -49,6 +49,14 @@ import java.util.Map;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
 
+/**
+ * This util contains 5 seqFiles and 5 unseqFiles by default.
+ *
+ * <p>Sequence time range of data: [0, 99], [100, 199], [200, 299], [300, 399], [400, 499]
+ *
+ * <p>UnSequence time range of data: [0, 19], [100, 139], [200, 259], [300, 379], [400, 499], [0,
+ * 199]
+ */
 public class SeriesReaderTestUtil {
 
   private static int seqFileNum = 5;
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/TimeRangeIteratorTest.java b/server/src/test/java/org/apache/iotdb/db/utils/TimeRangeIteratorTest.java
index 2057ff50f2..a5927b7a18 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/TimeRangeIteratorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/TimeRangeIteratorTest.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.utils;
 
 import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -31,8 +31,17 @@ public class TimeRangeIteratorTest {
   @Test
   public void testNotSplitTimeRange() {
     String[] res = {
-      "<0,4>", "<3,7>", "<6,10>", "<9,13>", "<12,16>", "<15,19>", "<18,22>", "<21,25>", "<24,28>",
-      "<27,31>", "<30,32>"
+      "[ 0 : 4 ]",
+      "[ 3 : 7 ]",
+      "[ 6 : 10 ]",
+      "[ 9 : 13 ]",
+      "[ 12 : 16 ]",
+      "[ 15 : 19 ]",
+      "[ 18 : 22 ]",
+      "[ 21 : 25 ]",
+      "[ 24 : 28 ]",
+      "[ 27 : 31 ]",
+      "[ 30 : 32 ]"
     };
 
     long startTime = 0, endTime = 32, interval = 4, slidingStep = 3;
@@ -53,25 +62,103 @@ public class TimeRangeIteratorTest {
   @Test
   public void testSplitTimeRange() {
     String[] res4_1 = {
-      "<0,1>", "<1,2>", "<2,3>", "<3,4>", "<4,5>", "<5,6>", "<6,7>", "<7,8>", "<8,9>", "<9,10>",
-      "<10,11>", "<11,12>", "<12,13>", "<13,14>", "<14,15>", "<15,16>", "<16,17>", "<17,18>",
-      "<18,19>", "<19,20>", "<20,21>", "<21,22>", "<22,23>", "<23,24>", "<24,25>", "<25,26>",
-      "<26,27>", "<27,28>", "<28,29>", "<29,30>", "<30,31>", "<31,32>"
+      "[ 0 : 1 ]",
+      "[ 1 : 2 ]",
+      "[ 2 : 3 ]",
+      "[ 3 : 4 ]",
+      "[ 4 : 5 ]",
+      "[ 5 : 6 ]",
+      "[ 6 : 7 ]",
+      "[ 7 : 8 ]",
+      "[ 8 : 9 ]",
+      "[ 9 : 10 ]",
+      "[ 10 : 11 ]",
+      "[ 11 : 12 ]",
+      "[ 12 : 13 ]",
+      "[ 13 : 14 ]",
+      "[ 14 : 15 ]",
+      "[ 15 : 16 ]",
+      "[ 16 : 17 ]",
+      "[ 17 : 18 ]",
+      "[ 18 : 19 ]",
+      "[ 19 : 20 ]",
+      "[ 20 : 21 ]",
+      "[ 21 : 22 ]",
+      "[ 22 : 23 ]",
+      "[ 23 : 24 ]",
+      "[ 24 : 25 ]",
+      "[ 25 : 26 ]",
+      "[ 26 : 27 ]",
+      "[ 27 : 28 ]",
+      "[ 28 : 29 ]",
+      "[ 29 : 30 ]",
+      "[ 30 : 31 ]",
+      "[ 31 : 32 ]"
     };
     String[] res4_2 = {
-      "<0,2>", "<2,4>", "<4,6>", "<6,8>", "<8,10>", "<10,12>", "<12,14>", "<14,16>", "<16,18>",
-      "<18,20>", "<20,22>", "<22,24>", "<24,26>", "<26,28>", "<28,30>", "<30,32>"
+      "[ 0 : 2 ]",
+      "[ 2 : 4 ]",
+      "[ 4 : 6 ]",
+      "[ 6 : 8 ]",
+      "[ 8 : 10 ]",
+      "[ 10 : 12 ]",
+      "[ 12 : 14 ]",
+      "[ 14 : 16 ]",
+      "[ 16 : 18 ]",
+      "[ 18 : 20 ]",
+      "[ 20 : 22 ]",
+      "[ 22 : 24 ]",
+      "[ 24 : 26 ]",
+      "[ 26 : 28 ]",
+      "[ 28 : 30 ]",
+      "[ 30 : 32 ]"
     };
     String[] res4_3 = {
-      "<0,1>", "<1,3>", "<3,4>", "<4,6>", "<6,7>", "<7,9>", "<9,10>", "<10,12>", "<12,13>",
-      "<13,15>", "<15,16>", "<16,18>", "<18,19>", "<19,21>", "<21,22>", "<22,24>", "<24,25>",
-      "<25,27>", "<27,28>", "<28,30>", "<30,31>", "<31,32>"
+      "[ 0 : 1 ]",
+      "[ 1 : 3 ]",
+      "[ 3 : 4 ]",
+      "[ 4 : 6 ]",
+      "[ 6 : 7 ]",
+      "[ 7 : 9 ]",
+      "[ 9 : 10 ]",
+      "[ 10 : 12 ]",
+      "[ 12 : 13 ]",
+      "[ 13 : 15 ]",
+      "[ 15 : 16 ]",
+      "[ 16 : 18 ]",
+      "[ 18 : 19 ]",
+      "[ 19 : 21 ]",
+      "[ 21 : 22 ]",
+      "[ 22 : 24 ]",
+      "[ 24 : 25 ]",
+      "[ 25 : 27 ]",
+      "[ 27 : 28 ]",
+      "[ 28 : 30 ]",
+      "[ 30 : 31 ]",
+      "[ 31 : 32 ]"
     };
     String[] res4_4 = {
-      "<0,4>", "<4,8>", "<8,12>", "<12,16>", "<16,20>", "<20,24>", "<24,28>", "<28,32>"
+      "[ 0 : 4 ]",
+      "[ 4 : 8 ]",
+      "[ 8 : 12 ]",
+      "[ 12 : 16 ]",
+      "[ 16 : 20 ]",
+      "[ 20 : 24 ]",
+      "[ 24 : 28 ]",
+      "[ 28 : 32 ]"
+    };
+    String[] res4_5 = {
+      "[ 0 : 4 ]",
+      "[ 5 : 9 ]",
+      "[ 10 : 14 ]",
+      "[ 15 : 19 ]",
+      "[ 20 : 24 ]",
+      "[ 25 : 29 ]",
+      "[ 30 : 32 ]"
+    };
+    String[] res4_6 = {
+      "[ 0 : 4 ]", "[ 6 : 10 ]", "[ 12 : 16 ]", "[ 18 : 22 ]", "[ 24 : 28 ]", "[ 30 : 32 ]"
     };
-    String[] res4_5 = {"<0,4>", "<5,9>", "<10,14>", "<15,19>", "<20,24>", "<25,29>", "<30,32>"};
-    String[] res4_6 = {"<0,4>", "<6,10>", "<12,16>", "<18,22>", "<24,28>", "<30,32>"};
 
     checkRes(
         TimeRangeIteratorFactory.getTimeRangeIterator(0, 32, 4, 1, true, false, false, true),
@@ -114,62 +201,62 @@ public class TimeRangeIteratorTest {
   @Test
   public void testNaturalMonthTimeRange() {
     String[] res1 = {
-      "<1604102400000,1606694400000>",
-      "<1606694400000,1609372800000>",
-      "<1609372800000,1612051200000>",
-      "<1612051200000,1614470400000>",
-      "<1614470400000,1617148800000>"
+      "[ 1604102400000 : 1606694400000 ]",
+      "[ 1606694400000 : 1609372800000 ]",
+      "[ 1609372800000 : 1612051200000 ]",
+      "[ 1612051200000 : 1614470400000 ]",
+      "[ 1614470400000 : 1617148800000 ]"
     };
     String[] res2 = {
-      "<1604102400000,1604966400000>",
-      "<1606694400000,1607558400000>",
-      "<1609372800000,1610236800000>",
-      "<1612051200000,1612915200000>",
-      "<1614470400000,1615334400000>"
+      "[ 1604102400000 : 1604966400000 ]",
+      "[ 1606694400000 : 1607558400000 ]",
+      "[ 1609372800000 : 1610236800000 ]",
+      "[ 1612051200000 : 1612915200000 ]",
+      "[ 1614470400000 : 1615334400000 ]"
     };
     String[] res3 = {
-      "<1604102400000,1606694400000>",
-      "<1604966400000,1607558400000>",
-      "<1605830400000,1608422400000>",
-      "<1606694400000,1609372800000>",
-      "<1607558400000,1610236800000>",
-      "<1608422400000,1611100800000>",
-      "<1609286400000,1611964800000>",
-      "<1610150400000,1612828800000>",
-      "<1611014400000,1613692800000>",
-      "<1611878400000,1614470400000>",
-      "<1612742400000,1615161600000>",
-      "<1613606400000,1616025600000>",
-      "<1614470400000,1617148800000>",
-      "<1615334400000,1617148800000>",
-      "<1616198400000,1617148800000>",
-      "<1617062400000,1617148800000>"
+      "[ 1604102400000 : 1606694400000 ]",
+      "[ 1604966400000 : 1607558400000 ]",
+      "[ 1605830400000 : 1608422400000 ]",
+      "[ 1606694400000 : 1609372800000 ]",
+      "[ 1607558400000 : 1610236800000 ]",
+      "[ 1608422400000 : 1611100800000 ]",
+      "[ 1609286400000 : 1611964800000 ]",
+      "[ 1610150400000 : 1612828800000 ]",
+      "[ 1611014400000 : 1613692800000 ]",
+      "[ 1611878400000 : 1614470400000 ]",
+      "[ 1612742400000 : 1615161600000 ]",
+      "[ 1613606400000 : 1616025600000 ]",
+      "[ 1614470400000 : 1617148800000 ]",
+      "[ 1615334400000 : 1617148800000 ]",
+      "[ 1616198400000 : 1617148800000 ]",
+      "[ 1617062400000 : 1617148800000 ]"
     };
     String[] res4 = {
-      "<1604102400000,1604966400000>",
-      "<1604966400000,1605830400000>",
-      "<1605830400000,1606694400000>",
-      "<1606694400000,1607558400000>",
-      "<1607558400000,1608422400000>",
-      "<1608422400000,1609286400000>",
-      "<1609286400000,1609372800000>",
-      "<1609372800000,1610150400000>",
-      "<1610150400000,1610236800000>",
-      "<1610236800000,1611014400000>",
-      "<1611014400000,1611100800000>",
-      "<1611100800000,1611878400000>",
-      "<1611878400000,1611964800000>",
-      "<1611964800000,1612742400000>",
-      "<1612742400000,1612828800000>",
-      "<1612828800000,1613606400000>",
-      "<1613606400000,1613692800000>",
-      "<1613692800000,1614470400000>",
-      "<1614470400000,1615161600000>",
-      "<1615161600000,1615334400000>",
-      "<1615334400000,1616025600000>",
-      "<1616025600000,1616198400000>",
-      "<1616198400000,1617062400000>",
-      "<1617062400000,1617148800000>"
+      "[ 1604102400000 : 1604966400000 ]",
+      "[ 1604966400000 : 1605830400000 ]",
+      "[ 1605830400000 : 1606694400000 ]",
+      "[ 1606694400000 : 1607558400000 ]",
+      "[ 1607558400000 : 1608422400000 ]",
+      "[ 1608422400000 : 1609286400000 ]",
+      "[ 1609286400000 : 1609372800000 ]",
+      "[ 1609372800000 : 1610150400000 ]",
+      "[ 1610150400000 : 1610236800000 ]",
+      "[ 1610236800000 : 1611014400000 ]",
+      "[ 1611014400000 : 1611100800000 ]",
+      "[ 1611100800000 : 1611878400000 ]",
+      "[ 1611878400000 : 1611964800000 ]",
+      "[ 1611964800000 : 1612742400000 ]",
+      "[ 1612742400000 : 1612828800000 ]",
+      "[ 1612828800000 : 1613606400000 ]",
+      "[ 1613606400000 : 1613692800000 ]",
+      "[ 1613692800000 : 1614470400000 ]",
+      "[ 1614470400000 : 1615161600000 ]",
+      "[ 1615161600000 : 1615334400000 ]",
+      "[ 1615334400000 : 1616025600000 ]",
+      "[ 1616025600000 : 1616198400000 ]",
+      "[ 1616198400000 : 1617062400000 ]",
+      "[ 1617062400000 : 1617148800000 ]"
     };
     checkRes(
         TimeRangeIteratorFactory.getTimeRangeIterator(
@@ -199,22 +286,13 @@ public class TimeRangeIteratorTest {
 
   private void checkRes(ITimeRangeIterator timeRangeIterator, String[] res) {
     boolean isAscending = timeRangeIterator.isAscending();
-    long curStartTime;
     int cnt = isAscending ? 0 : res.length - 1;
 
-    // test first time range
-    Pair<Long, Long> firstTimeRange = timeRangeIterator.getFirstTimeRange();
-    Assert.assertEquals(res[cnt], firstTimeRange.toString());
-    cnt += isAscending ? 1 : -1;
-    curStartTime = firstTimeRange.left;
-
     // test next time ranges
-    Pair<Long, Long> curTimeRange = timeRangeIterator.getNextTimeRange(curStartTime);
-    while (curTimeRange != null) {
+    while (timeRangeIterator.hasNextTimeRange()) {
+      TimeRange curTimeRange = timeRangeIterator.nextTimeRange();
       Assert.assertEquals(res[cnt], curTimeRange.toString());
       cnt += isAscending ? 1 : -1;
-      curStartTime = curTimeRange.left;
-      curTimeRange = timeRangeIterator.getNextTimeRange(curStartTime);
     }
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index f6ae37a398..256b95498d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -110,6 +110,10 @@ public class TsBlock {
     return positionCount;
   }
 
+  public long getStartTime() {
+    return timeColumn.getStartTime();
+  }
+
   public long getEndTime() {
     return timeColumn.getEndTime();
   }
@@ -191,7 +195,7 @@ public class TsBlock {
     return new AlignedTsBlockIterator(0, subIndex);
   }
 
-  private class TsBlockSingleColumnIterator implements IPointReader, IBatchDataIterator {
+  public class TsBlockSingleColumnIterator implements IPointReader, IBatchDataIterator {
 
     protected int rowIndex;
     protected int columnIndex;
@@ -261,6 +265,22 @@ public class TsBlock {
 
     @Override
     public void close() {}
+
+    public long getEndTime() {
+      return TsBlock.this.getEndTime();
+    }
+
+    public long getStartTime() {
+      return TsBlock.this.getStartTime();
+    }
+
+    public int getRowIndex() {
+      return rowIndex;
+    }
+
+    public void setRowIndex(int rowIndex) {
+      this.rowIndex = rowIndex;
+    }
   }
 
   /** Mainly used for UDF framework. Note that the timestamps are at the last column. */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
index 8de927b785..b02e897235 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java
@@ -209,6 +209,10 @@ public class TsBlockBuilder {
 
     declaredPositions = 0;
 
+    timeColumnBuilder =
+        (TimeColumnBuilder)
+            timeColumnBuilder.newColumnBuilderLike(
+                tsBlockBuilderStatus.createColumnBuilderStatus());
     for (int i = 0; i < valueColumnBuilders.length; i++) {
       valueColumnBuilders[i] =
           valueColumnBuilders[i].newColumnBuilderLike(
@@ -240,6 +244,7 @@ public class TsBlockBuilder {
     return types.get(channel);
   }
 
+  // Increments the count of declared positions (rows) by one
   public void declarePosition() {
     declaredPositions++;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
index beb17a3ea5..4efcac5996 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -71,6 +72,16 @@ public class BinaryColumnBuilder implements ColumnBuilder {
     return this;
   }
 
+  /** Writes {@code value} if it is a Binary; otherwise throws UnSupportedDataTypeException. */
+  @Override
+  public ColumnBuilder writeObject(Object value) {
+    if (value instanceof Binary) {
+      writeBinary((Binary) value);
+      return this;
+    }
+    throw new UnSupportedDataTypeException("BinaryColumn only support Binary data type");
+  }
+
   @Override
   public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
     return writeBinary(value.getBinary());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
index 74fedecd60..24a8284e25 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -73,6 +74,16 @@ public class BooleanColumnBuilder implements ColumnBuilder {
     return this;
   }
 
+  /** Writes {@code value} if it is a Boolean; otherwise throws UnSupportedDataTypeException. */
+  @Override
+  public ColumnBuilder writeObject(Object value) {
+    if (value instanceof Boolean) {
+      writeBoolean((Boolean) value);
+      return this;
+    }
+    throw new UnSupportedDataTypeException("BooleanColumn only support Boolean data type");
+  }
+
   @Override
   public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
     return writeBoolean(value.getBoolean());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
index 37efd66949..b431db1786 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
@@ -59,6 +59,11 @@ public interface ColumnBuilder {
     throw new UnsupportedOperationException(getClass().getName());
   }
 
+  /** Writes {@code value} to the current entry; the default implementation always throws. */
+  default ColumnBuilder writeObject(Object value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
   int appendColumn(
       TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder);
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
index bff93c61df..7f546e1b69 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -73,6 +74,16 @@ public class DoubleColumnBuilder implements ColumnBuilder {
     return this;
   }
 
+  /** Writes {@code value} if it is a Double; otherwise throws UnSupportedDataTypeException. */
+  @Override
+  public ColumnBuilder writeObject(Object value) {
+    if (value instanceof Double) {
+      writeDouble((Double) value);
+      return this;
+    }
+    throw new UnSupportedDataTypeException("DoubleColumn only support Double data type");
+  }
+
   @Override
   public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
     return writeDouble(value.getDouble());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
index 0061baa03e..921fef29eb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -78,6 +79,16 @@ public class FloatColumnBuilder implements ColumnBuilder {
     return writeFloat(value.getFloat());
   }
 
+  /** Writes {@code value} if it is a Float; otherwise throws UnSupportedDataTypeException. */
+  @Override
+  public ColumnBuilder writeObject(Object value) {
+    if (value instanceof Float) {
+      writeFloat((Float) value);
+      return this;
+    }
+    throw new UnSupportedDataTypeException("FloatColumn only support Float data type");
+  }
+
   @Override
   public int appendColumn(
       TimeColumn timeColumn, Column valueColumn, int offset, TimeColumnBuilder timeBuilder) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
index 3a5db5b773..e1be6f6adb 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -73,6 +74,16 @@ public class IntColumnBuilder implements ColumnBuilder {
     return this;
   }
 
+  /** Writes {@code value} if it is an Integer; otherwise throws UnSupportedDataTypeException. */
+  @Override
+  public ColumnBuilder writeObject(Object value) {
+    if (value instanceof Integer) {
+      writeInt((Integer) value);
+      return this;
+    }
+    throw new UnSupportedDataTypeException("IntColumn only support Integer data type");
+  }
+
   @Override
   public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
     return writeInt(value.getInt());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
index 38afe3d711..ad21bec65c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
@@ -73,6 +74,16 @@ public class LongColumnBuilder implements ColumnBuilder {
     return this;
   }
 
+  /** Writes {@code value} if it is a Long; otherwise throws UnSupportedDataTypeException. */
+  @Override
+  public ColumnBuilder writeObject(Object value) {
+    if (value instanceof Long) {
+      writeLong((Long) value);
+      return this;
+    }
+    throw new UnSupportedDataTypeException("LongColumn only support Long data type");
+  }
+
   @Override
   public ColumnBuilder writeTsPrimitiveType(TsPrimitiveType value) {
     return writeLong(value.getLong());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
index d10ba9fc68..d8b44fd384 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -104,6 +104,10 @@ public class TimeColumn implements Column {
     return new TimeColumn(positionOffset + arrayOffset, length, values);
   }
 
+  public long getStartTime() {
+    return values[arrayOffset];
+  }
+
   public long getEndTime() {
     return values[getPositionCount() + arrayOffset - 1];
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
index 7b5dee9368..99488d1213 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumnBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.tsfile.read.common.block.column;
 
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import org.openjdk.jol.info.ClassLayout;
@@ -71,6 +72,16 @@ public class TimeColumnBuilder implements ColumnBuilder {
     throw new UnsupportedOperationException(getClass().getName());
   }
 
+  /** Writes {@code value} if it is a Long; otherwise throws UnSupportedDataTypeException. */
+  @Override
+  public ColumnBuilder writeObject(Object value) {
+    if (value instanceof Long) {
+      writeLong((Long) value);
+      return this;
+    }
+    throw new UnSupportedDataTypeException("TimeColumn only support Long data type");
+  }
+
   @Override
   public ColumnBuilder appendNull() {
     throw new UnsupportedOperationException(getClass().getName());