Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/11/22 03:58:34 UTC

[GitHub] [iotdb] JackieTien97 commented on a change in pull request #4426: [IOTDB-1773] Aligned timeseries support group by query without value filter for standalone mode

JackieTien97 commented on a change in pull request #4426:
URL: https://github.com/apache/iotdb/pull/4426#discussion_r753910866



##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/utils/MetaUtils.java
##########
@@ -146,4 +148,52 @@ public static void collectLastNode(IMNode node, List<IMNode> lastNodeList) {
       }
     }
   }
+
+  /**
+   * Merge same series and convert to series map. For example: Given: paths: s1, s2, s3, s1 and
+   * aggregations: count, sum, count, sum. Then: pathToAggrIndexesMap: s1 -> 0, 3; s2 -> 1; s3 -> 2
+   *
+   * @param selectedSeries selected series
+   * @return path to aggregation indexes map
+   */
+  public static Map<PartialPath, List<Integer>> groupAggregationsBySeries(
+      List<? extends Path> selectedSeries) {
+    Map<PartialPath, List<Integer>> pathToAggrIndexesMap = new HashMap<>();
+    for (int i = 0; i < selectedSeries.size(); i++) {
+      PartialPath series = (PartialPath) selectedSeries.get(i);
+      pathToAggrIndexesMap.computeIfAbsent(series, key -> new ArrayList<>()).add(i);
+    }
+    return pathToAggrIndexesMap;
+  }
+
+  /**
+   * Group all the series under an aligned entity into one AlignedPath and remove these series from
+   * pathToAggrIndexesMap. For example, input map: d1[s1] -> [1, 3], d1[s2] -> [2,4], will return
+   * d1[s1,s2], [[1,3], [2,4]]
+   */
+  public static Map<PartialPath, List<List<Integer>>> groupAlignedSeries(
+      Map<PartialPath, List<Integer>> pathToAggrIndexesMap) {
+    Map<PartialPath, List<List<Integer>>> result = new HashMap<>();
+    Map<String, AlignedPath> temp = new HashMap<>();
+
+    List<PartialPath> seriesPaths = new ArrayList<>(pathToAggrIndexesMap.keySet());
+    for (PartialPath seriesPath : seriesPaths) {
+      if (((MeasurementPath) seriesPath).isUnderAlignedEntity()) {
+        List<Integer> indexes = pathToAggrIndexesMap.remove(seriesPath);
+        AlignedPath groupPath = temp.get(seriesPath.getDevice());
+        if (groupPath == null) {
+          groupPath = new AlignedPath((MeasurementPath) seriesPath);
+          temp.put(seriesPath.getDevice(), groupPath);
+          result.computeIfAbsent(groupPath, key -> new ArrayList<>()).add(indexes);
+        } else {
+          // groupPath is changed here so we update it
+          List<List<Integer>> subIndexes = result.remove(groupPath);
+          subIndexes.add(indexes);
+          groupPath.addMeasurement((MeasurementPath) seriesPath);
+          result.put(groupPath, subIndexes);
+        }
+      }
+    }
+    return result;
+  }

Review comment:
       Improve this function: there is no need to remove `groupPath` from `result` and then put it back again. You can change the type of `temp` to `Map<String, Pair<AlignedPath, List<List<Integer>>>>` and then convert it to a `Map<PartialPath, List<List<Integer>>>` at the end of this function.
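       Roughly what I have in mind (an untested sketch, assuming `Pair` exposes public `left`/`right` fields):

       ```java
       public static Map<PartialPath, List<List<Integer>>> groupAlignedSeries(
           Map<PartialPath, List<Integer>> pathToAggrIndexesMap) {
         Map<String, Pair<AlignedPath, List<List<Integer>>>> temp = new HashMap<>();

         for (PartialPath seriesPath : new ArrayList<>(pathToAggrIndexesMap.keySet())) {
           if (((MeasurementPath) seriesPath).isUnderAlignedEntity()) {
             List<Integer> indexes = pathToAggrIndexesMap.remove(seriesPath);
             Pair<AlignedPath, List<List<Integer>>> pair = temp.get(seriesPath.getDevice());
             if (pair == null) {
               pair = new Pair<>(new AlignedPath((MeasurementPath) seriesPath), new ArrayList<>());
               temp.put(seriesPath.getDevice(), pair);
             } else {
               // the AlignedPath is only mutated while it lives in temp, never while it is a map key
               pair.left.addMeasurement((MeasurementPath) seriesPath);
             }
             pair.right.add(indexes);
           }
         }

         // convert to the expected return type only once, at the very end
         Map<PartialPath, List<List<Integer>>> result = new HashMap<>();
         for (Pair<AlignedPath, List<List<Integer>>> pair : temp.values()) {
           result.put(pair.left, pair.right);
         }
         return result;
       }
       ```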

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
##########
@@ -56,22 +55,13 @@
 
   protected Map<PartialPath, GroupByExecutor> pathExecutors = new HashMap<>();

Review comment:
       Since you have already split the whole process for aligned and non-aligned paths, you can replace `pathExecutors` with two `Map`s, like `resultIndexes` and `alignedPathToIndexesMap`. If so, you don't have to cast the entries to the concrete class every time.
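       For example, something along these lines (the value types here are only my guess, matching what `MetaUtils.groupAggregationsBySeries` and `groupAlignedSeries` return):

       ```java
       // one entry per non-aligned MeasurementPath -> indexes of its aggregations in the result row
       private final Map<MeasurementPath, List<Integer>> resultIndexes = new HashMap<>();
       // one entry per AlignedPath -> indexes of each sub-sensor's aggregations in the result row
       private final Map<AlignedPath, List<List<Integer>>> alignedPathToIndexesMap = new HashMap<>();
       ```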

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByFillNoVFilterDataSet.java
##########
@@ -42,7 +42,6 @@ public ClusterGroupByFillNoVFilterDataSet(
     this.metaGroupMember = metaGroupMember;
   }
 
-  @Override

Review comment:
       We need to override this method in the cluster module. If you change the signature of `GroupByExecutor`'s constructor, you must also change `MergeGroupByExecutor`'s constructor.

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
##########
@@ -71,18 +68,19 @@ public LocalGroupByExecutor(
         QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
     // update filter by TTL
     timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
-    this.reader =
+    // init SeriesAggregateReader for non-aligned series
+    reader =
         new SeriesAggregateReader(
             path,
-            allSensors,
-            dataType,
+            Collections.singleton(path.getMeasurement()),
+            path.getSeriesType(),

Review comment:
       Why change this? There was a query optimization here before.

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByNoVFilterDataSet.java
##########
@@ -48,7 +48,6 @@ public ClusterGroupByNoVFilterDataSet(
     this.metaGroupMember = metaGroupMember;
   }
 
-  @Override

Review comment:
       same as above

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.AlignedSeriesAggregateReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class LocalAlignedGroupByExecutor implements GroupByExecutor {
+
+  private final AlignedSeriesAggregateReader reader;
+  private BatchData preCachedData;
+
+  // Aggregate result buffer
+  private final List<List<AggregateResult>> results = new ArrayList<>();
+  private final TimeRange timeRange;
+
+  // used for resetting the batch data to the last index
+  private int lastReadCurArrayIndex;
+  private int lastReadCurListIndex;
+  private final boolean ascending;
+
+  private final QueryDataSource queryDataSource;
+
+  public LocalAlignedGroupByExecutor(
+      PartialPath path,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+    queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    // init AlignedSeriesAggregateReader for aligned series
+    Set<String> allSensors = new HashSet<>(((AlignedPath) path).getMeasurementList());
+    reader =
+        new AlignedSeriesAggregateReader(
+            (AlignedPath) path,
+            allSensors,
+            TSDataType.VECTOR,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            fileFilter,
+            ascending);
+
+    preCachedData = null;
+    timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+    lastReadCurArrayIndex = 0;
+    lastReadCurListIndex = 0;
+    this.ascending = ascending;
+  }
+
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult) {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  public void addAggregateResultList(List<AggregateResult> aggrResultList) {
+    results.add(aggrResultList);
+  }
+
+  private boolean isEndCalc() {
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        if (!result.hasFinalResult()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public List<List<AggregateResult>> calcAlignedResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+
+    // clear result cache
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        result.reset();
+      }
+    }
+
+    timeRange.set(curStartTime, curEndTime - 1);
+    if (calcFromCacheData(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read page data firstly
+    if (readAndCalcFromPage(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read chunk data secondly
+    if (readAndCalcFromChunk(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read from file
+    while (reader.hasNextFile()) {
+      // try to calc from fileMetaData
+      Statistics fileStatistics = reader.currentFileStatistics();

Review comment:
       `reader.currentFileStatistics()` returns the first sub-sensor's statistics. Here you need to get the time column's statistics, which can never be null, so you don't even need the `isEmpty` check on the next line.
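       In other words, something like the following (the time-statistics accessor name is hypothetical here; use whatever the reader actually exposes for the time column):

       ```java
       // hypothetical accessor for the time column's statistics, which is never null for aligned series
       Statistics fileTimeStatistics = reader.currentFileTimeStatistics();
       if (fileTimeStatistics.getStartTime() >= curEndTime) {
         return results;
       }
       ```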

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.AlignedSeriesAggregateReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class LocalAlignedGroupByExecutor implements GroupByExecutor {
+
+  private final AlignedSeriesAggregateReader reader;
+  private BatchData preCachedData;
+
+  // Aggregate result buffer
+  private final List<List<AggregateResult>> results = new ArrayList<>();
+  private final TimeRange timeRange;
+
+  // used for resetting the batch data to the last index
+  private int lastReadCurArrayIndex;
+  private int lastReadCurListIndex;
+  private final boolean ascending;
+
+  private final QueryDataSource queryDataSource;
+
+  public LocalAlignedGroupByExecutor(
+      PartialPath path,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+    queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    // init AlignedSeriesAggregateReader for aligned series
+    Set<String> allSensors = new HashSet<>(((AlignedPath) path).getMeasurementList());
+    reader =
+        new AlignedSeriesAggregateReader(
+            (AlignedPath) path,
+            allSensors,
+            TSDataType.VECTOR,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            fileFilter,
+            ascending);
+
+    preCachedData = null;
+    timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+    lastReadCurArrayIndex = 0;
+    lastReadCurListIndex = 0;
+    this.ascending = ascending;
+  }
+
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult) {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  public void addAggregateResultList(List<AggregateResult> aggrResultList) {
+    results.add(aggrResultList);
+  }
+
+  private boolean isEndCalc() {
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        if (!result.hasFinalResult()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public List<List<AggregateResult>> calcAlignedResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+
+    // clear result cache
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        result.reset();
+      }
+    }
+
+    timeRange.set(curStartTime, curEndTime - 1);
+    if (calcFromCacheData(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read page data firstly
+    if (readAndCalcFromPage(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read chunk data secondly
+    if (readAndCalcFromChunk(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read from file
+    while (reader.hasNextFile()) {
+      // try to calc from fileMetaData
+      Statistics fileStatistics = reader.currentFileStatistics();
+      if (fileStatistics != null && !isEmpty(fileStatistics)) {
+        if (fileStatistics.getStartTime() >= curEndTime) {
+          return results;
+        }
+        if (reader.canUseCurrentFileStatistics()

Review comment:
       The `canUseCurrentFileStatistics()` function in `AlignedSeriesAggregateReader` has the same problem as above: it uses the first sub-sensor's statistics instead of the time column's. You can change that as well while you're at it.

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
##########
@@ -56,22 +55,13 @@
 
   protected Map<PartialPath, GroupByExecutor> pathExecutors = new HashMap<>();
 
-  /**
-   * path -> result index for each aggregation
-   *
-   * <p>e.g.,
-   *
-   * <p>deduplicated paths : s1, s2, s1 deduplicated aggregations : count, count, sum
-   *
-   * <p>s1 -> 0, 2 s2 -> 1
-   */

Review comment:
       Update the comment to describe the new structure instead of simply deleting it.

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor.java
##########
@@ -150,12 +148,6 @@ private void calcFromBatch(BatchData batchData, long curStartTime, long curEndTi
         result.updateResultFromPageData(batchIterator, curStartTime, curEndTime);
       }
     }
-    lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
-    lastReadCurListIndex = batchData.getReadCurListIndex();
-    // can calc for next interval
-    if (batchData.hasCurrent()) {
-      preCachedData = batchData;
-    }

Review comment:
       You delete these lines in `calcFromBatch` and add them to `readAndCalcFromPage`, but `calcFromBatch` is still called from `calcFromCacheData`, which you forgot to change accordingly. Why do you want to move these lines out of this method?
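       To be clear, I would keep the cache bookkeeping at the end of `calcFromBatch` itself, roughly as it was before, so every caller benefits:

       ```java
       // remember where we stopped so the next interval can resume from the same position
       lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
       lastReadCurListIndex = batchData.getReadCurListIndex();
       // if there is unconsumed data, keep it for the next interval
       if (batchData.hasCurrent()) {
         preCachedData = batchData;
       }
       ```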

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.AlignedSeriesAggregateReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class LocalAlignedGroupByExecutor implements GroupByExecutor {
+
+  private final AlignedSeriesAggregateReader reader;
+  private BatchData preCachedData;
+
+  // Aggregate result buffer
+  private final List<List<AggregateResult>> results = new ArrayList<>();
+  private final TimeRange timeRange;
+
+  // used for resetting the batch data to the last index
+  private int lastReadCurArrayIndex;
+  private int lastReadCurListIndex;
+  private final boolean ascending;
+
+  private final QueryDataSource queryDataSource;
+
+  public LocalAlignedGroupByExecutor(
+      PartialPath path,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+    queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    // init AlignedSeriesAggregateReader for aligned series
+    Set<String> allSensors = new HashSet<>(((AlignedPath) path).getMeasurementList());
+    reader =
+        new AlignedSeriesAggregateReader(
+            (AlignedPath) path,
+            allSensors,
+            TSDataType.VECTOR,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            fileFilter,
+            ascending);
+
+    preCachedData = null;
+    timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+    lastReadCurArrayIndex = 0;
+    lastReadCurListIndex = 0;
+    this.ascending = ascending;
+  }
+
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult) {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  public void addAggregateResultList(List<AggregateResult> aggrResultList) {
+    results.add(aggrResultList);
+  }
+
+  private boolean isEndCalc() {
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        if (!result.hasFinalResult()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public List<List<AggregateResult>> calcAlignedResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+
+    // clear result cache
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        result.reset();
+      }
+    }
+
+    timeRange.set(curStartTime, curEndTime - 1);
+    if (calcFromCacheData(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read page data firstly
+    if (readAndCalcFromPage(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read chunk data secondly
+    if (readAndCalcFromChunk(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read from file
+    while (reader.hasNextFile()) {
+      // try to calc from fileMetaData
+      Statistics fileStatistics = reader.currentFileStatistics();
+      if (fileStatistics != null && !isEmpty(fileStatistics)) {
+        if (fileStatistics.getStartTime() >= curEndTime) {
+          return results;
+        }

Review comment:
       You still need to check `ascending` here, like the judgement in the `readAndCalcFromChunk` function. It may be buggy even in the non-aligned case; you can open another PR to fix that before this PR is merged.
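       That is, mirror the chunk-level handling here (sketch only):

       ```java
       if (fileStatistics.getStartTime() >= curEndTime) {
         if (ascending) {
           return results;
         } else {
           reader.skipCurrentFile();
           continue;
         }
       }
       ```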

##########
File path: server/src/main/java/org/apache/iotdb/db/query/reader/series/AlignedSeriesAggregateReader.java
##########
@@ -174,4 +174,8 @@ public int getCurIndex() {
   public void resetIndex() {
     curIndex = 0;
   }
+
+  public int getSubSensorSize() {
+    return subSensorSize;
+  }

Review comment:
       This method is not used; remove it.

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.AlignedSeriesAggregateReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class LocalAlignedGroupByExecutor implements GroupByExecutor {
+
+  private final AlignedSeriesAggregateReader reader;
+  private BatchData preCachedData;
+
+  // Aggregate result buffer
+  private final List<List<AggregateResult>> results = new ArrayList<>();
+  private final TimeRange timeRange;
+
+  // used for resetting the batch data to the last index
+  private int lastReadCurArrayIndex;
+  private int lastReadCurListIndex;
+  private final boolean ascending;
+
+  private final QueryDataSource queryDataSource;
+
+  public LocalAlignedGroupByExecutor(
+      PartialPath path,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+    queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    // init AlignedSeriesAggregateReader for aligned series
+    Set<String> allSensors = new HashSet<>(((AlignedPath) path).getMeasurementList());
+    reader =
+        new AlignedSeriesAggregateReader(
+            (AlignedPath) path,
+            allSensors,
+            TSDataType.VECTOR,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            fileFilter,
+            ascending);
+
+    preCachedData = null;
+    timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+    lastReadCurArrayIndex = 0;
+    lastReadCurListIndex = 0;
+    this.ascending = ascending;
+  }
+
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult) {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  public void addAggregateResultList(List<AggregateResult> aggrResultList) {
+    results.add(aggrResultList);
+  }
+
+  private boolean isEndCalc() {
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        if (!result.hasFinalResult()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public List<List<AggregateResult>> calcAlignedResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+
+    // clear result cache
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        result.reset();
+      }
+    }
+
+    timeRange.set(curStartTime, curEndTime - 1);
+    if (calcFromCacheData(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read page data firstly
+    if (readAndCalcFromPage(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read chunk data secondly
+    if (readAndCalcFromChunk(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read from file
+    while (reader.hasNextFile()) {
+      // try to calc from fileMetaData
+      Statistics fileStatistics = reader.currentFileStatistics();
+      if (fileStatistics != null && !isEmpty(fileStatistics)) {
+        if (fileStatistics.getStartTime() >= curEndTime) {
+          return results;
+        }
+        if (reader.canUseCurrentFileStatistics()
+            && timeRange.contains(fileStatistics.getStartTime(), fileStatistics.getEndTime())) {
+          // calc from fileMetaData
+          while (reader.hasNextSubSeries()) {
+            Statistics currentFileStatistics = reader.currentFileStatistics();
+            calcFromStatistics(currentFileStatistics, results.get(reader.getCurIndex()));
+            reader.nextSeries();
+          }
+          reader.skipCurrentFile();
+          continue;
+        }
+      }
+      // read chunk
+      if (readAndCalcFromChunk(curStartTime, curEndTime)) {
+        return results;
+      }
+    }
+
+    return results;
+  }
+
+  private void calcFromStatistics(Statistics statistics, List<AggregateResult> aggregateResultList)
+      throws QueryProcessException {
+    // statistics may be null for aligned time series
+    if (statistics == null) {
+      return;
+    }
+    for (AggregateResult result : aggregateResultList) {
+      if (result.hasFinalResult()) {
+        continue;
+      }
+      result.updateResultFromStatistics(statistics);
+    }
+  }
+
+  private boolean readAndCalcFromChunk(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    while (reader.hasNextChunk()) {
+      // try to calc from chunkMetaData
+      Statistics chunkStatistics = reader.currentChunkStatistics();
+      if (chunkStatistics != null && !isEmpty(chunkStatistics)) {
+        if (chunkStatistics.getStartTime() >= curEndTime) {
+          if (ascending) {
+            return true;
+          } else {
+            reader.skipCurrentChunk();
+            continue;
+          }
+        }
+        if (reader.canUseCurrentChunkStatistics()
+            && timeRange.contains(chunkStatistics.getStartTime(), chunkStatistics.getEndTime())) {
+          // calc from chunkMetaData
+          while (reader.hasNextSubSeries()) {
+            Statistics currentChunkStatistics = reader.currentChunkStatistics();
+            calcFromStatistics(currentChunkStatistics, results.get(reader.getCurIndex()));
+            reader.nextSeries();
+          }
+          reader.skipCurrentChunk();
+          continue;
+        }
+      }
+      // read page
+      if (readAndCalcFromPage(curStartTime, curEndTime)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readAndCalcFromPage(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    while (reader.hasNextPage()) {
+      // try to calc from pageHeader
+      Statistics pageStatistics = reader.currentPageStatistics();
+      // must be non overlapped page
+      if (pageStatistics != null && !isEmpty(pageStatistics)) {
+        // current page max than time range
+        if (pageStatistics.getStartTime() >= curEndTime) {
+          if (ascending) {
+            return true;
+          } else {
+            reader.skipCurrentPage();
+            continue;
+          }
+        }
+        if (reader.canUseCurrentPageStatistics()
+            && timeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) {
+          // calc from pageHeader
+          while (reader.hasNextSubSeries()) {
+            Statistics currentPageStatistics = reader.currentPageStatistics();
+            calcFromStatistics(currentPageStatistics, results.get(reader.getCurIndex()));
+            reader.nextSeries();
+          }
+          reader.skipCurrentPage();
+          if (isEndCalc()) {
+            return true;
+          }
+          continue;
+        }
+      }
+
+      // calc from page data
+      BatchData batchData = reader.nextPage();
+      if (batchData == null || !batchData.hasCurrent()) {
+        continue;
+      }
+
+      // stop calc and cached current batchData
+      if (ascending && batchData.currentTime() >= curEndTime) {
+        preCachedData = batchData;
+        // reset the last position to current Index
+        lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
+        lastReadCurListIndex = batchData.getReadCurListIndex();

Review comment:
       Put these two assignments before the if block and delete the duplicated pair that follows, just like in `LocalGroupByExecutor`.
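       Roughly (assuming the block then returns true as in `LocalGroupByExecutor`):

       ```java
       // reset the last position to the current index before deciding whether to cache
       lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
       lastReadCurListIndex = batchData.getReadCurListIndex();
       // stop calc and cache current batchData
       if (ascending && batchData.currentTime() >= curEndTime) {
         preCachedData = batchData;
         return true;
       }
       ```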

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.AlignedSeriesAggregateReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class LocalAlignedGroupByExecutor implements GroupByExecutor {
+
+  private final AlignedSeriesAggregateReader reader;
+  private BatchData preCachedData;
+
+  // Aggregate result buffer
+  private final List<List<AggregateResult>> results = new ArrayList<>();
+  private final TimeRange timeRange;
+
+  // used for resetting the batch data to the last index
+  private int lastReadCurArrayIndex;
+  private int lastReadCurListIndex;
+  private final boolean ascending;
+
+  private final QueryDataSource queryDataSource;
+
+  public LocalAlignedGroupByExecutor(
+      PartialPath path,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+    queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    // init AlignedSeriesAggregateReader for aligned series
+    Set<String> allSensors = new HashSet<>(((AlignedPath) path).getMeasurementList());
+    reader =
+        new AlignedSeriesAggregateReader(
+            (AlignedPath) path,
+            allSensors,
+            TSDataType.VECTOR,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            fileFilter,
+            ascending);
+
+    preCachedData = null;
+    timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+    lastReadCurArrayIndex = 0;
+    lastReadCurListIndex = 0;
+    this.ascending = ascending;
+  }
+
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult) {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  public void addAggregateResultList(List<AggregateResult> aggrResultList) {
+    results.add(aggrResultList);
+  }
+
+  private boolean isEndCalc() {
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        if (!result.hasFinalResult()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public List<List<AggregateResult>> calcAlignedResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+
+    // clear result cache
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        result.reset();
+      }
+    }
+
+    timeRange.set(curStartTime, curEndTime - 1);
+    if (calcFromCacheData(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read page data firstly
+    if (readAndCalcFromPage(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read chunk data secondly
+    if (readAndCalcFromChunk(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read from file
+    while (reader.hasNextFile()) {
+      // try to calc from fileMetaData
+      Statistics fileStatistics = reader.currentFileStatistics();
+      if (fileStatistics != null && !isEmpty(fileStatistics)) {
+        if (fileStatistics.getStartTime() >= curEndTime) {
+          return results;
+        }
+        if (reader.canUseCurrentFileStatistics()
+            && timeRange.contains(fileStatistics.getStartTime(), fileStatistics.getEndTime())) {
+          // calc from fileMetaData
+          while (reader.hasNextSubSeries()) {
+            Statistics currentFileStatistics = reader.currentFileStatistics();
+            calcFromStatistics(currentFileStatistics, results.get(reader.getCurIndex()));
+            reader.nextSeries();
+          }
+          reader.skipCurrentFile();
+          continue;
+        }
+      }
+      // read chunk
+      if (readAndCalcFromChunk(curStartTime, curEndTime)) {
+        return results;
+      }
+    }
+
+    return results;
+  }
+
+  private void calcFromStatistics(Statistics statistics, List<AggregateResult> aggregateResultList)
+      throws QueryProcessException {
+    // statistics may be null for aligned time series
+    if (statistics == null) {
+      return;
+    }
+    for (AggregateResult result : aggregateResultList) {
+      if (result.hasFinalResult()) {
+        continue;
+      }
+      result.updateResultFromStatistics(statistics);
+    }
+  }
+
+  private boolean readAndCalcFromChunk(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    while (reader.hasNextChunk()) {
+      // try to calc from chunkMetaData
+      Statistics chunkStatistics = reader.currentChunkStatistics();
+      if (chunkStatistics != null && !isEmpty(chunkStatistics)) {

Review comment:
       No need to check whether it is empty here.

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.AlignedSeriesAggregateReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class LocalAlignedGroupByExecutor implements GroupByExecutor {
+
+  private final AlignedSeriesAggregateReader reader;
+  private BatchData preCachedData;
+
+  // Aggregate result buffer
+  private final List<List<AggregateResult>> results = new ArrayList<>();
+  private final TimeRange timeRange;
+
+  // used for resetting the batch data to the last index
+  private int lastReadCurArrayIndex;
+  private int lastReadCurListIndex;
+  private final boolean ascending;
+
+  private final QueryDataSource queryDataSource;
+
+  public LocalAlignedGroupByExecutor(
+      PartialPath path,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+    queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    // init AlignedSeriesAggregateReader for aligned series
+    Set<String> allSensors = new HashSet<>(((AlignedPath) path).getMeasurementList());
+    reader =
+        new AlignedSeriesAggregateReader(
+            (AlignedPath) path,
+            allSensors,
+            TSDataType.VECTOR,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            fileFilter,
+            ascending);
+
+    preCachedData = null;
+    timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+    lastReadCurArrayIndex = 0;
+    lastReadCurListIndex = 0;
+    this.ascending = ascending;
+  }
+
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult) {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  public void addAggregateResultList(List<AggregateResult> aggrResultList) {
+    results.add(aggrResultList);
+  }
+
+  private boolean isEndCalc() {
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        if (!result.hasFinalResult()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public List<List<AggregateResult>> calcAlignedResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+
+    // clear result cache
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        result.reset();
+      }
+    }
+
+    timeRange.set(curStartTime, curEndTime - 1);
+    if (calcFromCacheData(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read page data firstly
+    if (readAndCalcFromPage(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read chunk data secondly
+    if (readAndCalcFromChunk(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read from file
+    while (reader.hasNextFile()) {
+      // try to calc from fileMetaData
+      Statistics fileStatistics = reader.currentFileStatistics();
+      if (fileStatistics != null && !isEmpty(fileStatistics)) {
+        if (fileStatistics.getStartTime() >= curEndTime) {
+          return results;
+        }
+        if (reader.canUseCurrentFileStatistics()
+            && timeRange.contains(fileStatistics.getStartTime(), fileStatistics.getEndTime())) {
+          // calc from fileMetaData
+          while (reader.hasNextSubSeries()) {
+            Statistics currentFileStatistics = reader.currentFileStatistics();
+            calcFromStatistics(currentFileStatistics, results.get(reader.getCurIndex()));
+            reader.nextSeries();
+          }
+          reader.skipCurrentFile();
+          continue;
+        }
+      }
+      // read chunk
+      if (readAndCalcFromChunk(curStartTime, curEndTime)) {
+        return results;
+      }
+    }
+
+    return results;
+  }
+
+  private void calcFromStatistics(Statistics statistics, List<AggregateResult> aggregateResultList)
+      throws QueryProcessException {
+    // statistics may be null for aligned time series
+    if (statistics == null) {
+      return;
+    }
+    for (AggregateResult result : aggregateResultList) {
+      if (result.hasFinalResult()) {
+        continue;
+      }
+      result.updateResultFromStatistics(statistics);
+    }
+  }
+
+  private boolean readAndCalcFromChunk(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    while (reader.hasNextChunk()) {
+      // try to calc from chunkMetaData
+      Statistics chunkStatistics = reader.currentChunkStatistics();
+      if (chunkStatistics != null && !isEmpty(chunkStatistics)) {
+        if (chunkStatistics.getStartTime() >= curEndTime) {
+          if (ascending) {
+            return true;
+          } else {
+            reader.skipCurrentChunk();
+            continue;
+          }
+        }
+        if (reader.canUseCurrentChunkStatistics()
+            && timeRange.contains(chunkStatistics.getStartTime(), chunkStatistics.getEndTime())) {
+          // calc from chunkMetaData
+          while (reader.hasNextSubSeries()) {
+            Statistics currentChunkStatistics = reader.currentChunkStatistics();
+            calcFromStatistics(currentChunkStatistics, results.get(reader.getCurIndex()));
+            reader.nextSeries();
+          }
+          reader.skipCurrentChunk();
+          continue;
+        }
+      }
+      // read page
+      if (readAndCalcFromPage(curStartTime, curEndTime)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readAndCalcFromPage(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    while (reader.hasNextPage()) {
+      // try to calc from pageHeader
+      Statistics pageStatistics = reader.currentPageStatistics();
+      // must be non overlapped page
+      if (pageStatistics != null && !isEmpty(pageStatistics)) {
+        // current page max than time range
+        if (pageStatistics.getStartTime() >= curEndTime) {
+          if (ascending) {
+            return true;
+          } else {
+            reader.skipCurrentPage();
+            continue;
+          }
+        }
+        if (reader.canUseCurrentPageStatistics()

Review comment:
       Change it in another PR.

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.dataset.groupby;
+
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.aggregation.AggregateResult;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.filter.TsFileFilter;
+import org.apache.iotdb.db.query.reader.series.AlignedSeriesAggregateReader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class LocalAlignedGroupByExecutor implements GroupByExecutor {
+
+  private final AlignedSeriesAggregateReader reader;
+  private BatchData preCachedData;
+
+  // Aggregate result buffer
+  private final List<List<AggregateResult>> results = new ArrayList<>();
+  private final TimeRange timeRange;
+
+  // used for resetting the batch data to the last index
+  private int lastReadCurArrayIndex;
+  private int lastReadCurListIndex;
+  private final boolean ascending;
+
+  private final QueryDataSource queryDataSource;
+
+  public LocalAlignedGroupByExecutor(
+      PartialPath path,
+      QueryContext context,
+      Filter timeFilter,
+      TsFileFilter fileFilter,
+      boolean ascending)
+      throws StorageEngineException, QueryProcessException {
+    queryDataSource =
+        QueryResourceManager.getInstance().getQueryDataSource(path, context, timeFilter);
+    // update filter by TTL
+    timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
+    // init AlignedSeriesAggregateReader for aligned series
+    Set<String> allSensors = new HashSet<>(((AlignedPath) path).getMeasurementList());
+    reader =
+        new AlignedSeriesAggregateReader(
+            (AlignedPath) path,
+            allSensors,
+            TSDataType.VECTOR,
+            context,
+            queryDataSource,
+            timeFilter,
+            null,
+            fileFilter,
+            ascending);
+
+    preCachedData = null;
+    timeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
+    lastReadCurArrayIndex = 0;
+    lastReadCurListIndex = 0;
+    this.ascending = ascending;
+  }
+
+  @Override
+  public void addAggregateResult(AggregateResult aggrResult) {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public List<AggregateResult> calcResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  @Override
+  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "This method is not supported in LocalAlignedGroupByExecutor");
+  }
+
+  public void addAggregateResultList(List<AggregateResult> aggrResultList) {
+    results.add(aggrResultList);
+  }
+
+  private boolean isEndCalc() {
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        if (!result.hasFinalResult()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public List<List<AggregateResult>> calcAlignedResult(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+
+    // clear result cache
+    for (List<AggregateResult> resultsOfOneMeasurement : results) {
+      for (AggregateResult result : resultsOfOneMeasurement) {
+        result.reset();
+      }
+    }
+
+    timeRange.set(curStartTime, curEndTime - 1);
+    if (calcFromCacheData(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read page data firstly
+    if (readAndCalcFromPage(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read chunk data secondly
+    if (readAndCalcFromChunk(curStartTime, curEndTime)) {
+      return results;
+    }
+
+    // read from file
+    while (reader.hasNextFile()) {
+      // try to calc from fileMetaData
+      Statistics fileStatistics = reader.currentFileStatistics();
+      if (fileStatistics != null && !isEmpty(fileStatistics)) {
+        if (fileStatistics.getStartTime() >= curEndTime) {
+          return results;
+        }
+        if (reader.canUseCurrentFileStatistics()
+            && timeRange.contains(fileStatistics.getStartTime(), fileStatistics.getEndTime())) {
+          // calc from fileMetaData
+          while (reader.hasNextSubSeries()) {
+            Statistics currentFileStatistics = reader.currentFileStatistics();
+            calcFromStatistics(currentFileStatistics, results.get(reader.getCurIndex()));
+            reader.nextSeries();
+          }
+          reader.skipCurrentFile();
+          continue;
+        }
+      }
+      // read chunk
+      if (readAndCalcFromChunk(curStartTime, curEndTime)) {
+        return results;
+      }
+    }
+
+    return results;
+  }
+
+  private void calcFromStatistics(Statistics statistics, List<AggregateResult> aggregateResultList)
+      throws QueryProcessException {
+    // statistics may be null for aligned time series
+    if (statistics == null) {
+      return;
+    }
+    for (AggregateResult result : aggregateResultList) {
+      if (result.hasFinalResult()) {
+        continue;
+      }
+      result.updateResultFromStatistics(statistics);
+    }
+  }
+
+  private boolean readAndCalcFromChunk(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    while (reader.hasNextChunk()) {
+      // try to calc from chunkMetaData
+      Statistics chunkStatistics = reader.currentChunkStatistics();
+      if (chunkStatistics != null && !isEmpty(chunkStatistics)) {
+        if (chunkStatistics.getStartTime() >= curEndTime) {
+          if (ascending) {
+            return true;
+          } else {
+            reader.skipCurrentChunk();
+            continue;
+          }
+        }
+        if (reader.canUseCurrentChunkStatistics()

Review comment:
       Change the `canUseCurrentChunkStatistics` function to use the time column's statistics in another PR.
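       A rough sketch of what that follow-up might look like (the `currentChunkTimeStatistics()` accessor here is hypothetical, not the current reader API):

           // Hypothetical sketch only: drive the reuse decision off the time column's
           // statistics, since value-column statistics of an aligned chunk may be null.
           Statistics timeStatistics = reader.currentChunkTimeStatistics(); // hypothetical accessor
           if (reader.canUseCurrentChunkStatistics()
               && timeRange.contains(timeStatistics.getStartTime(), timeStatistics.getEndTime())) {
             // safe to aggregate every sub-series directly from chunk-level statistics
           }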

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
##########
@@ -0,0 +1,388 @@
+  private boolean readAndCalcFromChunk(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    while (reader.hasNextChunk()) {
+      // try to calc from chunkMetaData
+      Statistics chunkStatistics = reader.currentChunkStatistics();

Review comment:
       Same as above: use the time column's statistics here as well.

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
##########
@@ -0,0 +1,388 @@
+  private boolean readAndCalcFromPage(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    while (reader.hasNextPage()) {
+      // try to calc from pageHeader
+      Statistics pageStatistics = reader.currentPageStatistics();
+      // must be non overlapped page
+      if (pageStatistics != null && !isEmpty(pageStatistics)) {

Review comment:
       Same as above: use the time column's statistics.

##########
File path: server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalAlignedGroupByExecutor.java
##########
@@ -0,0 +1,388 @@
+  private boolean readAndCalcFromChunk(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    while (reader.hasNextChunk()) {
+      // try to calc from chunkMetaData
+      Statistics chunkStatistics = reader.currentChunkStatistics();
+      if (chunkStatistics != null && !isEmpty(chunkStatistics)) {
+        if (chunkStatistics.getStartTime() >= curEndTime) {
+          if (ascending) {
+            return true;
+          } else {
+            reader.skipCurrentChunk();
+            continue;
+          }
+        }
+        if (reader.canUseCurrentChunkStatistics()
+            && timeRange.contains(chunkStatistics.getStartTime(), chunkStatistics.getEndTime())) {
+          // calc from chunkMetaData
+          while (reader.hasNextSubSeries()) {
+            Statistics currentChunkStatistics = reader.currentChunkStatistics();
+            calcFromStatistics(currentChunkStatistics, results.get(reader.getCurIndex()));
+            reader.nextSeries();
+          }
+          reader.skipCurrentChunk();
+          continue;
+        }
+      }
+      // read page
+      if (readAndCalcFromPage(curStartTime, curEndTime)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean readAndCalcFromPage(long curStartTime, long curEndTime)
+      throws IOException, QueryProcessException {
+    while (reader.hasNextPage()) {
+      // try to calc from pageHeader
+      Statistics pageStatistics = reader.currentPageStatistics();
+      // must be non overlapped page
+      if (pageStatistics != null && !isEmpty(pageStatistics)) {
+        // current page starts after the time range
+        if (pageStatistics.getStartTime() >= curEndTime) {
+          if (ascending) {
+            return true;
+          } else {
+            reader.skipCurrentPage();
+            continue;
+          }
+        }
+        if (reader.canUseCurrentPageStatistics()
+            && timeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) {
+          // calc from pageHeader
+          while (reader.hasNextSubSeries()) {
+            Statistics currentPageStatistics = reader.currentPageStatistics();
+            calcFromStatistics(currentPageStatistics, results.get(reader.getCurIndex()));
+            reader.nextSeries();
+          }
+          reader.skipCurrentPage();
+          if (isEndCalc()) {
+            return true;
+          }
+          continue;
+        }
+      }
+
+      // calc from page data
+      BatchData batchData = reader.nextPage();
+      if (batchData == null || !batchData.hasCurrent()) {
+        continue;
+      }
+
+      // stop calc and cached current batchData
+      if (ascending && batchData.currentTime() >= curEndTime) {
+        preCachedData = batchData;
+        // reset the last position to current Index
+        lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
+        lastReadCurListIndex = batchData.getReadCurListIndex();
+        return true;
+      }
+
+      // reset the last position to current Index
+      lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
+      lastReadCurListIndex = batchData.getReadCurListIndex();
+
+      // calc from batch data
+      while (reader.hasNextSubSeries()) {
+        int subIndex = reader.getCurIndex();
+        batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
+        calcFromBatch(batchData, subIndex, curStartTime, curEndTime, results.get(subIndex));
+        reader.nextSeries();
+      }
+
+      // reset the last position to current Index
+      lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
+      lastReadCurListIndex = batchData.getReadCurListIndex();
+
+      // can calc for next interval
+      if (batchData.hasCurrent()) {
+        preCachedData = batchData;
+      }
+
+      // judge whether the calculation finished
+      if (isEndCalc()
+          || (batchData.hasCurrent()
+              && (ascending
+                  ? batchData.currentTime() >= curEndTime
+                  : batchData.currentTime() < curStartTime))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean calcFromCacheData(long curStartTime, long curEndTime) throws IOException {
+    if (preCachedData == null) return false;
+    while (reader.hasNextSubSeries()) {
+      int subIndex = reader.getCurIndex();
+      preCachedData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
+      calcFromBatch(preCachedData, subIndex, curStartTime, curEndTime, results.get(subIndex));
+      reader.nextSeries();
+    }
+    // The result is calculated from the cache
+    return (preCachedData != null
+            && (ascending
+                ? preCachedData.getMaxTimestamp() >= curEndTime
+                : preCachedData.getMinTimestamp() < curStartTime))
+        || isEndCalc();
+  }
+
+  private void calcFromBatch(
+      BatchData batchData,
+      int curIndex,
+      long curStartTime,
+      long curEndTime,
+      List<AggregateResult> aggregateResultList)
+      throws IOException {
+    // check if the batchData does not contain points in current interval
+    if (!satisfied(batchData, curStartTime, curEndTime)) {
+      return;
+    }
+
+    for (AggregateResult result : aggregateResultList) {
+      // current agg method has been calculated
+      if (result.hasFinalResult()) {
+        continue;
+      }
+      // lazy reset batch data for calculation
+      batchData.resetBatchData(lastReadCurArrayIndex, lastReadCurListIndex);
+      IBatchDataIterator batchDataIterator = batchData.getBatchDataIterator(curIndex);
+      if (ascending) {
+        // skip points that cannot be calculated
+        while (batchDataIterator.hasNext() && batchDataIterator.currentTime() < curStartTime) {
+          batchDataIterator.next();
+        }
+      } else {
+        while (batchDataIterator.hasNext() && batchDataIterator.currentTime() >= curEndTime) {
+          batchDataIterator.next();
+        }
+      }
+
+      if (batchDataIterator.hasNext()) {
+        result.updateResultFromPageData(batchDataIterator, curStartTime, curEndTime);
+      }
+    }
+  }
+
+  private boolean satisfied(BatchData batchData, long curStartTime, long curEndTime) {
+    if (batchData == null || !batchData.hasCurrent()) {
+      return false;
+    }
+
+    if (ascending
+        && (batchData.getMaxTimestamp() < curStartTime || batchData.currentTime() >= curEndTime)) {
+      return false;
+    }
+    if (!ascending
+        && (batchData.getTimeByIndex(0) >= curEndTime || batchData.currentTime() < curStartTime)) {
+      preCachedData = batchData;
+      return false;
+    }
+    return true;
+  }
+
+  private boolean isEmpty(Statistics statistics) {
+    if (statistics.getStartTime() == Long.MAX_VALUE && statistics.getEndTime() == Long.MIN_VALUE) {
+      return true;
+    }
+    return false;
+  }

Review comment:
       Delete it.
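       If the intent is only to drop the redundant branch rather than the whole helper, a minimal form (a sketch, not necessarily the exact suggestion) would be:

           private boolean isEmpty(Statistics statistics) {
             // empty statistics carry an inverted time range (MAX start, MIN end)
             return statistics.getStartTime() == Long.MAX_VALUE
                 && statistics.getEndTime() == Long.MIN_VALUE;
           }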



