You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/17 12:16:35 UTC

[iotdb] branch alignedAggregateScanOp created (now 3c9a2f4fdf)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch alignedAggregateScanOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 3c9a2f4fdf implement AlignedSeriesAggregateScanOperator.java

This branch includes the following new commits:

     new 88c9d5ef52 add AlignedSeriesAggregateScanOperator
     new 4a0af20cb0 Merge branch 'master' into alignedAggregateScanOp
     new e2541077b6 change static methods
     new 3c9a2f4fdf implement AlignedSeriesAggregateScanOperator.java

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/04: Merge branch 'master' into alignedAggregateScanOp

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch alignedAggregateScanOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4a0af20cb0d6e95c42a00d804ff54ec26175ddf6
Merge: 88c9d5ef52 c76113ac45
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue May 17 16:38:58 2022 +0800

    Merge branch 'master' into alignedAggregateScanOp

 .../resources/conf/iotdb-confignode.properties     |   4 +-
 .../confignode/client/AsyncDataNodeClientPool.java | 128 ++++++-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |   4 +-
 .../consensus/request/write/CreateRegionsReq.java  |  32 +-
 .../request/write/RegisterDataNodeReq.java         |  22 +-
 ...deLocationsResp.java => DataNodeInfosResp.java} |  18 +-
 .../iotdb/confignode/manager/ConfigManager.java    |   4 +-
 .../iotdb/confignode/manager/NodeManager.java      |  40 +--
 .../iotdb/confignode/manager/PartitionManager.java |  64 ++--
 .../iotdb/confignode/manager/load/LoadManager.java | 267 ++++++---------
 .../manager/load/balancer/RegionBalancer.java      |  94 +++++-
 .../allocator/CopySetRegionAllocator.java          |  25 +-
 .../{ => balancer}/allocator/IRegionAllocator.java |   6 +-
 .../confignode/persistence/ClusterSchemaInfo.java  |  27 +-
 .../iotdb/confignode/persistence/NodeInfo.java     |  79 +++--
 .../confignode/persistence/PartitionInfo.java      | 107 ++++--
 .../impl/DeleteStorageGroupProcedure.java          |  13 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  13 +-
 .../consensus/request/ConfigRequestSerDeTest.java  |   9 +-
 .../iotdb/confignode/persistence/NodeInfoTest.java |  12 +-
 .../confignode/persistence/PartitionInfoTest.java  |  54 ++-
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  |  37 ++-
 .../ratis/ApplicationStateMachineProxy.java        |   9 +
 docs/UserGuide/Maintenance-Tools/Metric-Tool.md    |  41 ++-
 docs/zh/UserGuide/Maintenance-Tools/Metric-Tool.md |  42 ++-
 .../iotdb/db/integration/IoTDBRestartIT.java       | 101 ++++--
 .../main/assembly/resources/conf/iotdb-metric.yml  |   3 +
 .../iotdb/metrics/DoNothingMetricService.java      |  10 +
 .../org/apache/iotdb/metrics/MetricService.java    |  13 +
 .../apache/iotdb/metrics/config/MetricConfig.java  |  11 +
 .../commons/utils/ThriftCommonsSerDeUtils.java     |  19 ++
 .../resources/conf/iotdb-engine.properties         |   4 -
 .../src/assembly/resources/sbin/stop-datanode.bat  |   2 +
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |  31 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 326 ++++++++++++++----
 .../org/apache/iotdb/db/client/ConfigNodeInfo.java | 158 +++++++++
 .../iotdb/db/client/DataNodeClientPoolFactory.java |  18 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  11 -
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   7 -
 .../org/apache/iotdb/db/conf/IoTDBStartCheck.java  |  38 ---
 .../compaction/cross/CrossSpaceCompactionTask.java |   9 +-
 .../compaction/inner/InnerSpaceCompactionTask.java |  16 +-
 .../impl/ReadPointCompactionPerformer.java         |   7 +-
 .../compaction/task/AbstractCompactionTask.java    |   7 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |  54 ++-
 .../metadata/cache/DataNodeLastCacheManager.java   |  81 +++++
 .../db/metadata/cache/DataNodeSchemaCache.java     |  34 ++
 .../iotdb/db/metadata/cache/SchemaCacheEntry.java  |  19 ++
 .../iotdb/db/mpp/aggregation/Accumulator.java      |   3 +-
 .../iotdb/db/mpp/aggregation/Aggregator.java       |  39 ++-
 .../iotdb/db/mpp/aggregation/AvgAccumulator.java   |  38 ++-
 .../iotdb/db/mpp/aggregation/CountAccumulator.java |   8 +-
 .../db/mpp/aggregation/ExtremeAccumulator.java     |  23 +-
 .../db/mpp/aggregation/FirstValueAccumulator.java  |  25 +-
 .../mpp/aggregation/FirstValueDescAccumulator.java |  12 +-
 .../db/mpp/aggregation/LastValueAccumulator.java   |  32 +-
 .../mpp/aggregation/LastValueDescAccumulator.java  |  48 +--
 .../db/mpp/aggregation/MaxTimeAccumulator.java     |  24 +-
 .../db/mpp/aggregation/MaxTimeDescAccumulator.java |  17 +-
 .../db/mpp/aggregation/MaxValueAccumulator.java    |  22 +-
 .../db/mpp/aggregation/MinTimeAccumulator.java     |  20 +-
 .../db/mpp/aggregation/MinTimeDescAccumulator.java |   2 +-
 .../db/mpp/aggregation/MinValueAccumulator.java    |  22 +-
 .../iotdb/db/mpp/aggregation/SumAccumulator.java   |  34 +-
 .../timerangeiterator/AggrWindowIterator.java      | 172 ++++++++++
 .../timerangeiterator/ITimeRangeIterator.java      |  58 ++++
 .../timerangeiterator/PreAggrWindowIterator.java   | 169 ++++++++++
 .../PreAggrWindowWithNaturalMonthIterator.java     | 143 ++++++++
 .../SingleTimeWindowIterator.java                  |   7 +-
 .../TimeRangeIteratorFactory.java                  |  73 ++++
 .../operator/process/AggregateOperator.java        |  90 ++++-
 .../execution/operator/process/FilterOperator.java |  25 +-
 .../operator/process/RawDataAggregateOperator.java | 194 +++++++++++
 .../operator/process/TransformOperator.java        |  45 ++-
 .../source/SeriesAggregateScanOperator.java        | 107 +++---
 .../db/mpp/execution/schedule/DriverScheduler.java |   2 +-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |   8 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java |   1 +
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  |  51 +--
 .../execution/config/AuthorizerConfigTask.java     |  37 ++-
 .../mpp/plan/execution/config/ConfigExecution.java |  14 +-
 .../execution/config/CountStorageGroupTask.java    |  16 +-
 .../execution/config/DeleteStorageGroupTask.java   |  17 +-
 .../db/mpp/plan/execution/config/IConfigTask.java  |   8 +-
 .../plan/execution/config/SetStorageGroupTask.java |  19 +-
 .../db/mpp/plan/execution/config/SetTTLTask.java   |  20 +-
 .../execution/config/ShowStorageGroupTask.java     |  20 +-
 .../db/mpp/plan/execution/config/ShowTTLTask.java  |  20 +-
 .../db/mpp/plan/parser/StatementGenerator.java     |  39 +--
 .../db/mpp/plan/planner/DistributionPlanner.java   |  51 ++-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  91 ++++-
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |   2 +-
 .../planner/plan/node/process/AggregationNode.java |  64 +++-
 .../plan/node/process/GroupByLevelNode.java        |   6 +-
 .../node/source/SeriesAggregationScanNode.java     |  12 +-
 .../plan/parameter/AggregationDescriptor.java      |  61 +++-
 .../planner/plan/parameter/AggregationStep.java    |  44 ++-
 .../iotdb/db/query/expression/Expression.java      |  19 +-
 .../query/expression/binary/BinaryExpression.java  |  64 +++-
 .../db/query/expression/leaf/ConstantOperand.java  |  26 ++
 .../query/expression/leaf/TimeSeriesOperand.java   |  38 +++
 .../db/query/expression/leaf/TimestampOperand.java |  33 ++
 .../query/expression/multi/FunctionExpression.java | 115 +++++++
 .../db/query/expression/unary/UnaryExpression.java |  50 +++
 .../db/query/udf/core/executor/UDTFExecutor.java   |  27 ++
 .../query/udf/core/layer/EvaluationDAGBuilder.java |  25 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |  24 +-
 .../apache/iotdb/db/service/metrics/Metric.java    |  20 +-
 .../iotdb/db/service/metrics/MetricsService.java   |  19 ++
 .../db/service/metrics/ProcessMetricsMonitor.java  | 148 +++++++++
 .../db/service/metrics/SysRunMetricsMonitor.java   | 134 ++++++++
 .../thrift/impl/DataNodeTSIServiceImpl.java        |  60 +++-
 .../db/service/thrift/impl/TSServiceImpl.java      |   3 +-
 .../db/metadata/cache/DataNodeSchemaCacheTest.java |  69 ++++
 .../iotdb/db/mpp/aggregation/AccumulatorTest.java  | 116 +++++--
 .../db/mpp/aggregation/TimeRangeIteratorTest.java  | 298 +++++++++++++++++
 .../db/mpp/execution/ConfigExecutionTest.java      |  16 +-
 .../execution/operator/AggregateOperatorTest.java  | 321 ++++++++++++++++++
 .../operator/RawDataAggregateOperatorTest.java     | 368 +++++++++++++++++++++
 .../operator/SeriesAggregateScanOperatorTest.java  |  25 +-
 .../node/process/AggregationNodeSerdeTest.java     | 228 +++++++++++++
 thrift-commons/src/main/thrift/common.thrift       |   5 +-
 .../src/main/thrift/confignode.thrift              |  11 +-
 .../apache/iotdb/tsfile/read/common/TimeRange.java |  20 +-
 .../write/writer/RestorableTsFileIOWriter.java     |  37 +--
 125 files changed, 5423 insertions(+), 1081 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
index d5ce7f47fc,92010c87ec..cb50a86cb1
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
@@@ -33,10 -34,10 +34,8 @@@ import org.apache.iotdb.tsfile.read.com
  import org.apache.iotdb.tsfile.read.common.block.TsBlock;
  import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
  import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
- import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
- import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
  import org.apache.iotdb.tsfile.read.filter.basic.Filter;
  
 -import com.google.common.util.concurrent.ListenableFuture;
 -
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.Arrays;


[iotdb] 01/04: add AlignedSeriesAggregateScanOperator

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch alignedAggregateScanOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 88c9d5ef527c322a7a2176d201e4a4e9586ef2ac
Author: Alima777 <wx...@gmail.com>
AuthorDate: Mon May 16 16:02:12 2022 +0800

    add AlignedSeriesAggregateScanOperator
---
 ...ava => AlignedSeriesAggregateScanOperator.java} | 96 ++++++++--------------
 .../source/SeriesAggregateScanOperator.java        |  8 --
 2 files changed, 35 insertions(+), 69 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregateScanOperator.java
similarity index 84%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregateScanOperator.java
index fdc50a808b..5deaa74c2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregateScanOperator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
 import org.apache.iotdb.commons.path.PartialPath;
@@ -37,27 +38,18 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
-/**
- * This operator is responsible to do the aggregation calculation for one series based on global
- * time range and time split parameter.
- *
- * <p>Every time next() is invoked, one tsBlock which contains current time window will be returned.
- * In sliding window situation, current time window is a pre-aggregation window. If there is no time
- * split parameter, i.e. aggregation without groupBy, just one tsBlock will be returned.
- */
-public class SeriesAggregateScanOperator implements DataSourceOperator {
+/** This operator is responsible to do the aggregation calculation especially for aligned series. */
+public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
 
   private final OperatorContext operatorContext;
   private final PlanNodeId sourceId;
-  private final SeriesScanUtil seriesScanUtil;
+  private final AlignedSeriesScanUtil alignedSeriesScanUtil;
   private final boolean ascending;
   // We still think aggregator in SeriesAggregateScanOperator is a inputRaw step.
   // But in facing of statistics, it will invoke another method processStatistics()
@@ -74,7 +66,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   private boolean hasCachedTsBlock = false;
   private boolean finished = false;
 
-  public SeriesAggregateScanOperator(
+  public AlignedSeriesAggregateScanOperator(
       PlanNodeId sourceId,
       PartialPath seriesPath,
       Set<String> allSensors,
@@ -86,15 +78,9 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     this.sourceId = sourceId;
     this.operatorContext = context;
     this.ascending = ascending;
-    this.seriesScanUtil =
-        new SeriesScanUtil(
-            seriesPath,
-            allSensors,
-            seriesPath.getSeriesType(),
-            context.getInstanceContext(),
-            timeFilter,
-            null,
-            ascending);
+    this.alignedSeriesScanUtil =
+        new AlignedSeriesScanUtil(
+            seriesPath, allSensors, context.getInstanceContext(), timeFilter, null, ascending);
     this.aggregators = aggregators;
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
@@ -130,12 +116,6 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     return operatorContext;
   }
 
-  // TODO
-  @Override
-  public ListenableFuture<Void> isBlocked() {
-    return DataSourceOperator.super.isBlocked();
-  }
-
   @Override
   public TsBlock next() {
     if (hasCachedTsBlock || hasNext()) {
@@ -181,14 +161,14 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       }
 
       // read from file first
-      while (seriesScanUtil.hasNextFile()) {
-        Statistics fileStatistics = seriesScanUtil.currentFileStatistics();
+      while (alignedSeriesScanUtil.hasNextFile()) {
+        Statistics fileStatistics = AlignedSeriesScanUtil.currentFileStatistics();
         if (fileStatistics.getStartTime() >= curTimeRange.getMax()) {
           if (ascending) {
             updateResultTsBlockUsingAggregateResult();
             return true;
           } else {
-            seriesScanUtil.skipCurrentFile();
+            alignedSeriesScanUtil.skipCurrentFile();
             continue;
           }
         }
@@ -196,7 +176,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
         if (canUseCurrentFileStatistics()
             && curTimeRange.contains(fileStatistics.getStartTime(), fileStatistics.getEndTime())) {
           calcFromStatistics(fileStatistics);
-          seriesScanUtil.skipCurrentFile();
+          alignedSeriesScanUtil.skipCurrentFile();
           continue;
         }
 
@@ -234,25 +214,19 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     hasCachedTsBlock = true;
   }
 
-  // TODO Implement it later?
-  @Override
-  public void close() throws Exception {
-    DataSourceOperator.super.close();
-  }
-
   @Override
   public boolean isFinished() {
     return finished || (finished = hasNext());
   }
 
   @Override
-  public PlanNodeId getSourceId() {
-    return sourceId;
+  public void initQueryDataSource(QueryDataSource dataSource) {
+    alignedalignedSeriesScanUtil.initQueryDataSource(dataSource);
   }
 
   @Override
-  public void initQueryDataSource(QueryDataSource dataSource) {
-    seriesScanUtil.initQueryDataSource(dataSource);
+  public PlanNodeId getSourceId() {
+    return sourceId;
   }
 
   /** @return if already get the result */
@@ -337,8 +311,8 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private boolean readAndCalcFromPage(TimeRange curTimeRange) throws IOException {
-    while (seriesScanUtil.hasNextPage()) {
-      Statistics pageStatistics = seriesScanUtil.currentPageStatistics();
+    while (alignedSeriesScanUtil.hasNextPage()) {
+      Statistics pageStatistics = alignedSeriesScanUtil.currentPageStatistics();
       // must be non overlapped page
       if (pageStatistics != null) {
         // There is no more eligible points in current time range
@@ -346,7 +320,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
           if (ascending) {
             return true;
           } else {
-            seriesScanUtil.skipCurrentPage();
+            alignedSeriesScanUtil.skipCurrentPage();
             continue;
           }
         }
@@ -354,7 +328,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
         if (canUseCurrentPageStatistics()
             && curTimeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) {
           calcFromStatistics(pageStatistics);
-          seriesScanUtil.skipCurrentPage();
+          alignedSeriesScanUtil.skipCurrentPage();
           if (isEndCalc()) {
             return true;
           }
@@ -363,7 +337,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       }
 
       // calc from page data
-      TsBlock tsBlock = seriesScanUtil.nextPage();
+      TsBlock tsBlock = alignedSeriesScanUtil.nextPage();
       TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
       if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
         continue;
@@ -394,13 +368,13 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   }
 
   private boolean readAndCalcFromChunk(TimeRange curTimeRange) throws IOException {
-    while (seriesScanUtil.hasNextChunk()) {
-      Statistics chunkStatistics = seriesScanUtil.currentChunkStatistics();
+    while (alignedSeriesScanUtil.hasNextChunk()) {
+      Statistics chunkStatistics = alignedSeriesScanUtil.currentChunkStatistics();
       if (chunkStatistics.getStartTime() >= curTimeRange.getMax()) {
         if (ascending) {
           return true;
         } else {
-          seriesScanUtil.skipCurrentChunk();
+          alignedSeriesScanUtil.skipCurrentChunk();
           continue;
         }
       }
@@ -408,7 +382,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       if (canUseCurrentChunkStatistics()
           && curTimeRange.contains(chunkStatistics.getStartTime(), chunkStatistics.getEndTime())) {
         calcFromStatistics(chunkStatistics);
-        seriesScanUtil.skipCurrentChunk();
+        alignedSeriesScanUtil.skipCurrentChunk();
         continue;
       }
       // read page
@@ -430,31 +404,31 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   }
 
   public boolean canUseCurrentFileStatistics() throws IOException {
-    Statistics fileStatistics = seriesScanUtil.currentFileStatistics();
-    return !seriesScanUtil.isFileOverlapped()
+    Statistics fileStatistics = alignedSeriesScanUtil.currentFileStatistics();
+    return !alignedSeriesScanUtil.isFileOverlapped()
         && containedByTimeFilter(fileStatistics)
-        && !seriesScanUtil.currentFileModified();
+        && !alignedSeriesScanUtil.currentFileModified();
   }
 
   public boolean canUseCurrentChunkStatistics() throws IOException {
-    Statistics chunkStatistics = seriesScanUtil.currentChunkStatistics();
-    return !seriesScanUtil.isChunkOverlapped()
+    Statistics chunkStatistics = alignedSeriesScanUtil.currentChunkStatistics();
+    return !alignedSeriesScanUtil.isChunkOverlapped()
         && containedByTimeFilter(chunkStatistics)
-        && !seriesScanUtil.currentChunkModified();
+        && !alignedSeriesScanUtil.currentChunkModified();
   }
 
   public boolean canUseCurrentPageStatistics() throws IOException {
-    Statistics currentPageStatistics = seriesScanUtil.currentPageStatistics();
+    Statistics currentPageStatistics = alignedSeriesScanUtil.currentPageStatistics();
     if (currentPageStatistics == null) {
       return false;
     }
-    return !seriesScanUtil.isPageOverlapped()
+    return !alignedSeriesScanUtil.isPageOverlapped()
         && containedByTimeFilter(currentPageStatistics)
-        && !seriesScanUtil.currentPageModified();
+        && !alignedSeriesScanUtil.currentPageModified();
   }
 
   private boolean containedByTimeFilter(Statistics statistics) {
-    Filter timeFilter = seriesScanUtil.getTimeFilter();
+    Filter timeFilter = alignedSeriesScanUtil.getTimeFilter();
     return timeFilter == null
         || timeFilter.containStartEndTime(statistics.getStartTime(), statistics.getEndTime());
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
index fdc50a808b..d5ce7f47fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java
@@ -37,8 +37,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -130,12 +128,6 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     return operatorContext;
   }
 
-  // TODO
-  @Override
-  public ListenableFuture<Void> isBlocked() {
-    return DataSourceOperator.super.isBlocked();
-  }
-
   @Override
   public TsBlock next() {
     if (hasCachedTsBlock || hasNext()) {


[iotdb] 04/04: implement AlignedSeriesAggregateScanOperator.java

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch alignedAggregateScanOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3c9a2f4fdf251bd810462df44e209e2a599f5792
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue May 17 20:16:15 2022 +0800

    implement AlignedSeriesAggregateScanOperator.java
---
 data                                               |   1 +
 .../iotdb/db/mpp/aggregation/Aggregator.java       |  12 ++-
 .../source/AlignedSeriesAggregateScanOperator.java | 116 +++++++++++----------
 .../groupby/impl/LocalAlignedGroupByExecutor.java  |   2 -
 4 files changed, 72 insertions(+), 59 deletions(-)

diff --git a/data b/data
new file mode 120000
index 0000000000..fa2f394669
--- /dev/null
+++ b/data
@@ -0,0 +1 @@
+/Users/alima/Downloads/apache-iotdb-0.12.4-server-bin/data
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
index 6077648e59..b9ccc4c7b3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -60,7 +60,6 @@ public class Aggregator {
     checkArgument(
         step.isInputRaw(),
         "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input");
-    // TODO Aligned TimeSeries
     if (inputLocationList == null) {
       accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
     } else {
@@ -110,6 +109,17 @@ public class Aggregator {
     accumulator.addStatistics(statistics);
   }
 
+  /** Used for AlignedSeriesAggregateScanOperator. */
+  public void processStatistics(Statistics[] statistics) {
+    for (InputLocation[] inputLocations : inputLocationList) {
+      checkArgument(
+          inputLocations[0].getTsBlockIndex() == 0,
+          "AlignedSeriesAggregateScanOperator can only process one tsBlock input.");
+      int valueIndex = inputLocations[0].getValueColumnIndex();
+      accumulator.addStatistics(statistics[valueIndex]);
+    }
+  }
+
   public TSDataType[] getOutputType() {
     if (step.isOutputPartial()) {
       return accumulator.getIntermediateType();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregateScanOperator.java
index dcfcd6adc6..05914c7102 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregateScanOperator.java
@@ -19,11 +19,12 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.source;
 
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.execution.operator.process.AggregateOperator;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -32,8 +33,6 @@ import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import java.io.IOException;
@@ -52,8 +51,9 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
   private final OperatorContext operatorContext;
   private final PlanNodeId sourceId;
   private final AlignedSeriesScanUtil alignedSeriesScanUtil;
+  private final int subSensorSize;
   private final boolean ascending;
-  // We still think aggregator in SeriesAggregateScanOperator is a inputRaw step.
+  // We still think aggregator in AlignedSeriesAggregateScanOperator is a inputRaw step.
   // But in facing of statistics, it will invoke another method processStatistics()
   private List<Aggregator> aggregators;
 
@@ -70,7 +70,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
 
   public AlignedSeriesAggregateScanOperator(
       PlanNodeId sourceId,
-      PartialPath seriesPath,
+      AlignedPath seriesPath,
       Set<String> allSensors,
       OperatorContext context,
       List<Aggregator> aggregators,
@@ -83,6 +83,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
     this.alignedSeriesScanUtil =
         new AlignedSeriesScanUtil(
             seriesPath, allSensors, context.getInstanceContext(), timeFilter, null, ascending);
+    this.subSensorSize = seriesPath.getMeasurementList().size();
     this.aggregators = aggregators;
     List<TSDataType> dataTypes = new ArrayList<>();
     for (Aggregator aggregator : aggregators) {
@@ -125,28 +126,28 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
 
       // 2. Calculate aggregation result based on current time window
       if (calcFromCacheData(curTimeRange)) {
-        updateResultTsBlockUsingAggregateResult();
+        updateResultTsBlockFromAggregators();
         return true;
       }
 
       // read page data firstly
       if (readAndCalcFromPage(curTimeRange)) {
-        updateResultTsBlockUsingAggregateResult();
+        updateResultTsBlockFromAggregators();
         return true;
       }
 
       // read chunk data secondly
       if (readAndCalcFromChunk(curTimeRange)) {
-        updateResultTsBlockUsingAggregateResult();
+        updateResultTsBlockFromAggregators();
         return true;
       }
 
       // read from file first
       while (alignedSeriesScanUtil.hasNextFile()) {
-        Statistics fileStatistics = AlignedSeriesScanUtil.currentFileStatistics();
-        if (fileStatistics.getStartTime() >= curTimeRange.getMax()) {
+        Statistics fileTimeStatistics = alignedSeriesScanUtil.currentFileTimeStatistics();
+        if (fileTimeStatistics.getStartTime() > curTimeRange.getMax()) {
           if (ascending) {
-            updateResultTsBlockUsingAggregateResult();
+            updateResultTsBlockFromAggregators();
             return true;
           } else {
             alignedSeriesScanUtil.skipCurrentFile();
@@ -155,49 +156,34 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
         }
         // calc from fileMetaData
         if (canUseCurrentFileStatistics()
-            && curTimeRange.contains(fileStatistics.getStartTime(), fileStatistics.getEndTime())) {
-          calcFromStatistics(fileStatistics);
+            && curTimeRange.contains(
+                fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) {
+          Statistics[] statisticsList = new Statistics[subSensorSize];
+          for (int i = 0; i < subSensorSize; i++) {
+            statisticsList[i] = alignedSeriesScanUtil.currentFileStatistics(i);
+          }
+          calcFromStatistics(statisticsList);
           alignedSeriesScanUtil.skipCurrentFile();
           continue;
         }
 
         // read chunk
         if (readAndCalcFromChunk(curTimeRange)) {
-          updateResultTsBlockUsingAggregateResult();
+          updateResultTsBlockFromAggregators();
           return true;
         }
       }
 
-      updateResultTsBlockUsingAggregateResult();
+      updateResultTsBlockFromAggregators();
       return true;
     } catch (IOException e) {
       throw new RuntimeException("Error while scanning the file", e);
     }
   }
 
-  private void updateResultTsBlockUsingAggregateResult() {
-    tsBlockBuilder.reset();
-    TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
-    // Use start time of current time range as time column
-    timeColumnBuilder.writeLong(curTimeRange.getMin());
-    ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
-    int columnIndex = 0;
-    for (Aggregator aggregator : aggregators) {
-      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
-      columnBuilder[0] = columnBuilders[columnIndex++];
-      if (columnBuilder.length > 1) {
-        columnBuilder[1] = columnBuilders[columnIndex++];
-      }
-      aggregator.outputResult(columnBuilder);
-    }
-    tsBlockBuilder.declarePosition();
-    resultTsBlock = tsBlockBuilder.build();
-    hasCachedTsBlock = true;
-  }
-
   @Override
   public boolean isFinished() {
-    return finished || (finished = hasNext());
+    return finished || (finished = !hasNext());
   }
 
   @Override
@@ -210,13 +196,20 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
     return sourceId;
   }
 
+  private void updateResultTsBlockFromAggregators() {
+    resultTsBlock =
+        AggregateOperator.updateResultTsBlockFromAggregators(
+            tsBlockBuilder, aggregators, timeRangeIterator);
+    hasCachedTsBlock = true;
+  }
+
   /** @return if already get the result */
   private boolean calcFromCacheData(TimeRange curTimeRange) throws IOException {
     calcFromBatch(preCachedData, curTimeRange);
     // The result is calculated from the cache
     return (preCachedData != null
             && (ascending
-                ? preCachedData.getEndTime() >= curTimeRange.getMax()
+                ? preCachedData.getEndTime() > curTimeRange.getMax()
                 : preCachedData.getStartTime() < curTimeRange.getMin()))
         || isEndCalc(aggregators);
   }
@@ -224,7 +217,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
   @SuppressWarnings("squid:S3776")
   private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
     // check if the batchData does not contain points in current interval
-    if (tsBlock == null || !satisfied(tsBlock, curTimeRange)) {
+    if (tsBlock == null || !satisfied(tsBlock, curTimeRange, ascending)) {
       return;
     }
 
@@ -246,7 +239,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
     }
   }
 
-  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange) {
+  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean ascending) {
     TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
     if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
       return false;
@@ -254,11 +247,11 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
 
     if (ascending
         && (tsBlockIterator.getEndTime() < timeRange.getMin()
-            || tsBlockIterator.currentTime() >= timeRange.getMax())) {
+            || tsBlockIterator.currentTime() > timeRange.getMax())) {
       return false;
     }
     if (!ascending
-        && (tsBlockIterator.getStartTime() >= timeRange.getMax()
+        && (tsBlockIterator.getEndTime() > timeRange.getMax()
             || tsBlockIterator.currentTime() < timeRange.getMin())) {
       preCachedData = tsBlock;
       return false;
@@ -269,11 +262,11 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private boolean readAndCalcFromPage(TimeRange curTimeRange) throws IOException {
     while (alignedSeriesScanUtil.hasNextPage()) {
-      Statistics pageStatistics = alignedSeriesScanUtil.currentPageStatistics();
+      Statistics pageTimeStatistics = alignedSeriesScanUtil.currentPageTimeStatistics();
       // must be non overlapped page
-      if (pageStatistics != null) {
+      if (pageTimeStatistics != null) {
         // There is no more eligible points in current time range
-        if (pageStatistics.getStartTime() >= curTimeRange.getMax()) {
+        if (pageTimeStatistics.getStartTime() > curTimeRange.getMax()) {
           if (ascending) {
             return true;
           } else {
@@ -283,8 +276,13 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
         }
         // can use pageHeader
         if (canUseCurrentPageStatistics()
-            && curTimeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) {
-          calcFromStatistics(pageStatistics);
+            && curTimeRange.contains(
+                pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) {
+          Statistics[] statisticsList = new Statistics[subSensorSize];
+          for (int i = 0; i < subSensorSize; i++) {
+            statisticsList[i] = alignedSeriesScanUtil.currentPageStatistics(i);
+          }
+          calcFromStatistics(statisticsList);
           alignedSeriesScanUtil.skipCurrentPage();
           if (isEndCalc(aggregators)) {
             return true;
@@ -304,7 +302,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
       // lastReadIndex = tsBlockIterator.getRowIndex();
 
       // stop calc and cached current batchData
-      if (ascending && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
+      if (ascending && tsBlockIterator.currentTime() > curTimeRange.getMax()) {
         preCachedData = tsBlock;
         return true;
       }
@@ -316,7 +314,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
       if (isEndCalc(aggregators)
           || (tsBlockIterator.hasNext()
               && (ascending
-                  ? tsBlockIterator.currentTime() >= curTimeRange.getMax()
+                  ? tsBlockIterator.currentTime() > curTimeRange.getMax()
                   : tsBlockIterator.currentTime() < curTimeRange.getMin()))) {
         return true;
       }
@@ -326,8 +324,8 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
 
   private boolean readAndCalcFromChunk(TimeRange curTimeRange) throws IOException {
     while (alignedSeriesScanUtil.hasNextChunk()) {
-      Statistics chunkStatistics = alignedSeriesScanUtil.currentChunkStatistics();
-      if (chunkStatistics.getStartTime() >= curTimeRange.getMax()) {
+      Statistics chunkTimeStatistics = alignedSeriesScanUtil.currentChunkTimeStatistics();
+      if (chunkTimeStatistics.getStartTime() > curTimeRange.getMax()) {
         if (ascending) {
           return true;
         } else {
@@ -337,8 +335,14 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
       }
       // calc from chunkMetaData
       if (canUseCurrentChunkStatistics()
-          && curTimeRange.contains(chunkStatistics.getStartTime(), chunkStatistics.getEndTime())) {
-        calcFromStatistics(chunkStatistics);
+          && curTimeRange.contains(
+              chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) {
+        // calc from chunkMetaData
+        Statistics[] statisticsList = new Statistics[subSensorSize];
+        for (int i = 0; i < subSensorSize; i++) {
+          statisticsList[i] = alignedSeriesScanUtil.currentChunkStatistics(i);
+        }
+        calcFromStatistics(statisticsList);
         alignedSeriesScanUtil.skipCurrentChunk();
         continue;
       }
@@ -350,7 +354,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
     return false;
   }
 
-  private void calcFromStatistics(Statistics statistics) {
+  private void calcFromStatistics(Statistics[] statistics) {
     for (int i = 0; i < aggregators.size(); i++) {
       Aggregator aggregator = aggregators.get(i);
       if (aggregator.hasFinalResult()) {
@@ -361,21 +365,21 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
   }
 
   public boolean canUseCurrentFileStatistics() throws IOException {
-    Statistics fileStatistics = alignedSeriesScanUtil.currentFileStatistics();
+    Statistics fileStatistics = alignedSeriesScanUtil.currentFileTimeStatistics();
     return !alignedSeriesScanUtil.isFileOverlapped()
         && containedByTimeFilter(fileStatistics)
         && !alignedSeriesScanUtil.currentFileModified();
   }
 
   public boolean canUseCurrentChunkStatistics() throws IOException {
-    Statistics chunkStatistics = alignedSeriesScanUtil.currentChunkStatistics();
+    Statistics chunkStatistics = alignedSeriesScanUtil.currentChunkTimeStatistics();
     return !alignedSeriesScanUtil.isChunkOverlapped()
         && containedByTimeFilter(chunkStatistics)
         && !alignedSeriesScanUtil.currentChunkModified();
   }
 
   public boolean canUseCurrentPageStatistics() throws IOException {
-    Statistics currentPageStatistics = alignedSeriesScanUtil.currentPageStatistics();
+    Statistics currentPageStatistics = alignedSeriesScanUtil.currentPageTimeStatistics();
     if (currentPageStatistics == null) {
       return false;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/groupby/impl/LocalAlignedGroupByExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/groupby/impl/LocalAlignedGroupByExecutor.java
index 9a47eaad0a..aa5ede9abd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/groupby/impl/LocalAlignedGroupByExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/groupby/impl/LocalAlignedGroupByExecutor.java
@@ -246,9 +246,7 @@ public class LocalAlignedGroupByExecutor implements AlignedGroupByExecutor {
 
       // set initial Index
       lastReadCurArrayIndex = batchData.getReadCurArrayIndex();
-      ;
       lastReadCurListIndex = batchData.getReadCurListIndex();
-      ;
 
       // stop calc and cached current batchData
       if (ascending && batchData.currentTime() >= curEndTime) {


[iotdb] 03/04: change static methods

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch alignedAggregateScanOp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e2541077b6d9e1c515e2699a345f84f9f1e2712a
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue May 17 17:14:53 2022 +0800

    change static methods
---
 .../source/AlignedSeriesAggregateScanOperator.java | 65 ++++------------------
 1 file changed, 11 insertions(+), 54 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregateScanOperator.java
index 5deaa74c2c..dcfcd6adc6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/AlignedSeriesAggregateScanOperator.java
@@ -22,12 +22,10 @@ package org.apache.iotdb.db.mpp.execution.operator.source;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
+import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
-import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
-import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator;
-import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
@@ -44,6 +42,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregateOperator.isEndCalc;
+import static org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregateOperator.skipOutOfTimeRangePoints;
+import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator.initTimeRangeIterator;
+
 /** This operator is responsible to do the aggregation calculation especially for aligned series. */
 public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
 
@@ -87,28 +89,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
       dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
     }
     tsBlockBuilder = new TsBlockBuilder(dataTypes);
-    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
-  }
-
-  /**
-   * If groupByTimeParameter is null, which means it's an aggregation query without down sampling.
-   * Aggregation query has only one time window and the result set of it does not contain a
-   * timestamp, so it doesn't matter what the time range returns.
-   */
-  public ITimeRangeIterator initTimeRangeIterator(GroupByTimeParameter groupByTimeParameter) {
-    if (groupByTimeParameter == null) {
-      return new SingleTimeWindowIterator(0, Long.MAX_VALUE);
-    } else {
-      return TimeRangeIteratorFactory.getTimeRangeIterator(
-          groupByTimeParameter.getStartTime(),
-          groupByTimeParameter.getEndTime(),
-          groupByTimeParameter.getInterval(),
-          groupByTimeParameter.getSlidingStep(),
-          ascending,
-          groupByTimeParameter.isIntervalByMonth(),
-          groupByTimeParameter.isSlidingStepByMonth(),
-          groupByTimeParameter.getInterval() > groupByTimeParameter.getSlidingStep());
-    }
+    this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending);
   }
 
   @Override
@@ -221,7 +202,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
 
   @Override
   public void initQueryDataSource(QueryDataSource dataSource) {
-    alignedalignedSeriesScanUtil.initQueryDataSource(dataSource);
+    alignedSeriesScanUtil.initQueryDataSource(dataSource);
   }
 
   @Override
@@ -237,7 +218,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
             && (ascending
                 ? preCachedData.getEndTime() >= curTimeRange.getMax()
                 : preCachedData.getStartTime() < curTimeRange.getMin()))
-        || isEndCalc();
+        || isEndCalc(aggregators);
   }
 
   @SuppressWarnings("squid:S3776")
@@ -248,7 +229,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
     }
 
     // skip points that cannot be calculated
-    tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange);
+    tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange, ascending);
 
     for (Aggregator aggregator : aggregators) {
       // current agg method has been calculated
@@ -265,21 +246,6 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
     }
   }
 
-  // skip points that cannot be calculated
-  private TsBlock skipOutOfTimeRangePoints(TsBlock tsBlock, TimeRange curTimeRange) {
-    TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
-    if (ascending) {
-      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) {
-        tsBlockIterator.next();
-      }
-    } else {
-      while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
-        tsBlockIterator.next();
-      }
-    }
-    return tsBlock.subTsBlock(tsBlockIterator.getRowIndex());
-  }
-
   private boolean satisfied(TsBlock tsBlock, TimeRange timeRange) {
     TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
     if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
@@ -300,15 +266,6 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
     return true;
   }
 
-  private boolean isEndCalc() {
-    for (Aggregator aggregator : aggregators) {
-      if (!aggregator.hasFinalResult()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private boolean readAndCalcFromPage(TimeRange curTimeRange) throws IOException {
     while (alignedSeriesScanUtil.hasNextPage()) {
@@ -329,7 +286,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
             && curTimeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) {
           calcFromStatistics(pageStatistics);
           alignedSeriesScanUtil.skipCurrentPage();
-          if (isEndCalc()) {
+          if (isEndCalc(aggregators)) {
             return true;
           }
           continue;
@@ -356,7 +313,7 @@ public class AlignedSeriesAggregateScanOperator implements DataSourceOperator {
       calcFromBatch(tsBlock, curTimeRange);
 
       // judge whether the calculation finished
-      if (isEndCalc()
+      if (isEndCalc(aggregators)
           || (tsBlockIterator.hasNext()
               && (ascending
                   ? tsBlockIterator.currentTime() >= curTimeRange.getMax()