You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/01 08:56:49 UTC

[iotdb] branch aggregator created (now 5843336eeb)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 5843336eeb add unit tests

This branch includes the following new commits:

     new 25b649bd0d modify seriesAggregateScanOperator using aggregator
     new f69f489fff Merge branch 'master' into aggregator
     new e9a1b58d1b add processTsBlocks
     new be277e5a10 Merge branch 'master' into aggregator
     new 1065953784 implement part accumulator
     new af1cc09402 implement part accumulator 2
     new c00e3bb86a add create accumulator methods
     new aa5adcef26 add desc accumulator
     new 5843336eeb add unit tests

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/09: Merge branch 'master' into aggregator

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f69f489fff36cd20dd6d34235384fba119283552
Merge: 25b649bd0d 9cc21e1dfb
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Apr 28 17:21:43 2022 +0800

    Merge branch 'master' into aggregator

 .github/workflows/client-cpp.yml                   |   4 +-
 .github/workflows/sonar-coveralls.yml              |   5 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  16 +-
 .../query/manage/ClusterSessionManager.java        |   1 +
 .../iotdb/cluster/server/ClusterRPCService.java    |   5 -
 .../server/clusterinfo/ClusterInfoServer.java      |   5 -
 .../server/raft/DataRaftHeartBeatService.java      |   6 -
 .../iotdb/cluster/server/raft/DataRaftService.java |   6 -
 .../server/raft/MetaRaftHeartBeatService.java      |   6 -
 .../iotdb/cluster/server/raft/MetaRaftService.java |   6 -
 .../query/ClusterPhysicalGeneratorTest.java        |   2 +-
 .../consensus/request/ConfigRequest.java           |  57 ++-
 .../consensus/request/ConfigRequestType.java       |   9 +-
 ...ataNodeInfoReq.java => GetDataNodeInfoReq.java} |  12 +-
 ...InfoReq.java => GetOrCountStorageGroupReq.java} |  43 +-
 .../SetDataReplicationFactorReq.java}              |  46 +-
 .../SetSchemaReplicationFactorReq.java}            |  46 +-
 .../SetTTLReq.java}                                |  45 +-
 .../SetTimePartitionIntervalReq.java}              |  46 +-
 ...pSchemaResp.java => CountStorageGroupResp.java} |  31 +-
 .../consensus/response/StorageGroupSchemaResp.java |  18 +-
 .../statemachine/PartitionRegionStateMachine.java  |   6 +-
 .../confignode/manager/ClusterSchemaManager.java   |  85 +++-
 .../iotdb/confignode/manager/ConfigManager.java    | 107 ++++-
 .../iotdb/confignode/manager/DataNodeManager.java  |   4 +-
 .../apache/iotdb/confignode/manager/Manager.java   |  50 +-
 .../iotdb/confignode/manager/PartitionManager.java |   8 +-
 .../iotdb/confignode/persistence/AuthorInfo.java   |  21 +
 .../confignode/persistence/ClusterSchemaInfo.java  | 320 +++++++++++++
 .../iotdb/confignode/persistence/DataNodeInfo.java |   4 +-
 .../confignode/persistence/StorageGroupInfo.java   | 150 ------
 .../iotdb/confignode/service/ConfigNode.java       |  17 +-
 .../confignode/service/ConfigNodeCommandLine.java  |   2 +-
 ...lanExecutor.java => ConfigRequestExecutor.java} |  71 +--
 .../service/thrift/ConfigNodeRPCService.java       |   9 -
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  88 +++-
 .../request}/ConfigRequestSerDeTest.java           | 307 +++++++------
 .../thrift/ConfigNodeRPCServiceProcessorTest.java  |  92 +++-
 .../utils/ConfigNodeEnvironmentUtils.java          |   4 +-
 .../apache/iotdb/consensus/ratis/RatisClient.java  | 107 +++++
 .../iotdb/consensus/ratis/RatisClientFactory.java  |  67 ---
 .../iotdb/consensus/ratis/RatisConsensus.java      |  93 ++--
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |   2 +-
 .../Data-Concept/Data-Model-and-Terminology.md     |   2 +-
 docs/UserGuide/Ecosystem Integration/DBeaver.md    |  22 +-
 docs/UserGuide/Maintenance-Tools/Sync-Tool.md      | 119 ++++-
 docs/UserGuide/Query-Data/Select-Expression.md     |   7 +-
 docs/zh/UserGuide/Ecosystem Integration/DBeaver.md |  28 +-
 docs/zh/UserGuide/Maintenance-Tools/Sync-Tool.md   |  23 +-
 docs/zh/UserGuide/Query-Data/Select-Expression.md  |   7 +-
 .../iotdb/db/integration/IoTDBInsertNullIT.java    | 207 +++++++++
 .../db/integration/IoTDBPartialInsertionIT.java    |  62 +++
 .../iotdb/db/integration/IoTDBSelectSchemaIT.java  |  12 +-
 .../sync/IoTDBSyncReceiverCollectorIT.java         |  12 +-
 .../db/integration/sync/IoTDBSyncSenderIT.java     | 327 +++++++++++--
 .../db/integration/sync/TransportClientMock.java   |  12 +-
 .../db/integration/sync/TransportHandlerMock.java  |  61 +++
 .../commons/client/ClientFactoryProperty.java      |   4 +-
 .../iotdb/commons/client/ClientPoolProperty.java   |   3 +-
 .../iotdb/commons/service/ThriftService.java       |   4 +-
 .../apache/iotdb/commons/ClientManagerTest.java    |   9 +-
 .../iotdb/procedure/service/ProcedureServer.java   |   5 -
 .../resources/conf/iotdb-engine.properties         |   5 +
 .../resources/conf/schema-rocksdb.properties       |  24 +-
 .../db/auth/authorizer/AuthorizerManager.java      |  55 +++
 .../db/auth/authorizer/ClusterAuthorizer.java      |  24 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  22 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  27 ++
 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java |  33 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  11 -
 .../apache/iotdb/db/engine/StorageEngineV2.java    |  11 +
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  23 +-
 .../iotdb/db/localconfignode/LocalConfigNode.java  |  13 +-
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |   6 +-
 .../db/metadata/mnode/IStorageGroupMNode.java      |  12 +
 .../db/metadata/mnode/StorageGroupEntityMNode.java |  18 +
 .../iotdb/db/metadata/mnode/StorageGroupMNode.java |  43 +-
 .../iotdb/db/metadata/mtree/MTreeAboveSG.java      |   9 +
 .../schemaregion/rocksdb/RSchemaConfLoader.java    |   7 +-
 .../rocksdb/mnode/RStorageGroupMNode.java          |  18 +
 .../db/metadata/tree/AbstractTreeVisitor.java      | 376 +++++++++++++++
 .../tree/AbstractTreeVisitorWithLimitOffset.java   | 106 +++++
 .../ITreeNode.java}                                |   8 +-
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |  17 +-
 .../iotdb/db/mpp/buffer/DataBlockService.java      |  13 +-
 .../mpp/buffer/DataBlockServiceClientFactory.java  | 117 -----
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  58 ++-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |  53 ++-
 .../iotdb/db/mpp/common/MPPQueryContext.java       |  25 +-
 .../iotdb/db/mpp/common/header/DatasetHeader.java  |   6 +
 .../iotdb/db/mpp/common/header/HeaderConstant.java |  40 ++
 .../db/mpp/common/schematree/DeviceSchemaInfo.java |  40 +-
 .../iotdb/db/mpp/common/schematree/SchemaTree.java |  33 +-
 .../mpp/common/schematree/SchemaTreeVisitor.java   | 237 ----------
 .../schematree/{ => node}/SchemaEntityNode.java    |   2 +-
 .../schematree/{ => node}/SchemaInternalNode.java  |   2 +-
 .../{ => node}/SchemaMeasurementNode.java          |   2 +-
 .../common/schematree/{ => node}/SchemaNode.java   |   6 +-
 .../visitor/SchemaTreeDeviceVisitor.java           |  61 +++
 .../visitor/SchemaTreeMeasurementVisitor.java      |  80 ++++
 .../schematree/visitor/SchemaTreeVisitor.java      |  60 +++
 .../apache/iotdb/db/mpp/execution/Coordinator.java |  48 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |   4 +-
 .../org/apache/iotdb/db/mpp/execution/Driver.java  |  30 --
 .../mpp/execution/FragmentInstanceExecution.java   |  16 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |   6 +-
 .../org/apache/iotdb/db/mpp/execution/IDriver.java |  31 ++
 .../iotdb/db/mpp/execution/QueryExecution.java     |  19 +-
 .../db/mpp/execution/config/ConfigExecution.java   |   8 +-
 .../db/mpp/execution/config/ConfigTaskResult.java  |  10 +-
 .../db/mpp/execution/config/ConfigTaskVisitor.java |   9 +
 .../mpp/execution/config/SetStorageGroupTask.java  |   4 +
 .../mpp/execution/config/ShowStorageGroupTask.java | 113 +++++
 .../scheduler/AbstractFragInsStateTracker.java     |  45 +-
 .../mpp/execution/scheduler/ClusterScheduler.java  |  14 +-
 .../scheduler/FixedRateFragInsStateTracker.java    |  11 +-
 .../scheduler/InternalServiceClientFactory.java    | 112 -----
 .../scheduler/SimpleFragInstanceDispatcher.java    |  62 ++-
 .../execution/scheduler/SimpleQueryTerminator.java |  47 +-
 .../execution/scheduler/StandaloneScheduler.java   |  12 +-
 .../db/mpp/operator/schema/CountMergeOperator.java | 133 ++++++
 .../mpp/operator/schema/DevicesCountOperator.java  |  90 ++++
 .../operator/schema/DevicesSchemaScanOperator.java |  20 +-
 .../schema/LevelTimeSeriesCountOperator.java       | 100 ++++
 .../mpp/operator/schema/SchemaMergeOperator.java   |  31 +-
 .../operator/schema/TimeSeriesCountOperator.java   |  90 ++++
 .../schema/TimeSeriesSchemaScanOperator.java       |  17 +-
 ...ractExecutor.java => AbstractDriverThread.java} |  19 +-
 ...InstanceScheduler.java => DriverScheduler.java} | 126 +++--
 ...anceTaskExecutor.java => DriverTaskThread.java} |  12 +-
 ...l.java => DriverTaskTimeoutSentinelThread.java} |  12 +-
 .../iotdb/db/mpp/schedule/ExecutionContext.java    |   4 +-
 ...nstanceScheduler.java => IDriverScheduler.java} |   7 +-
 .../iotdb/db/mpp/schedule/ITaskScheduler.java      |  35 +-
 .../{FragmentInstanceTask.java => DriverTask.java} |  40 +-
 ...agmentInstanceTaskID.java => DriverTaskID.java} |   8 +-
 ...stanceTaskStatus.java => DriverTaskStatus.java} |   4 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  | 187 ++++----
 .../mpp/sql/analyze/ClusterPartitionFetcher.java   | 446 ++++++++++++++++--
 .../mpp/sql/analyze/FakePartitionFetcherImpl.java  |  11 +
 .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java  |   8 +-
 .../db/mpp/sql/analyze/IPartitionFetcher.java      |   4 +
 .../iotdb/db/mpp/sql/analyze/SchemaValidator.java  |  57 +++
 .../sql/analyze/StandalonePartitionFetcher.java    |  11 +
 .../iotdb/db/mpp/sql/constant/StatementType.java   |   4 +-
 .../apache/iotdb/db/mpp/sql/parser/ASTVisitor.java | 157 +++++--
 .../db/mpp/sql/parser/StatementGenerator.java      |   5 +-
 .../db/mpp/sql/planner/DistributionPlanner.java    |  53 ++-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |  98 +++-
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   | 169 ++++---
 .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java |  33 +-
 .../sql/planner/SimpleFragmentParallelPlanner.java |   9 +-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |  25 +-
 .../db/mpp/sql/planner/plan/node/PlanNodeType.java |  22 +-
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |  24 +-
 ...MergeNode.java => AbstractSchemaMergeNode.java} |  55 +--
 ...emaMergeNode.java => CountSchemaMergeNode.java} |  62 +--
 ...{SchemaMergeNode.java => DevicesCountNode.java} |  70 ++-
 .../node/metedata/read/DevicesSchemaScanNode.java  |  24 +-
 ...ScanNode.java => LevelTimeSeriesCountNode.java} |  66 +--
 .../plan/node/metedata/read/SchemaFetchNode.java   |  15 -
 .../plan/node/metedata/read/SchemaScanNode.java    |  28 +-
 ...maMergeNode.java => SeriesSchemaMergeNode.java} |  50 +-
 ...hemaFetchNode.java => TimeSeriesCountNode.java} |  73 +--
 .../metedata/read/TimeSeriesSchemaScanNode.java    |  16 +-
 .../node/metedata/write/AlterTimeSeriesNode.java   |  99 ++--
 .../write/CreateAlignedTimeSeriesNode.java         | 135 +++---
 .../node/metedata/write/CreateTimeSeriesNode.java  |  14 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |  89 +++-
 .../sql/planner/plan/node/write/InsertNode.java    |  77 ++--
 .../sql/planner/plan/node/write/InsertRowNode.java | 369 ++++++++++-----
 .../planner/plan/node/write/InsertRowsNode.java    |  93 +++-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  79 +++-
 .../planner/plan/node/write/InsertTabletNode.java  | 505 +++++++++++----------
 .../db/mpp/sql/rewriter/ConcatPathRewriter.java    |   2 +-
 .../db/mpp/sql/rewriter/WildcardsRemover.java      |   2 +-
 ...{ConfigStatement.java => IConfigStatement.java} |   4 +-
 .../db/mpp/sql/statement/StatementVisitor.java     |  15 +
 .../component/GroupByLevelController.java          |   2 +-
 .../mpp/sql/statement/component/ResultColumn.java  |   2 +-
 .../sql/statement/component/SelectComponent.java   |   4 +-
 .../statement/crud/AggregationQueryStatement.java  |   4 +-
 .../sql/statement/crud/BatchInsert.java}           |  20 +-
 .../sql/statement/crud/InsertBaseStatement.java    |  31 --
 .../crud/InsertMultiTabletsStatement.java          |  11 -
 .../mpp/sql/statement/crud/InsertRowStatement.java |  99 +---
 .../crud/InsertRowsOfOneDeviceStatement.java       |  22 +-
 .../sql/statement/crud/InsertRowsStatement.java    |  18 -
 .../sql/statement/crud/InsertTabletStatement.java  |  45 +-
 .../mpp/sql/statement/crud/LastQueryStatement.java |   2 +-
 .../db/mpp/sql/statement/crud/QueryStatement.java  |   4 +-
 .../mpp/sql/statement/crud/UDAFQueryStatement.java |   2 +-
 ...upStatement.java => CountDevicesStatement.java} |  23 +-
 ...ent.java => CountLevelTimeSeriesStatement.java} |  24 +-
 .../statement/metadata/CountNodesStatement.java}   |  14 +-
 ...rageGroupStatement.java => CountStatement.java} |  26 +-
 .../metadata/CountStorageGroupStatement.java}      |  13 +-
 ...tatement.java => CountTimeSeriesStatement.java} |  23 +-
 .../metadata/SetStorageGroupStatement.java         |   5 +-
 .../metadata/ShowStorageGroupStatement.java        |   9 +-
 .../db/mpp/sql/statement/sys/AuthorStatement.java  |   5 +-
 .../protocol/influxdb/function/InfluxFunction.java |   2 +-
 .../db/protocol/influxdb/handler/QueryHandler.java |   4 +-
 .../influxdb/operator/InfluxSelectComponent.java   |   4 +-
 .../protocol/influxdb/sql/InfluxDBSqlVisitor.java  |   4 +-
 .../qp/logical/crud/AggregationQueryOperator.java  |   4 +-
 .../db/qp/logical/crud/LastQueryOperator.java      |   2 +-
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |   4 +-
 .../iotdb/db/qp/logical/crud/SelectComponent.java  |   4 +-
 .../db/qp/logical/crud/UDAFQueryOperator.java      |   4 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  28 +-
 .../db/qp/physical/crud/InsertTabletPlan.java      |  69 +--
 .../db/qp/physical/sys/DeleteTimeSeriesPlan.java   |  12 +
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  95 ++--
 .../iotdb/db/qp/strategy/LogicalGenerator.java     |   2 +-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |   2 +-
 .../iotdb/db/qp/utils/GroupByLevelController.java  |   2 +-
 .../iotdb/db/query/expression/Expression.java      |  75 +--
 .../iotdb/db/query/expression/ExpressionType.java  |  61 ++-
 .../expression/binary/AdditionExpression.java      |   4 +-
 .../query/expression/binary/BinaryExpression.java  |   5 +-
 .../expression/binary/DivisionExpression.java      |   4 +-
 .../query/expression/binary/EqualToExpression.java |   4 +-
 .../expression/binary/GreaterEqualExpression.java  |   4 +-
 .../expression/binary/GreaterThanExpression.java   |   4 +-
 .../expression/binary/LessEqualExpression.java     |   4 +-
 .../expression/binary/LessThanExpression.java      |   4 +-
 .../expression/binary/LogicAndExpression.java      |   4 +-
 .../query/expression/binary/LogicOrExpression.java |   4 +-
 .../query/expression/binary/ModuloExpression.java  |   4 +-
 .../binary/MultiplicationExpression.java           |   4 +-
 .../expression/binary/NonEqualExpression.java      |   4 +-
 .../expression/binary/SubtractionExpression.java   |   4 +-
 .../{unary => leaf}/ConstantOperand.java           |  28 +-
 .../db/query/expression/leaf/LeafOperand.java      |  25 +-
 .../{unary => leaf}/TimeSeriesOperand.java         |  22 +-
 .../{unary => multi}/FunctionExpression.java       |  10 +-
 .../db/query/expression/unary/InExpression.java    |  86 ++++
 .../db/query/expression/unary/LikeExpression.java  |  69 +++
 .../query/expression/unary/LogicNotExpression.java | 183 +-------
 .../query/expression/unary/NegationExpression.java | 184 +-------
 .../query/expression/unary/RegularExpression.java  | 138 +-----
 ...ogicNotExpression.java => UnaryExpression.java} | 129 +++---
 .../api/customizer/parameter/UDFParameters.java    |   2 +-
 .../db/query/udf/core/executor/UDTFContext.java    |   2 +-
 .../db/query/udf/core/executor/UDTFExecutor.java   |   2 +-
 .../udf/core/layer/ConstantIntermediateLayer.java  |   2 +-
 .../query/udf/core/layer/LayerMemoryAssigner.java  |   2 +-
 .../udf/core/reader/ConstantLayerPointReader.java  |   2 +-
 .../query/udf/service/UDFRegistrationService.java  |   2 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |  18 +-
 .../iotdb/db/service/InfluxDBRPCService.java       |   5 -
 .../apache/iotdb/db/service/InternalService.java   |   5 -
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   6 +-
 .../org/apache/iotdb/db/service/RPCService.java    |   5 -
 .../thrift/impl/DataNodeTSIServiceImpl.java        |   8 +-
 .../service/thrift/impl/InternalServiceImpl.java   |  25 +-
 .../apache/iotdb/db/sync/conf/SyncConstant.java    |  54 +--
 .../apache/iotdb/db/sync/conf/SyncPathUtil.java    |  82 +++-
 .../iotdb/db/sync/pipedata/DeletionPipeData.java   |   2 +-
 .../sync/pipedata/queue/BufferedPipeDataQueue.java |  34 +-
 .../db/sync/pipedata/queue/PipeDataQueue.java      |   2 +
 .../db/sync/receiver/collector/Collector.java      |   4 +-
 .../db/sync/receiver/manager/ReceiverManager.java  |  32 +-
 .../db/sync/sender/manager/SchemaSyncManager.java  |  18 +-
 .../iotdb/db/sync/sender/pipe/IoTDBPipeSink.java   |  18 +
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java |  10 +
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      |  61 ++-
 .../db/sync/sender/recovery/SenderLogAnalyzer.java |  25 +-
 .../db/sync/sender/recovery/SenderLogger.java      |  12 +-
 .../db/sync/sender/recovery/TsFilePipeLogger.java  |   2 +-
 .../iotdb/db/sync/sender/service/MsgManager.java   | 114 +++++
 .../db/sync/sender/service/SenderService.java      |  81 ++--
 .../db/sync/sender/service/TransportHandler.java   |  47 +-
 .../db/sync/transport/client/TransportClient.java  |  35 +-
 .../transport/server/TransportServerManager.java   |   5 -
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  87 ++--
 .../db/engine/storagegroup/DataRegionTest.java     |  44 +-
 .../engine/storagegroup/TsFileProcessorV2Test.java |  25 +-
 .../apache/iotdb/db/mpp/buffer/SinkHandleTest.java |  51 ++-
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      |  65 ++-
 .../db/mpp/common/schematree/SchemaTreeTest.java   | 167 ++++++-
 .../db/mpp/execution/ConfigExecutionTest.java      |  11 +-
 .../iotdb/db/mpp/execution/DataDriverTest.java     |   2 +-
 ...eratorTest.java => CountMergeOperatorTest.java} | 177 +++-----
 ...ratorTest.java => SchemaCountOperatorTest.java} | 215 ++++-----
 .../operator/schema/SchemaFetchOperatorTest.java   |   2 +-
 .../operator/schema/SchemaScanOperatorTest.java    |   3 +-
 .../db/mpp/schedule/DefaultTaskSchedulerTest.java  | 169 ++++---
 ...SchedulerTest.java => DriverSchedulerTest.java} |  52 +--
 ...va => DriverTaskTimeoutSentinelThreadTest.java} | 115 ++---
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |  71 +--
 .../db/mpp/sql/plan/FragmentInstanceIdTest.java    |   2 +-
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java |   4 +-
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  |  29 +-
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |  32 +-
 .../read/DeviceSchemaScanNodeSerdeTest.java        |   7 +-
 ...erdeTest.java => SchemaCountNodeSerdeTest.java} |  64 ++-
 .../read/TimeSeriesSchemaScanNodeSerdeTest.java    |   7 +-
 .../plan/node/process/AggregateNodeSerdeTest.java  |   2 +-
 .../node/process/DeviceMergeNodeSerdeTest.java     |   2 +-
 .../plan/node/process/ExchangeNodeSerdeTest.java   |   2 +-
 .../sql/plan/node/process/FillNodeSerdeTest.java   |   2 +-
 .../sql/plan/node/process/FilterNodeSerdeTest.java |   2 +-
 .../plan/node/process/FilterNullNodeSerdeTest.java |   2 +-
 .../node/process/GroupByLevelNodeSerdeTest.java    |   2 +-
 .../sql/plan/node/process/LimitNodeSerdeTest.java  |   2 +-
 .../sql/plan/node/process/OffsetNodeSerdeTest.java |   4 +-
 .../sql/plan/node/process/SortNodeSerdeTest.java   |   2 +-
 .../plan/node/process/TimeJoinNodeSerdeTest.java   |   2 +-
 .../plan/node/sink/FragmentSinkNodeSerdeTest.java  |   2 +-
 .../source/SeriesAggregateScanNodeSerdeTest.java   |   2 +-
 .../plan/node/source/SeriesScanNodeSerdeTest.java  |   2 +-
 .../write/InsertMultiTabletsNodeSerdeTest.java     | 101 +++++
 .../plan/node/write/InsertRowNodeSerdeTest.java    | 136 ++++--
 .../plan/node/write/InsertRowsNodeSerdeTest.java   |  72 +++
 .../write/InsertRowsOfOneDeviceNodeSerdeTest.java  |  74 +++
 .../plan/node/write/InsertTabletNodeSerdeTest.java |  98 +++-
 .../influxdb/sql/InfluxDBLogicalGeneratorTest.java |   2 +-
 .../iotdb/db/service/InternalServiceImplTest.java  |   4 +-
 .../sync/pipedata/BufferedPipeDataQueueTest.java   |  26 +-
 .../sync/receiver/manager/ReceiverManagerTest.java |   2 +-
 .../receiver/recovery/ReceiverLogAnalyzerTest.java |   6 +-
 .../iotdb/rpc/ConfigNodeConnectionException.java   |  13 +-
 .../src/main/thrift/confignode.thrift              |  31 +-
 326 files changed, 8770 insertions(+), 4944 deletions(-)



[iotdb] 07/09: add create accumulator methods

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c00e3bb86a8fd3673d28af1cd6dca2a2216d54d7
Author: Alima777 <wx...@gmail.com>
AuthorDate: Sun May 1 14:32:08 2022 +0800

    add create accumulator methods
---
 .../iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
index 1eb421c4e7..e389d543be 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 public class AccumulatorFactory {
 
+  // TODO: Are we going to create different seriesScanOperator based on order by sequence?
   public static Accumulator createAccumulator(
       AggregationType aggregationType, TSDataType tsDataType) {
     switch (aggregationType) {
@@ -32,15 +33,23 @@ public class AccumulatorFactory {
       case AVG:
         return new AvgAccumulator(tsDataType);
       case SUM:
+        return new SumAccumulator(tsDataType);
       case EXTREME:
+        return new ExtremeAccumulator(tsDataType);
       case MAX_TIME:
+        return new MaxTimeAccumulator();
       case MIN_TIME:
+        return new MinTimeAccumulator();
       case MAX_VALUE:
+        return new MaxValueAccumulator(tsDataType);
       case MIN_VALUE:
+        return new MinValueAccumulator(tsDataType);
       case LAST_VALUE:
+        return new LastValueAccumulator(tsDataType);
       case FIRST_VALUE:
+        return new FirstValueAccumulator(tsDataType);
       default:
-        return null;
+        throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
     }
   }
 }


[iotdb] 09/09: add unit tests

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5843336eeb9151c303f40bc80455af8181582d66
Author: Alima777 <wx...@gmail.com>
AuthorDate: Sun May 1 16:56:33 2022 +0800

    add unit tests
---
 .../operator/aggregation/AccumulatorFactory.java   |  12 ++
 .../db/mpp/operator/aggregation/Aggregator.java    |   4 +-
 .../aggregation/FirstValueAccumulator.java         |   1 +
 .../aggregation/FirstValueDescAccumulator.java     |   2 +-
 .../operator/aggregation/LastValueAccumulator.java |   2 +-
 .../aggregation/LastValueDescAccumulator.java      |  15 ++-
 .../aggregation/MaxTimeDescAccumulator.java        |  15 ++-
 .../operator/aggregation/MinTimeAccumulator.java   |   2 +
 .../operator/aggregation/MinValueAccumulator.java  |   2 +-
 .../operator/SeriesAggregateScanOperatorTest.java  | 146 ++++++++++++++++++---
 .../apache/iotdb/tsfile/utils/TsPrimitiveType.java |   7 +
 11 files changed, 176 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
index 10619b9e35..64ad9fc917 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
@@ -22,6 +22,9 @@ package org.apache.iotdb.db.mpp.operator.aggregation;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class AccumulatorFactory {
 
   // TODO: Are we going to create different seriesScanOperator based on order by sequence?
@@ -56,4 +59,13 @@ public class AccumulatorFactory {
         throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
     }
   }
+
+  public static List<Accumulator> createAccumulators(
+      List<AggregationType> aggregationTypes, TSDataType tsDataType, boolean ascending) {
+    List<Accumulator> accumulators = new ArrayList<>();
+    for (AggregationType aggregationType : aggregationTypes) {
+      accumulators.add(createAccumulator(aggregationType, tsDataType, ascending));
+    }
+    return accumulators;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
index 617d89d4ae..5cad7948c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
@@ -97,9 +97,9 @@ public class Aggregator {
 
   public TSDataType[] getOutputType() {
     if (step.isOutputPartial()) {
-      return new TSDataType[] {accumulator.getFinalType()};
-    } else {
       return accumulator.getIntermediateType();
+    } else {
+      return new TSDataType[] {accumulator.getFinalType()};
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
index 8fa0801faf..bdb4ff49e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
@@ -80,6 +80,7 @@ public class FirstValueAccumulator implements Accumulator {
 
   @Override
   public void reset() {
+    hasCandidateResult = false;
     this.minTime = Long.MAX_VALUE;
     this.firstValue.reset();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java
index 87b939438e..674e251f6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java
@@ -35,7 +35,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator {
     for (int i = 0; i < column[0].getPositionCount(); i++) {
       long curTime = column[0].getLong(i);
       if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
-        updateFirstValue(column[1].getObject(0), curTime);
+        updateFirstValue(column[1].getObject(i), curTime);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
index 901759b687..8f636e5594 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
@@ -41,7 +41,7 @@ public class LastValueAccumulator implements Accumulator {
     for (int i = 0; i < column[0].getPositionCount(); i++) {
       long curTime = column[0].getLong(i);
       if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
-        updateLastValue(column[1].getObject(0), curTime);
+        updateLastValue(column[1].getObject(i), curTime);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java
index 3d3f61644f..6cc8965251 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java
@@ -34,9 +34,12 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
   // Column should be like: | Time | Value |
   @Override
   public void addInput(Column[] column, TimeRange timeRange) {
-    long curTime = column[0].getLong(0);
-    if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
-      updateLastValue(column[1].getObject(0), curTime);
+    // Data inside tsBlock is still in ascending order, we have to traverse the first tsBlock
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        updateLastValue(column[1].getObject(i), curTime);
+      }
     }
   }
 
@@ -45,6 +48,12 @@ public class LastValueDescAccumulator extends LastValueAccumulator {
     return hasCandidateResult;
   }
 
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    super.reset();
+  }
+
   protected void updateLastValue(Object value, long curTime) {
     hasCandidateResult = true;
     super.updateLastValue(value, curTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java
index 01fb65f541..03f99eb61c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java
@@ -29,9 +29,12 @@ public class MaxTimeDescAccumulator extends MaxTimeAccumulator {
   // Column should be like: | Time |
   @Override
   public void addInput(Column[] column, TimeRange timeRange) {
-    long curTime = column[0].getLong(0);
-    if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
-      updateMaxTime(curTime);
+    // Data inside tsBlock is still in ascending order, we have to traverse the tsBlock
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        updateMaxTime(curTime);
+      }
     }
   }
 
@@ -40,6 +43,12 @@ public class MaxTimeDescAccumulator extends MaxTimeAccumulator {
     return hasCandidateResult;
   }
 
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    super.reset();
+  }
+
   protected void updateMaxTime(long curTime) {
     hasCandidateResult = true;
     super.updateMaxTime(curTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
index b80adbe470..0001a9d791 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
@@ -74,6 +74,8 @@ public class MinTimeAccumulator implements Accumulator {
 
   @Override
   public void reset() {
+
+    hasCandidateResult = false;
     this.minTime = Long.MAX_VALUE;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
index 97f46724ae..7a6b7b73fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 public class MinValueAccumulator implements Accumulator {
 
   private TsPrimitiveType minResult;
-  private boolean hasCandidateResult;
+  private boolean hasCandidateResult = false;
 
   public MinValueAccumulator(TSDataType seriesDataType) {
     this.minResult = TsPrimitiveType.getByType(seriesDataType);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index 2b4d7c4c7c..8afac0e69b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -89,16 +90,12 @@ public class SeriesAggregateScanOperatorTest {
 
   @Test
   public void testAggregationWithoutTimeFilter() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(
-                new Aggregator(
-                    AccumulatorFactory.createAccumulator(
-                        AggregationType.COUNT, TSDataType.INT32, true),
-                    AggregationStep.SINGLE)),
-            null,
-            true,
-            null);
+        initSeriesAggregateScanOperator(aggregators, null, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -113,8 +110,11 @@ public class SeriesAggregateScanOperatorTest {
     List<AggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(AggregationType.COUNT);
     aggregationTypes.add(AggregationType.SUM);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, null);
+        initSeriesAggregateScanOperator(aggregators, null, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -134,8 +134,41 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_TIME);
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, null);
+        initSeriesAggregateScanOperator(aggregators, null, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
+      assertEquals(499, resultTsBlock.getColumn(3).getLong(0));
+      assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
+      assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  @Ignore
+  @Test
+  public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc()
+      throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MIN_TIME);
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, false)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregators, null, false, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -152,9 +185,13 @@ public class SeriesAggregateScanOperatorTest {
 
   @Test
   public void testAggregationWithTimeFilter1() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     Filter timeFilter = TimeFilter.gtEq(120);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -167,8 +204,12 @@ public class SeriesAggregateScanOperatorTest {
   @Test
   public void testAggregationWithTimeFilter2() throws IllegalPathException {
     Filter timeFilter = TimeFilter.ltEq(379);
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -181,8 +222,12 @@ public class SeriesAggregateScanOperatorTest {
   @Test
   public void testAggregationWithTimeFilter3() throws IllegalPathException {
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -201,9 +246,12 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_TIME);
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -222,8 +270,12 @@ public class SeriesAggregateScanOperatorTest {
   public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException {
     int[] result = new int[] {100, 100, 100, 100};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -239,8 +291,12 @@ public class SeriesAggregateScanOperatorTest {
     int[] result = new int[] {0, 80, 100, 80};
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, timeFilter, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -266,8 +322,11 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -281,12 +340,50 @@ public class SeriesAggregateScanOperatorTest {
     assertEquals(4, count);
   }
 
+  @Ignore
+  @Test
+  public void testGroupByWithMultiFunctionOrderByTimeDesc() throws IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {20000, 20100, 10200, 10300},
+          {20099, 20199, 299, 399},
+          {20099, 20199, 10259, 10379},
+          {20000, 20100, 260, 380}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, false)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregators, null, false, groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(100 * (3 - count), resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[0][3 - count], resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(result[1][3 - count], resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(result[2][3 - count], resultTsBlock.getColumn(2).getInt(0));
+      assertEquals(result[3][3 - count], resultTsBlock.getColumn(3).getInt(0));
+      count++;
+    }
+    assertEquals(4, count);
+  }
+
   @Test
   public void testGroupBySlidingTimeWindow() throws IllegalPathException {
     int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -302,8 +399,12 @@ public class SeriesAggregateScanOperatorTest {
     int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
     int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
+    List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -330,8 +431,11 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true)
+        .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 8251fe9438..938dd4fd60 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -192,6 +192,7 @@ public abstract class TsPrimitiveType implements Serializable {
     public void setObject(Object val) {
       if (val instanceof Binary) {
         setBinary((Binary) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsBoolean can only be set Binary value");
     }
@@ -263,6 +264,7 @@ public abstract class TsPrimitiveType implements Serializable {
     public void setObject(Object val) {
       if (val instanceof Integer) {
         setInt((Integer) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsInt can only be set Integer value");
     }
@@ -334,6 +336,7 @@ public abstract class TsPrimitiveType implements Serializable {
     public void setObject(Object val) {
       if (val instanceof Long) {
         setLong((Long) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsLong can only be set Long value");
     }
@@ -405,6 +408,7 @@ public abstract class TsPrimitiveType implements Serializable {
     public void setObject(Object val) {
       if (val instanceof Float) {
         setFloat((Float) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsFloat can only be set float value");
     }
@@ -476,6 +480,7 @@ public abstract class TsPrimitiveType implements Serializable {
     public void setObject(Object val) {
       if (val instanceof Double) {
         setDouble((Double) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsDouble can only be set Double value");
     }
@@ -547,6 +552,7 @@ public abstract class TsPrimitiveType implements Serializable {
     public void setObject(Object val) {
       if (val instanceof Binary) {
         setBinary((Binary) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsBinary can only be set Binary value");
     }
@@ -618,6 +624,7 @@ public abstract class TsPrimitiveType implements Serializable {
     public void setObject(Object val) {
       if (val instanceof TsPrimitiveType[]) {
         setVector((TsPrimitiveType[]) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsVector can only be set TsPrimitiveType[] value");
     }


[iotdb] 01/09: modify seriesAggregateScanOperator using aggregator

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 25b649bd0d0000db00cc29970018da6d8b20b636
Author: Alima777 <wx...@gmail.com>
AuthorDate: Thu Apr 28 17:09:23 2022 +0800

    modify seriesAggregateScanOperator using aggregator
---
 .../Accumulator.java}                              |  45 +--
 .../db/mpp/operator/aggregation/Aggregator.java    |  98 +++++++
 .../AggregatorFactory.java}                        |  45 +--
 .../mpp/operator/aggregation/AvgAccumulator.java   |  86 ++++++
 .../mpp/operator/aggregation/CountAccumulator.java |  84 ++++++
 .../db/mpp/operator/process/AggregateOperator.java |  17 +-
 .../source/SeriesAggregateScanOperator.java        | 111 +++----
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |   2 +-
 .../sql/planner/plan/parameter/InputLocation.java  |   4 +-
 .../operator/SeriesAggregateScanOperatorTest.java  |   5 +-
 .../iotdb/tsfile/read/common/block/TsBlock.java    |  38 ++-
 .../read/common/block/column/BinaryColumn.java     |   9 +
 .../read/common/block/column/BooleanColumn.java    |   9 +
 .../tsfile/read/common/block/column/Column.java    |   6 +
 .../read/common/block/column/DoubleColumn.java     |   9 +
 .../read/common/block/column/FloatColumn.java      |   8 +
 .../tsfile/read/common/block/column/IntColumn.java |   8 +
 .../read/common/block/column/LongColumn.java       |   8 +
 .../block/column/RunLengthEncodedColumn.java       |   8 +
 .../read/common/block/column/TimeColumn.java       |   8 +
 .../iotdb/tsfile/common/block/TsBlockTest.java     |  49 ++++
 .../iotdb/tsfile/read/common/ColumnTest.java       | 322 +++++++++++++++++++++
 22 files changed, 837 insertions(+), 142 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
index 0eda188787..f6f268aad1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
@@ -16,42 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.operator.process;
+package org.apache.iotdb.db.mpp.operator.aggregation;
 
-import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 
-import com.google.common.util.concurrent.ListenableFuture;
+public interface Accumulator {
 
-public class AggregateOperator implements ProcessOperator {
+  // Column should be like: | Time | Value |
+  void addInput(Column[] column, TimeRange timeRange);
 
-  @Override
-  public OperatorContext getOperatorContext() {
-    return null;
-  }
+  void addIntermediate(Column[] partialResult);
 
-  @Override
-  public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
-  }
+  void addStatistics(Statistics statistics);
 
-  @Override
-  public TsBlock next() {
-    return null;
-  }
+  void setFinal(Column finalResult);
 
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
+  void outputIntermediate(ColumnBuilder[] tsBlockBuilder);
 
-  @Override
-  public void close() throws Exception {
-    ProcessOperator.super.close();
-  }
+  void outputFinal(ColumnBuilder tsBlockBuilder);
 
-  @Override
-  public boolean isFinished() {
-    return false;
-  }
+  void reset();
+
+  boolean hasFinalResult();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
new file mode 100644
index 0000000000..94fe9080cb
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.AggregationStep;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public class Aggregator {
+
+  private final Accumulator accumulator;
+  private final List<InputLocation> inputLocationList;
+  private final AggregationStep step;
+  private final TSDataType intermediateType;
+  private final TSDataType finalType;
+
+  private TimeRange timeRange;
+
+  public Aggregator(
+      Accumulator accumulator,
+      AggregationStep step,
+      List<InputLocation> inputLocationList,
+      TSDataType intermediateType,
+      TSDataType finalType) {
+    this.accumulator = accumulator;
+    this.step = step;
+    this.inputLocationList = inputLocationList;
+    this.intermediateType = intermediateType;
+    this.finalType = finalType;
+  }
+
+  public void processTsBlock(TsBlock tsBlock) {
+    if (step.isInputRaw()) {
+      accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
+    } else {
+      accumulator.addIntermediate(tsBlock.getColumns(new int[] {0}));
+    }
+  }
+
+  public void outputResult(ColumnBuilder[] columnBuilder) {
+    if (step.isOutputPartial()) {
+      accumulator.outputIntermediate(columnBuilder);
+    } else {
+      accumulator.outputFinal(columnBuilder[0]);
+    }
+  }
+
+  public void processStatistics(Statistics statistics) {
+    accumulator.addStatistics(statistics);
+  }
+
+  public TSDataType getOutputType() {
+    if (step.isOutputPartial()) {
+      return intermediateType;
+    } else {
+      return finalType;
+    }
+  }
+
+  public void reset() {
+    accumulator.reset();
+  }
+
+  public boolean hasFinalResult() {
+    return accumulator.hasFinalResult();
+  }
+
+  public void setTimeRange(TimeRange timeRange) {
+    this.timeRange = timeRange;
+  }
+
+  public TimeRange getTimeRange() {
+    return timeRange;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
index 0eda188787..ce493b3256 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
@@ -16,42 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.operator.process;
 
-import org.apache.iotdb.db.mpp.operator.OperatorContext;
-import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+package org.apache.iotdb.db.mpp.operator.aggregation;
 
-import com.google.common.util.concurrent.ListenableFuture;
+public class AggregatorFactory {
 
-public class AggregateOperator implements ProcessOperator {
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return null;
-  }
-
-  @Override
-  public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
-  }
-
-  @Override
-  public TsBlock next() {
-    return null;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public void close() throws Exception {
-    ProcessOperator.super.close();
-  }
-
-  @Override
-  public boolean isFinished() {
-    return false;
+  public static Aggregator createAggregator() {
+    Accumulator accumulator;
+    if (step.isInputRaw()) {
+      accumulator = accumulatorFactory.createAccumulator(lambdaProviders);
+    } else {
+      accumulator = accumulatorFactory.createIntermediateAccumulator(lambdaProviders);
+    }
+    return new Aggregator(
+        accumulator, step, intermediateType, finalType, inputChannels, maskChannel);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
new file mode 100644
index 0000000000..dc993a65c5
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+public class AvgAccumulator implements Accumulator {
+
+  private TSDataType seriesDataType;
+  private long countValue;
+  private double sumValue;
+
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {}
+
+  @Override
+  public void addIntermediate(Column[] partialResult) {}
+
+  @Override
+  public void addStatistics(Statistics statistics) {}
+
+  @Override
+  public void setFinal(Column finalResult) {}
+
+  @Override
+  public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) {}
+
+  @Override
+  public void outputFinal(ColumnBuilder tsBlockBuilder) {}
+
+  @Override
+  public void reset() {
+    this.countValue = 0;
+    this.sumValue = 0.0;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  private void updateAvg(TSDataType type, Object sumVal) throws UnSupportedDataTypeException {
+    double val;
+    switch (type) {
+      case INT32:
+        val = (int) sumVal;
+        break;
+      case INT64:
+        val = (long) sumVal;
+        break;
+      case FLOAT:
+        val = (float) sumVal;
+        break;
+      case DOUBLE:
+        val = (double) sumVal;
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in aggregation AVG : %s", type));
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
new file mode 100644
index 0000000000..db266b55e4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+
+public class CountAccumulator implements Accumulator {
+
+  private long countValue = 0;
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      countValue++;
+    }
+  }
+
+  // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    for (int i = 0; i < partialResult.length; i++) {
+      countValue += partialResult[i].getLong(0);
+    }
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    countValue += statistics.getCount();
+  }
+
+  // finalResult should be single column, like: | finalCountValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    countValue = finalResult.getLong(0);
+  }
+
+  // columnBuilder should be single in countAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeLong(countValue);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeLong(countValue);
+  }
+
+  @Override
+  public void reset() {
+    this.countValue = 0;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
index 0eda188787..e0cd04a5a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/AggregateOperator.java
@@ -18,16 +18,31 @@
  */
 package org.apache.iotdb.db.mpp.operator.process;
 
+import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.List;
+
 public class AggregateOperator implements ProcessOperator {
 
+  private final OperatorContext operatorContext;
+  private final List<Aggregator> aggregators;
+  private final List<Operator> children;
+
+  public AggregateOperator(
+      OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) {
+    this.operatorContext = operatorContext;
+    this.aggregators = aggregators;
+    this.children = children;
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index b94fadccc1..8966b1f57b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -19,15 +19,11 @@
 package org.apache.iotdb.db.mpp.operator.source;
 
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
-import org.apache.iotdb.db.query.aggregation.AggregateResult;
-import org.apache.iotdb.db.query.aggregation.AggregationType;
-import org.apache.iotdb.db.query.factory.AggregateResultFactory;
-import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
@@ -43,7 +39,6 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -62,15 +57,15 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   private final PlanNodeId sourceId;
   private final SeriesScanUtil seriesScanUtil;
   private final boolean ascending;
-  private List<AggregateResult> aggregateResultList;
+  // We still think aggregator in SeriesAggregateScanOperator is a inputRaw step.
+  // But in facing of statistics, it will invoke another method processStatistics()
+  private List<Aggregator> aggregators;
 
   private ITimeRangeIterator timeRangeIterator;
   // current interval of aggregation window [curStartTime, curEndTime)
   private TimeRange curTimeRange;
 
-  private TsBlockSingleColumnIterator preCachedData;
-  // used for resetting the preCachedData to the last read index
-  private int lastReadIndex;
+  private TsBlock preCachedData;
 
   private TsBlockBuilder tsBlockBuilder;
   private TsBlock resultTsBlock;
@@ -82,7 +77,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       PartialPath seriesPath,
       Set<String> allSensors,
       OperatorContext context,
-      List<AggregationType> aggregateFuncList,
+      List<Aggregator> aggregators,
       Filter timeFilter,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter) {
@@ -98,21 +93,10 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
             timeFilter,
             null,
             ascending);
-    aggregateResultList = new ArrayList<>(aggregateFuncList.size());
-    for (AggregationType aggregationType : aggregateFuncList) {
-      aggregateResultList.add(
-          AggregateResultFactory.getAggrResultByType(
-              aggregationType,
-              seriesPath.getSeriesType(),
-              seriesScanUtil.getOrderUtils().getAscending()));
-    }
+    this.aggregators = aggregators;
     tsBlockBuilder =
         new TsBlockBuilder(
-            aggregateFuncList.stream()
-                .map(
-                    functionType ->
-                        SchemaUtils.getSeriesTypeByPath(seriesPath, functionType.name()))
-                .collect(Collectors.toList()));
+            aggregators.stream().map(Aggregator::getOutputType).collect(Collectors.toList()));
     this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
   }
 
@@ -169,8 +153,9 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       curTimeRange = timeRangeIterator.nextTimeRange();
 
       // 1. Clear previous aggregation result
-      for (AggregateResult result : aggregateResultList) {
-        result.reset();
+      for (Aggregator aggregator : aggregators) {
+        aggregator.reset();
+        aggregator.setTimeRange(curTimeRange);
       }
 
       // 2. Calculate aggregation result based on current time window
@@ -226,14 +211,15 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   }
 
   private void updateResultTsBlockUsingAggregateResult() {
-    // TODO AVG
     tsBlockBuilder.reset();
     TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder();
     // Use start time of current time range as time column
     timeColumnBuilder.writeLong(curTimeRange.getMin());
     ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
-    for (int i = 0; i < aggregateResultList.size(); i++) {
-      columnBuilders[i].writeObject(aggregateResultList.get(i).getResult());
+    for (int i = 0; i < aggregators.size(); i++) {
+      ColumnBuilder[] columnBuilder = new ColumnBuilder[1];
+      columnBuilder[0] = columnBuilders[i];
+      aggregators.get(i).outputResult(columnBuilder);
     }
     tsBlockBuilder.declarePosition();
     resultTsBlock = tsBlockBuilder.build();
@@ -273,41 +259,33 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   }
 
   @SuppressWarnings("squid:S3776")
-  private void calcFromBatch(TsBlockSingleColumnIterator blockIterator, TimeRange curTimeRange)
-      throws IOException {
+  private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
     // check if the batchData does not contain points in current interval
-    if (!satisfied(blockIterator, curTimeRange)) {
+    if (!satisfied(tsBlock, curTimeRange)) {
       return;
     }
 
-    for (AggregateResult result : aggregateResultList) {
+    // skip points that cannot be calculated
+    tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange);
+
+    for (Aggregator aggregator : aggregators) {
       // current agg method has been calculated
-      if (result.hasFinalResult()) {
+      if (aggregator.hasFinalResult()) {
         continue;
       }
-      // lazy reset batch data for calculation
-      blockIterator.setRowIndex(lastReadIndex);
-      // skip points that cannot be calculated
-      skipOutOfTimeRangePoints(blockIterator, curTimeRange);
-
-      if (blockIterator.hasNext()) {
-        result.updateResultFromPageData(
-            blockIterator, curTimeRange.getMin(), curTimeRange.getMax());
-      }
-    }
 
-    // reset the last position to current Index
-    lastReadIndex = blockIterator.getRowIndex();
+      aggregator.processTsBlock(tsBlock);
+    }
 
     // can calc for next interval
-    if (blockIterator.hasNext()) {
-      preCachedData = blockIterator;
+    if (tsBlock.getTsBlockSingleColumnIterator().hasNext()) {
+      preCachedData = tsBlock;
     }
   }
 
   // skip points that cannot be calculated
-  private void skipOutOfTimeRangePoints(
-      TsBlockSingleColumnIterator tsBlockIterator, TimeRange curTimeRange) {
+  private TsBlock skipOutOfTimeRangePoints(TsBlock tsBlock, TimeRange curTimeRange) {
+    TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
     if (ascending) {
       while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) {
         tsBlockIterator.next();
@@ -317,9 +295,11 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
         tsBlockIterator.next();
       }
     }
+    return tsBlock.subTsBlock(tsBlockIterator.getRowIndex());
   }
 
-  private boolean satisfied(TsBlockSingleColumnIterator tsBlockIterator, TimeRange timeRange) {
+  private boolean satisfied(TsBlock tsBlock, TimeRange timeRange) {
+    TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
     if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
       return false;
     }
@@ -332,15 +312,15 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     if (!ascending
         && (tsBlockIterator.getStartTime() >= timeRange.getMax()
             || tsBlockIterator.currentTime() < timeRange.getMin())) {
-      preCachedData = tsBlockIterator;
+      preCachedData = tsBlock;
       return false;
     }
     return true;
   }
 
   private boolean isEndCalc() {
-    for (AggregateResult result : aggregateResultList) {
-      if (!result.hasFinalResult()) {
+    for (Aggregator aggregator : aggregators) {
+      if (!aggregator.hasFinalResult()) {
         return false;
       }
     }
@@ -375,23 +355,23 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
       }
 
       // calc from page data
-      TsBlockSingleColumnIterator tsBlockIterator =
-          seriesScanUtil.nextPage().getTsBlockSingleColumnIterator();
+      TsBlock tsBlock = seriesScanUtil.nextPage();
+      TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator();
       if (tsBlockIterator == null || !tsBlockIterator.hasNext()) {
         continue;
       }
 
       // reset the last position to current Index
-      lastReadIndex = tsBlockIterator.getRowIndex();
+      // lastReadIndex = tsBlockIterator.getRowIndex();
 
       // stop calc and cached current batchData
       if (ascending && tsBlockIterator.currentTime() >= curTimeRange.getMax()) {
-        preCachedData = tsBlockIterator;
+        preCachedData = tsBlock;
         return true;
       }
 
       // calc from batch data
-      calcFromBatch(tsBlockIterator, curTimeRange);
+      calcFromBatch(tsBlock, curTimeRange);
 
       // judge whether the calculation finished
       if (isEndCalc()
@@ -432,15 +412,12 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   }
 
   private void calcFromStatistics(Statistics statistics) {
-    try {
-      for (AggregateResult result : aggregateResultList) {
-        if (result.hasFinalResult()) {
-          continue;
-        }
-        result.updateResultFromStatistics(statistics);
+    for (int i = 0; i < aggregators.size(); i++) {
+      Aggregator aggregator = aggregators.get(i);
+      if (aggregator.hasFinalResult()) {
+        continue;
       }
-    } catch (QueryProcessException e) {
-      throw new RuntimeException("Error while updating result using statistics", e);
+      aggregator.processStatistics(statistics);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index cfa593146a..bee6e9f5fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -232,7 +232,7 @@ public class LocalExecutionPlanner {
               seriesPath,
               node.getAllSensors(),
               operatorContext,
-              node.getAggregateFuncList(),
+              null,
               node.getTimeFilter(),
               ascending,
               node.getGroupByTimeParameter());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
index fda0f4a957..97d7bb591a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/InputLocation.java
@@ -24,9 +24,9 @@ import java.nio.ByteBuffer;
 import java.util.Objects;
 
 public class InputLocation {
-  // which input tsblock
+  // which input tsBlock
   private final int tsBlockIndex;
-  // which value column of that tsblock
+  // which value column of that tsBlock
   private final int valueColumnIndex;
 
   public InputLocation(int tsBlockIndex, int valueColumnIndex) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index 09d56b7211..bfaced0670 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
@@ -346,7 +347,7 @@ public class SeriesAggregateScanOperatorTest {
   }
 
   public SeriesAggregateScanOperator initSeriesAggregateScanOperator(
-      List<AggregationType> aggregateFuncList,
+      List<Aggregator> aggregators,
       Filter timeFilter,
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter)
@@ -373,7 +374,7 @@ public class SeriesAggregateScanOperatorTest {
             measurementPath,
             allSensors,
             fragmentInstanceContext.getOperatorContexts().get(0),
-            aggregateFuncList,
+            aggregators,
             timeFilter,
             ascending,
             groupByTimeParameter);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index f6d4b68c4d..008db2dde3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -92,12 +92,6 @@ public class TsBlock {
     }
   }
 
-  public boolean hasNext() {
-    return false;
-  }
-
-  public void next() {}
-
   public int getPositionCount() {
     return positionCount;
   }
@@ -170,6 +164,23 @@ public class TsBlock {
     return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
   }
 
+  /**
+   * This method will create a temporary view of origin tsBlock, which will reuse the arrays of
+   * columns but with different offset. It can be used where you want to skip some points when
+   * getting iterator.
+   */
+  public TsBlock subTsBlock(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("FromIndex of subTsBlock cannot over positionCount.");
+    }
+    TimeColumn subTimeColumn = (TimeColumn) timeColumn.subColumn(fromIndex);
+    Column[] subValueColumns = new Column[valueColumns.length];
+    for (int i = 0; i < subValueColumns.length; i++) {
+      subValueColumns[i] = valueColumns[i].subColumn(fromIndex);
+    }
+    return new TsBlock(subTimeColumn, subValueColumns);
+  }
+
   public long getTimeByIndex(int index) {
     return timeColumn.getLong(index);
   }
@@ -186,6 +197,21 @@ public class TsBlock {
     return valueColumns[columnIndex];
   }
 
+  public Column[] getTimeAndValueColumn(int columnIndex) {
+    Column[] columns = new Column[2];
+    columns[0] = getTimeColumn();
+    columns[1] = getColumn(columnIndex);
+    return columns;
+  }
+
+  public Column[] getColumns(int[] columnIndexes) {
+    Column[] columns = new Column[columnIndexes.length];
+    for (int i = 0; i < columnIndexes.length; i++) {
+      columns[i] = valueColumns[columnIndexes[i]];
+    }
+    return columns;
+  }
+
   public TsBlockSingleColumnIterator getTsBlockSingleColumnIterator() {
     return new TsBlockSingleColumnIterator(0);
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
index 4d7a888394..f9a5497992 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -123,6 +123,15 @@ public class BinaryColumn implements Column {
     return new BinaryColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new BinaryColumn(
+        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
index 218ce1baf8..1166ccfa4c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -122,6 +122,15 @@ public class BooleanColumn implements Column {
     return new BooleanColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new BooleanColumn(
+        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
index ef9fb7d637..446b6a83fe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
@@ -102,4 +102,10 @@ public interface Column {
    * also be released. If the region column is released, this block may also be released.
    */
   Column getRegion(int positionOffset, int length);
+
+  /**
+   * This method will create a temporary view of origin column, which will reuse the array of column
+   * but with different array offset.
+   */
+  Column subColumn(int fromIndex);
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
index 13faf135fb..1da44fe212 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -122,6 +122,15 @@ public class DoubleColumn implements Column {
     return new DoubleColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new DoubleColumn(
+        arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
index 08762164fb..49d73c3156 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -121,6 +121,14 @@ public class FloatColumn implements Column {
     return new FloatColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new FloatColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
index 7e8d67f1b3..cfad52184d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -121,6 +121,14 @@ public class IntColumn implements Column {
     return new IntColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new IntColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
index a786918af8..9b89a09233 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -121,6 +121,14 @@ public class LongColumn implements Column {
     return new LongColumn(positionOffset + arrayOffset, length, valueIsNull, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new LongColumn(arrayOffset + fromIndex, positionCount - fromIndex, valueIsNull, values);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= getPositionCount()) {
       throw new IllegalArgumentException("position is not valid");
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index 283c374a99..c55d2e5686 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -147,6 +147,14 @@ public class RunLengthEncodedColumn implements Column {
     return new RunLengthEncodedColumn(value, length);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new RunLengthEncodedColumn(value, positionCount - fromIndex);
+  }
+
   private void checkReadablePosition(int position) {
     if (position < 0 || position >= positionCount) {
       throw new IllegalArgumentException("position is not valid");
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
index d8b44fd384..87164cf6d7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -104,6 +104,14 @@ public class TimeColumn implements Column {
     return new TimeColumn(positionOffset + arrayOffset, length, values);
   }
 
+  @Override
+  public Column subColumn(int fromIndex) {
+    if (fromIndex > positionCount) {
+      throw new IllegalArgumentException("fromIndex is not valid");
+    }
+    return new TimeColumn(arrayOffset + fromIndex, positionCount - fromIndex, values);
+  }
+
   public long getStartTime() {
     return values[arrayOffset];
   }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
index 75d28596e9..669d3a41ad 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/common/block/TsBlockTest.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.tsfile.common.block;
 
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
@@ -30,6 +31,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
 import org.apache.iotdb.tsfile.utils.Binary;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -322,4 +324,51 @@ public class TsBlockTest {
       }
     }
   }
+
+  @Test
+  public void testSubTsBlock() {
+    TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT32));
+    for (int i = 0; i < 10; i++) {
+      builder.getTimeColumnBuilder().writeLong(i);
+      builder.getColumnBuilder(0).writeInt(i);
+      builder.declarePosition();
+    }
+    TsBlock tsBlock = builder.build();
+    TsBlockSingleColumnIterator iterator = tsBlock.getTsBlockSingleColumnIterator();
+    int index = 0;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(index, iterator.currentTime());
+      Assert.assertEquals(index, iterator.currentValue());
+      iterator.next();
+      index++;
+    }
+    // get subTsBlock from TsBlock, offset = 3
+    int offset = 3;
+    TsBlock subTsBlock = tsBlock.subTsBlock(offset);
+    iterator = subTsBlock.getTsBlockSingleColumnIterator();
+    index = offset;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(index, iterator.currentTime());
+      Assert.assertEquals(index, iterator.currentValue());
+      iterator.next();
+      index++;
+    }
+    // get subSubTsBlock from subTsBlock, offset = 2
+    int nextOffset = 2;
+    TsBlock subSubTsBlock = subTsBlock.subTsBlock(nextOffset);
+    iterator = subSubTsBlock.getTsBlockSingleColumnIterator();
+    index = offset + nextOffset;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(index, iterator.currentTime());
+      Assert.assertEquals(index, iterator.currentValue());
+      iterator.next();
+      index++;
+    }
+    try {
+      subSubTsBlock.subTsBlock(3);
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(
+          e.getMessage().contains("FromIndex of subTsBlock cannot over positionCount."));
+    }
+  }
 }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java
new file mode 100644
index 0000000000..f21df099f7
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/common/ColumnTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.read.common;
+
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ColumnTest {
+
+  @Test
+  public void timeColumnSubColumnTest() {
+    TimeColumnBuilder columnBuilder = new TimeColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeLong(i);
+    }
+    TimeColumn timeColumn = (TimeColumn) columnBuilder.build();
+    timeColumn = (TimeColumn) timeColumn.subColumn(5);
+    Assert.assertEquals(5, timeColumn.getPositionCount());
+    Assert.assertEquals(5, timeColumn.getLong(0));
+    Assert.assertEquals(9, timeColumn.getLong(4));
+    try {
+      timeColumn.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    timeColumn = (TimeColumn) timeColumn.subColumn(3);
+    Assert.assertEquals(2, timeColumn.getPositionCount());
+    Assert.assertEquals(8, timeColumn.getLong(0));
+    Assert.assertEquals(9, timeColumn.getLong(1));
+    try {
+      timeColumn.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      timeColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void binaryColumnSubColumnTest() {
+    BinaryColumnBuilder columnBuilder = new BinaryColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeBinary(Binary.valueOf(String.valueOf(i)));
+    }
+    BinaryColumn binaryColumn = (BinaryColumn) columnBuilder.build();
+    binaryColumn = (BinaryColumn) binaryColumn.subColumn(5);
+    Assert.assertEquals(5, binaryColumn.getPositionCount());
+    Assert.assertEquals("5", binaryColumn.getBinary(0).toString());
+    Assert.assertEquals("9", binaryColumn.getBinary(4).toString());
+    try {
+      binaryColumn.getBinary(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    binaryColumn = (BinaryColumn) binaryColumn.subColumn(3);
+    Assert.assertEquals(2, binaryColumn.getPositionCount());
+    Assert.assertEquals("8", binaryColumn.getBinary(0).toString());
+    Assert.assertEquals("9", binaryColumn.getBinary(1).toString());
+    try {
+      binaryColumn.getBinary(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      binaryColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void booleanColumnSubColumnTest() {
+    BooleanColumnBuilder columnBuilder = new BooleanColumnBuilder(null, 10);
+    // 0: true, 1: false
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeBoolean(i % 2 == 0);
+    }
+    BooleanColumn booleanColumn = (BooleanColumn) columnBuilder.build();
+    booleanColumn = (BooleanColumn) booleanColumn.subColumn(5);
+    Assert.assertEquals(5, booleanColumn.getPositionCount());
+    Assert.assertFalse(booleanColumn.getBoolean(0));
+    Assert.assertFalse(booleanColumn.getBoolean(4));
+    try {
+      booleanColumn.getBoolean(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    booleanColumn = (BooleanColumn) booleanColumn.subColumn(3);
+    Assert.assertEquals(2, booleanColumn.getPositionCount());
+    Assert.assertTrue(booleanColumn.getBoolean(0));
+    Assert.assertFalse(booleanColumn.getBoolean(1));
+    try {
+      booleanColumn.getBoolean(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      booleanColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void doubleColumnSubColumnTest() {
+    DoubleColumnBuilder columnBuilder = new DoubleColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeDouble(i);
+    }
+    DoubleColumn doubleColumn = (DoubleColumn) columnBuilder.build();
+    doubleColumn = (DoubleColumn) doubleColumn.subColumn(5);
+    Assert.assertEquals(5, doubleColumn.getPositionCount());
+    Assert.assertEquals(5.0, doubleColumn.getDouble(0), 0.001);
+    Assert.assertEquals(9.0, doubleColumn.getDouble(4), 0.001);
+    try {
+      doubleColumn.getDouble(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    doubleColumn = (DoubleColumn) doubleColumn.subColumn(3);
+    Assert.assertEquals(2, doubleColumn.getPositionCount());
+    Assert.assertEquals(8.0, doubleColumn.getDouble(0), 0.001);
+    Assert.assertEquals(9.0, doubleColumn.getDouble(1), 0.001);
+    try {
+      doubleColumn.getDouble(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      doubleColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void floatColumnSubColumnTest() {
+    FloatColumnBuilder columnBuilder = new FloatColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeFloat(i);
+    }
+    FloatColumn floatColumn = (FloatColumn) columnBuilder.build();
+    floatColumn = (FloatColumn) floatColumn.subColumn(5);
+    Assert.assertEquals(5, floatColumn.getPositionCount());
+    Assert.assertEquals(5.0, floatColumn.getFloat(0), 0.001);
+    Assert.assertEquals(9.0, floatColumn.getFloat(4), 0.001);
+    try {
+      floatColumn.getFloat(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    floatColumn = (FloatColumn) floatColumn.subColumn(3);
+    Assert.assertEquals(2, floatColumn.getPositionCount());
+    Assert.assertEquals(8.0, floatColumn.getFloat(0), 0.001);
+    Assert.assertEquals(9.0, floatColumn.getFloat(1), 0.001);
+    try {
+      floatColumn.getFloat(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      floatColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void intColumnSubColumnTest() {
+    IntColumnBuilder columnBuilder = new IntColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeInt(i);
+    }
+    IntColumn intColumn = (IntColumn) columnBuilder.build();
+    intColumn = (IntColumn) intColumn.subColumn(5);
+    Assert.assertEquals(5, intColumn.getPositionCount());
+    Assert.assertEquals(5, intColumn.getInt(0));
+    Assert.assertEquals(9, intColumn.getInt(4));
+    try {
+      intColumn.getInt(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    intColumn = (IntColumn) intColumn.subColumn(3);
+    Assert.assertEquals(2, intColumn.getPositionCount());
+    Assert.assertEquals(8, intColumn.getInt(0));
+    Assert.assertEquals(9, intColumn.getInt(1));
+    try {
+      intColumn.getInt(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      intColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void longColumnSubColumnTest() {
+    LongColumnBuilder columnBuilder = new LongColumnBuilder(null, 10);
+    for (int i = 0; i < 10; i++) {
+      columnBuilder.writeLong(i);
+    }
+    LongColumn longColumn = (LongColumn) columnBuilder.build();
+    longColumn = (LongColumn) longColumn.subColumn(5);
+    Assert.assertEquals(5, longColumn.getPositionCount());
+    Assert.assertEquals(5, longColumn.getLong(0));
+    Assert.assertEquals(9, longColumn.getLong(4));
+    try {
+      longColumn.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    longColumn = (LongColumn) longColumn.subColumn(3);
+    Assert.assertEquals(2, longColumn.getPositionCount());
+    Assert.assertEquals(8, longColumn.getLong(0));
+    Assert.assertEquals(9, longColumn.getLong(1));
+    try {
+      longColumn.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      longColumn.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+
+  @Test
+  public void runLengthEncodedColumnSubColumnTest() {
+    LongColumnBuilder longColumnBuilder = new LongColumnBuilder(null, 1);
+    longColumnBuilder.writeLong(1);
+    RunLengthEncodedColumn column = new RunLengthEncodedColumn(longColumnBuilder.build(), 10);
+    column = (RunLengthEncodedColumn) column.subColumn(5);
+    Assert.assertEquals(5, column.getPositionCount());
+    Assert.assertEquals(1, column.getLong(0));
+    Assert.assertEquals(1, column.getLong(4));
+    try {
+      column.getLong(5);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    column = (RunLengthEncodedColumn) column.subColumn(3);
+    Assert.assertEquals(2, column.getPositionCount());
+    Assert.assertEquals(1, column.getLong(0));
+    Assert.assertEquals(1, column.getLong(1));
+    try {
+      column.getLong(2);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("position is not valid"));
+    }
+    try {
+      column.subColumn(3);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("fromIndex is not valid"));
+    }
+  }
+}


[iotdb] 05/09: implement part accumulator

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1065953784e6c2ed8aabf10ef255f2a0efb05394
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Apr 29 17:00:09 2022 +0800

    implement part accumulator
---
 .../db/mpp/operator/aggregation/Accumulator.java   |  27 +++-
 ...regatorFactory.java => AccumulatorFactory.java} |  26 +++-
 .../db/mpp/operator/aggregation/Aggregator.java    |  18 +--
 .../mpp/operator/aggregation/AvgAccumulator.java   |  76 +++++++++--
 .../mpp/operator/aggregation/CountAccumulator.java |  20 ++-
 .../operator/aggregation/ExtremeAccumulator.java   | 142 +++++++++++++++++++++
 ...atorFactory.java => FirstValueAccumulator.java} |   5 +-
 ...gatorFactory.java => LastValueAccumulator.java} |   5 +-
 ...regatorFactory.java => MaxTimeAccumulator.java} |   5 +-
 ...ntAccumulator.java => MaxValueAccumulator.java} |  52 ++++++--
 ...regatorFactory.java => MinTimeAccumulator.java} |   5 +-
 ...ntAccumulator.java => MinValueAccumulator.java} |  54 ++++++--
 .../{CountAccumulator.java => SumAccumulator.java} |  70 ++++++++--
 .../source/SeriesAggregateScanOperator.java        |  26 ++--
 .../plan/parameter/AggregationDescriptor.java      |   4 +
 .../operator/SeriesAggregateScanOperatorTest.java  |  45 ++++---
 .../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 139 +++++++++++++++++++-
 17 files changed, 606 insertions(+), 113 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
index f6f268aad1..5833a75959 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -25,20 +26,44 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 
 public interface Accumulator {
 
-  // Column should be like: | Time | Value |
+  /** Column should be like: | Time | Value | */
   void addInput(Column[] column, TimeRange timeRange);
 
+  /**
+   * For aggregation function like COUNT, SUM, partialResult should be single; But for AVG,
+   * last_value, it should be double column with dictionary order.
+   */
   void addIntermediate(Column[] partialResult);
 
+  /**
+   * This method can only be used in seriesAggregateScanOperator, it will use different statistics
+   * based on the type of Accumulator.
+   */
   void addStatistics(Statistics statistics);
 
+  /**
+   * Attention: setFinal should be invoked only once, and addInput() and addIntermediate() are not
+   * allowed again.
+   */
   void setFinal(Column finalResult);
 
+  /**
+   * For aggregation function like COUNT, SUM, partialResult should be single, so its output column
+   * is single too; But for AVG, last_value, it should be double column with dictionary order.
+   */
   void outputIntermediate(ColumnBuilder[] tsBlockBuilder);
 
+  /** Final result is single column for any aggregation function. */
   void outputFinal(ColumnBuilder tsBlockBuilder);
 
   void reset();
 
+  /**
+   * For first_value or last_value in decreasing order, we can get final result by the first record.
+   */
   boolean hasFinalResult();
+
+  TSDataType[] getIntermediateType();
+
+  TSDataType getFinalType();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
similarity index 57%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
index fb49e8c8c1..1eb421c4e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
@@ -19,4 +19,28 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
-public class AggregatorFactory {}
+import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class AccumulatorFactory {
+
+  public static Accumulator createAccumulator(
+      AggregationType aggregationType, TSDataType tsDataType) {
+    switch (aggregationType) {
+      case COUNT:
+        return new CountAccumulator();
+      case AVG:
+        return new AvgAccumulator(tsDataType);
+      case SUM:
+      case EXTREME:
+      case MAX_TIME:
+      case MIN_TIME:
+      case MAX_VALUE:
+      case MIN_VALUE:
+      case LAST_VALUE:
+      case FIRST_VALUE:
+      default:
+        return null;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
index 8a05f460c5..a808c734bc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
@@ -36,22 +36,14 @@ public class Aggregator {
   // In some intermediate result input, inputLocation[] should include two columns
   private final List<InputLocation[]> inputLocationList;
   private final AggregationStep step;
-  private final TSDataType intermediateType;
-  private final TSDataType finalType;
 
-  private TimeRange timeRange;
+  private TimeRange timeRange = new TimeRange(0, Long.MAX_VALUE);
 
   public Aggregator(
-      Accumulator accumulator,
-      AggregationStep step,
-      List<InputLocation[]> inputLocationList,
-      TSDataType intermediateType,
-      TSDataType finalType) {
+      Accumulator accumulator, AggregationStep step, List<InputLocation[]> inputLocationList) {
     this.accumulator = accumulator;
     this.step = step;
     this.inputLocationList = inputLocationList;
-    this.intermediateType = intermediateType;
-    this.finalType = finalType;
   }
 
   // Used for SeriesAggregateScanOperator
@@ -96,11 +88,11 @@ public class Aggregator {
     accumulator.addStatistics(statistics);
   }
 
-  public TSDataType getOutputType() {
+  public TSDataType[] getOutputType() {
     if (step.isOutputPartial()) {
-      return intermediateType;
+      return new TSDataType[] {accumulator.getFinalType()};
     } else {
-      return finalType;
+      return accumulator.getIntermediateType();
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
index dc993a65c5..0898eefcb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java
@@ -21,10 +21,12 @@ package org.apache.iotdb.db.mpp.operator.aggregation;
 
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 
 public class AvgAccumulator implements Accumulator {
 
@@ -32,23 +34,60 @@ public class AvgAccumulator implements Accumulator {
   private long countValue;
   private double sumValue;
 
+  public AvgAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+  }
+
   @Override
-  public void addInput(Column[] column, TimeRange timeRange) {}
+  public void addInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      countValue++;
+      updateSumValue(column[1].getObject(i));
+    }
+  }
 
+  // partialResult should be like: | countValue1 | sumValue1 |
   @Override
-  public void addIntermediate(Column[] partialResult) {}
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 2) {
+      throw new IllegalArgumentException("partialResult of Avg should be 2");
+    }
+    countValue += partialResult[0].getLong(0);
+    updateSumValue(partialResult[1].getObject(0));
+  }
 
   @Override
-  public void addStatistics(Statistics statistics) {}
+  public void addStatistics(Statistics statistics) {
+    countValue += statistics.getCount();
+    if (statistics instanceof IntegerStatistics) {
+      sumValue += statistics.getSumLongValue();
+    } else {
+      sumValue += statistics.getSumDoubleValue();
+    }
+  }
 
+  // Set sumValue to finalResult and keep countValue equals to 1
   @Override
-  public void setFinal(Column finalResult) {}
+  public void setFinal(Column finalResult) {
+    reset();
+    updateSumValue(finalResult.getObject(0));
+  }
 
   @Override
-  public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) {}
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeLong(countValue);
+    columnBuilders[1].writeDouble(sumValue);
+  }
 
   @Override
-  public void outputFinal(ColumnBuilder tsBlockBuilder) {}
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeDouble(sumValue / countValue);
+  }
 
   @Override
   public void reset() {
@@ -61,26 +100,35 @@ public class AvgAccumulator implements Accumulator {
     return false;
   }
 
-  private void updateAvg(TSDataType type, Object sumVal) throws UnSupportedDataTypeException {
-    double val;
-    switch (type) {
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64, TSDataType.DOUBLE};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.DOUBLE;
+  }
+
+  private void updateSumValue(Object sumVal) throws UnSupportedDataTypeException {
+    switch (seriesDataType) {
       case INT32:
-        val = (int) sumVal;
+        sumValue += (int) sumVal;
         break;
       case INT64:
-        val = (long) sumVal;
+        sumValue = (long) sumVal;
         break;
       case FLOAT:
-        val = (float) sumVal;
+        sumValue = (float) sumVal;
         break;
       case DOUBLE:
-        val = (double) sumVal;
+        sumValue = (double) sumVal;
         break;
       case TEXT:
       case BOOLEAN:
       default:
         throw new UnSupportedDataTypeException(
-            String.format("Unsupported data type in aggregation AVG : %s", type));
+            String.format("Unsupported data type in aggregation AVG : %s", seriesDataType));
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
index db266b55e4..84dae3f986 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -29,6 +30,8 @@ public class CountAccumulator implements Accumulator {
 
   private long countValue = 0;
 
+  public CountAccumulator() {}
+
   // Column should be like: | Time | Value |
   @Override
   public void addInput(Column[] column, TimeRange timeRange) {
@@ -42,12 +45,13 @@ public class CountAccumulator implements Accumulator {
     }
   }
 
-  // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+  // partialResult should be like: | partialCountValue1 |
   @Override
   public void addIntermediate(Column[] partialResult) {
-    for (int i = 0; i < partialResult.length; i++) {
-      countValue += partialResult[i].getLong(0);
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of Count should be 1");
     }
+    countValue += partialResult[0].getLong(0);
   }
 
   @Override
@@ -81,4 +85,14 @@ public class CountAccumulator implements Accumulator {
   public boolean hasFinalResult() {
     return false;
   }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.INT64;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java
new file mode 100644
index 0000000000..599feec646
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public class ExtremeAccumulator implements Accumulator {
+
+  private TsPrimitiveType extremeResult;
+  private boolean hasCandidateResult;
+
+  public ExtremeAccumulator(TSDataType seriesDataType) {
+    this.extremeResult = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    TimeColumn timeColumn = (TimeColumn) column[0];
+    for (int i = 0; i < timeColumn.getPositionCount(); i++) {
+      long curTime = timeColumn.getLong(i);
+      if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
+        break;
+      }
+      updateResult((Comparable<Object>) column[1].getObject(i));
+    }
+  }
+
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of ExtremeValue should be 1");
+    }
+    updateResult((Comparable<Object>) partialResult[0].getObject(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    Comparable<Object> maxVal = (Comparable<Object>) statistics.getMaxValue();
+    Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue();
+
+    Comparable<Object> absMaxVal = (Comparable<Object>) getAbsValue(maxVal);
+    Comparable<Object> absMinVal = (Comparable<Object>) getAbsValue(minVal);
+
+    Comparable<Object> extVal = absMaxVal.compareTo(absMinVal) >= 0 ? maxVal : minVal;
+    updateResult(extVal);
+  }
+
+  @Override
+  public void setFinal(Column finalResult) {
+    extremeResult.setObject(finalResult.getObject(0));
+  }
+
+  // columnBuilder should be single in ExtremeAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeObject(extremeResult.getValue());
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeObject(extremeResult.getValue());
+  }
+
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    extremeResult.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {extremeResult.getDataType()};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return extremeResult.getDataType();
+  }
+
+  public Object getAbsValue(Object v) {
+    switch (extremeResult.getDataType()) {
+      case DOUBLE:
+        return Math.abs((Double) v);
+      case FLOAT:
+        return Math.abs((Float) v);
+      case INT32:
+        return Math.abs((Integer) v);
+      case INT64:
+        return Math.abs((Long) v);
+      default:
+        throw new UnSupportedDataTypeException(String.valueOf(extremeResult.getDataType()));
+    }
+  }
+
+  private void updateResult(Comparable<Object> extVal) {
+    if (extVal == null) {
+      return;
+    }
+
+    Comparable<Object> absExtVal = (Comparable<Object>) getAbsValue(extVal);
+    Comparable<Object> candidateResult = (Comparable<Object>) extremeResult.getValue();
+    Comparable<Object> absCandidateResult =
+        (Comparable<Object>) getAbsValue(extremeResult.getValue());
+
+    if (!hasCandidateResult
+        || (absExtVal.compareTo(absCandidateResult) > 0
+            || (absExtVal.compareTo(absCandidateResult) == 0
+                && extVal.compareTo(candidateResult) > 0))) {
+      hasCandidateResult = true;
+      extremeResult.setObject(extVal);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
similarity index 89%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
index fb49e8c8c1..49ad6a5133 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
@@ -17,6 +17,5 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;public class FirstValueAccumulator {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
similarity index 89%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
index fb49e8c8c1..97183aae52 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
@@ -17,6 +17,5 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;public class LastValueAccumulator {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
similarity index 90%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
index fb49e8c8c1..c5d03f8442 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
@@ -17,6 +17,5 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;public class MaxTimeAccumulator {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java
similarity index 59%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java
index db266b55e4..bb3f1d880a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java
@@ -19,15 +19,22 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
-public class CountAccumulator implements Accumulator {
+public class MaxValueAccumulator implements Accumulator {
 
-  private long countValue = 0;
+  private TsPrimitiveType maxResult;
+  private boolean hasCandidateResult;
+
+  public MaxValueAccumulator(TSDataType seriesDataType) {
+    this.maxResult = TsPrimitiveType.getByType(seriesDataType);
+  }
 
   // Column should be like: | Time | Value |
   @Override
@@ -38,47 +45,70 @@ public class CountAccumulator implements Accumulator {
       if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
         break;
       }
-      countValue++;
+      updateResult((Comparable<Object>) column[1].getObject(i));
     }
   }
 
-  // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+  // partialResult should be like: | partialMaxValue1 |
   @Override
   public void addIntermediate(Column[] partialResult) {
-    for (int i = 0; i < partialResult.length; i++) {
-      countValue += partialResult[i].getLong(0);
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of MaxValue should be 1");
     }
+    updateResult((Comparable<Object>) partialResult[0].getObject(0));
   }
 
   @Override
   public void addStatistics(Statistics statistics) {
-    countValue += statistics.getCount();
+    Comparable<Object> maxValue = (Comparable<Object>) statistics.getMaxValue();
+    updateResult(maxValue);
   }
 
   // finalResult should be single column, like: | finalCountValue |
   @Override
   public void setFinal(Column finalResult) {
-    countValue = finalResult.getLong(0);
+    maxResult.setObject(finalResult.getObject(0));
   }
 
   // columnBuilder should be single in countAccumulator
   @Override
   public void outputIntermediate(ColumnBuilder[] columnBuilders) {
-    columnBuilders[0].writeLong(countValue);
+    columnBuilders[0].writeObject(maxResult.getValue());
   }
 
   @Override
   public void outputFinal(ColumnBuilder columnBuilder) {
-    columnBuilder.writeLong(countValue);
+    columnBuilder.writeObject(maxResult.getValue());
   }
 
   @Override
   public void reset() {
-    this.countValue = 0;
+    hasCandidateResult = false;
+    this.maxResult.reset();
   }
 
   @Override
   public boolean hasFinalResult() {
     return false;
   }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {maxResult.getDataType()};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return maxResult.getDataType();
+  }
+
+  private void updateResult(Comparable<Object> minVal) {
+    if (minVal == null) {
+      return;
+    }
+    if (!hasCandidateResult || minVal.compareTo(maxResult.getValue()) > 0) {
+      hasCandidateResult = true;
+      maxResult.setObject(minVal);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
index fb49e8c8c1..95bf611acf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
@@ -17,6 +17,5 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;
-
-public class AggregatorFactory {}
+package org.apache.iotdb.db.mpp.operator.aggregation;public class MinTimeAccumulator {
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
similarity index 58%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
index db266b55e4..97f46724ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
@@ -19,15 +19,22 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
-public class CountAccumulator implements Accumulator {
+public class MinValueAccumulator implements Accumulator {
 
-  private long countValue = 0;
+  private TsPrimitiveType minResult;
+  private boolean hasCandidateResult;
+
+  public MinValueAccumulator(TSDataType seriesDataType) {
+    this.minResult = TsPrimitiveType.getByType(seriesDataType);
+  }
 
   // Column should be like: | Time | Value |
   @Override
@@ -38,47 +45,70 @@ public class CountAccumulator implements Accumulator {
       if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
         break;
       }
-      countValue++;
+      updateResult((Comparable<Object>) column[1].getObject(i));
     }
   }
 
-  // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+  // partialResult should be like: | partialMinValue1 |
   @Override
   public void addIntermediate(Column[] partialResult) {
-    for (int i = 0; i < partialResult.length; i++) {
-      countValue += partialResult[i].getLong(0);
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of MinValue should be 1");
     }
+    updateResult((Comparable<Object>) partialResult[0].getObject(0));
   }
 
   @Override
   public void addStatistics(Statistics statistics) {
-    countValue += statistics.getCount();
+    Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue();
+    updateResult(minVal);
   }
 
   // finalResult should be single column, like: | finalCountValue |
   @Override
   public void setFinal(Column finalResult) {
-    countValue = finalResult.getLong(0);
+    minResult.setObject(finalResult.getObject(0));
   }
 
-  // columnBuilder should be single in countAccumulator
+  // columnBuilder should be single in MinValueAccumulator
   @Override
   public void outputIntermediate(ColumnBuilder[] columnBuilders) {
-    columnBuilders[0].writeLong(countValue);
+    columnBuilders[0].writeObject(minResult.getValue());
   }
 
   @Override
   public void outputFinal(ColumnBuilder columnBuilder) {
-    columnBuilder.writeLong(countValue);
+    columnBuilder.writeObject(minResult.getValue());
   }
 
   @Override
   public void reset() {
-    this.countValue = 0;
+    hasCandidateResult = false;
+    this.minResult.reset();
   }
 
   @Override
   public boolean hasFinalResult() {
     return false;
   }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {minResult.getDataType()};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return minResult.getDataType();
+  }
+
+  private void updateResult(Comparable<Object> minVal) {
+    if (minVal == null) {
+      return;
+    }
+    if (!hasCandidateResult || minVal.compareTo(minResult.getValue()) < 0) {
+      hasCandidateResult = true;
+      minResult.setObject(minVal);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java
index db266b55e4..7dd3896b1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java
@@ -19,15 +19,23 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 
-public class CountAccumulator implements Accumulator {
+public class SumAccumulator implements Accumulator {
 
-  private long countValue = 0;
+  private TSDataType seriesDataType;
+  private double sumValue = 0;
+
+  public SumAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+  }
 
   // Column should be like: | Time | Value |
   @Override
@@ -38,47 +46,85 @@ public class CountAccumulator implements Accumulator {
       if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) {
         break;
       }
-      countValue++;
+      updateSumValue(column[1].getObject(i));
     }
   }
 
-  // partialResult should be like: | partialCountValue1 | partialCountValue2 |
+  // partialResult should be like: | partialSumValue1 |
   @Override
   public void addIntermediate(Column[] partialResult) {
-    for (int i = 0; i < partialResult.length; i++) {
-      countValue += partialResult[i].getLong(0);
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of Sum should be 1");
     }
+    updateSumValue(partialResult[0].getObject(0));
   }
 
   @Override
   public void addStatistics(Statistics statistics) {
-    countValue += statistics.getCount();
+    if (statistics instanceof IntegerStatistics) {
+      sumValue += statistics.getSumLongValue();
+    } else {
+      sumValue += statistics.getSumDoubleValue();
+    }
   }
 
-  // finalResult should be single column, like: | finalCountValue |
+  // finalResult should be single column, like: | finalSumValue |
   @Override
   public void setFinal(Column finalResult) {
-    countValue = finalResult.getLong(0);
+    reset();
+    updateSumValue(finalResult.getObject(0));
   }
 
   // columnBuilder should be single in countAccumulator
   @Override
   public void outputIntermediate(ColumnBuilder[] columnBuilders) {
-    columnBuilders[0].writeLong(countValue);
+    columnBuilders[0].writeDouble(sumValue);
   }
 
   @Override
   public void outputFinal(ColumnBuilder columnBuilder) {
-    columnBuilder.writeLong(countValue);
+    columnBuilder.writeDouble(sumValue);
   }
 
   @Override
   public void reset() {
-    this.countValue = 0;
+    this.sumValue = 0;
   }
 
   @Override
   public boolean hasFinalResult() {
     return false;
   }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.DOUBLE};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.DOUBLE;
+  }
+
+  private void updateSumValue(Object sumVal) throws UnSupportedDataTypeException {
+    switch (seriesDataType) {
+      case INT32:
+        sumValue += (int) sumVal;
+        break;
+      case INT64:
+        sumValue = (long) sumVal;
+        break;
+      case FLOAT:
+        sumValue = (float) sumVal;
+        break;
+      case DOUBLE:
+        sumValue = (double) sumVal;
+        break;
+      case TEXT:
+      case BOOLEAN:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in aggregation AVG : %s", seriesDataType));
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
index 8966b1f57b..b679febcca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator;
 import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -39,9 +40,10 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * This operator is responsible to do the aggregation calculation for one series based on global
@@ -94,9 +96,11 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
             null,
             ascending);
     this.aggregators = aggregators;
-    tsBlockBuilder =
-        new TsBlockBuilder(
-            aggregators.stream().map(Aggregator::getOutputType).collect(Collectors.toList()));
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Aggregator aggregator : aggregators) {
+      dataTypes.addAll(Arrays.asList(aggregator.getOutputType()));
+    }
+    tsBlockBuilder = new TsBlockBuilder(dataTypes);
     this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter);
   }
 
@@ -216,10 +220,14 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
     // Use start time of current time range as time column
     timeColumnBuilder.writeLong(curTimeRange.getMin());
     ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
-    for (int i = 0; i < aggregators.size(); i++) {
-      ColumnBuilder[] columnBuilder = new ColumnBuilder[1];
-      columnBuilder[0] = columnBuilders[i];
-      aggregators.get(i).outputResult(columnBuilder);
+    int columnIndex = 0;
+    for (Aggregator aggregator : aggregators) {
+      ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length];
+      columnBuilder[0] = columnBuilders[columnIndex++];
+      if (columnBuilder.length > 1) {
+        columnBuilder[1] = columnBuilders[columnIndex++];
+      }
+      aggregator.outputResult(columnBuilder);
     }
     tsBlockBuilder.declarePosition();
     resultTsBlock = tsBlockBuilder.build();
@@ -261,7 +269,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator {
   @SuppressWarnings("squid:S3776")
   private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) {
     // check if the batchData does not contain points in current interval
-    if (!satisfied(tsBlock, curTimeRange)) {
+    if (tsBlock == null || !satisfied(tsBlock, curTimeRange)) {
       return;
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java
index 244fbcf7b8..8f3dbff4fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java
@@ -59,6 +59,10 @@ public class AggregationDescriptor {
     return aggregationType;
   }
 
+  public AggregationStep getStep() {
+    return step;
+  }
+
   public void serialize(ByteBuffer byteBuffer) {
     ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer);
     step.serialize(byteBuffer);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index bfaced0670..c21314b5d9 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -30,10 +30,12 @@ import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.operator.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator;
 import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
@@ -89,7 +91,14 @@ public class SeriesAggregateScanOperatorTest {
   public void testAggregationWithoutTimeFilter() throws IllegalPathException {
     SeriesAggregateScanOperator seriesAggregateScanOperator =
         initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, null);
+            Collections.singletonList(
+                new Aggregator(
+                    AccumulatorFactory.createAccumulator(AggregationType.COUNT, TSDataType.INT32),
+                    AggregationStep.SINGLE,
+                    null)),
+            null,
+            true,
+            null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -105,7 +114,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.COUNT);
     aggregationTypes.add(AggregationType.SUM);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+        initSeriesAggregateScanOperator(null, null, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -126,7 +135,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, null);
+        initSeriesAggregateScanOperator(null, null, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -145,8 +154,7 @@ public class SeriesAggregateScanOperatorTest {
   public void testAggregationWithTimeFilter1() throws IllegalPathException {
     Filter timeFilter = TimeFilter.gtEq(120);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+        initSeriesAggregateScanOperator(null, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -160,8 +168,7 @@ public class SeriesAggregateScanOperatorTest {
   public void testAggregationWithTimeFilter2() throws IllegalPathException {
     Filter timeFilter = TimeFilter.ltEq(379);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+        initSeriesAggregateScanOperator(null, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -175,8 +182,7 @@ public class SeriesAggregateScanOperatorTest {
   public void testAggregationWithTimeFilter3() throws IllegalPathException {
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), timeFilter, true, null);
+        initSeriesAggregateScanOperator(null, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -197,7 +203,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MIN_VALUE);
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, timeFilter, true, null);
+        initSeriesAggregateScanOperator(null, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -217,8 +223,7 @@ public class SeriesAggregateScanOperatorTest {
     int[] result = new int[] {100, 100, 100, 100};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -235,11 +240,7 @@ public class SeriesAggregateScanOperatorTest {
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379));
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT),
-            timeFilter,
-            true,
-            groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, timeFilter, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -266,7 +267,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MIN_VALUE);
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -285,8 +286,7 @@ public class SeriesAggregateScanOperatorTest {
     int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -303,8 +303,7 @@ public class SeriesAggregateScanOperatorTest {
     int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -332,7 +331,7 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MIN_VALUE);
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter);
+        initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 7bfd218374..8251fe9438 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -29,8 +29,34 @@ public abstract class TsPrimitiveType implements Serializable {
   /**
    * get tsPrimitiveType by resultDataType.
    *
-   * @param dataType -given TsDataType
-   * @param v -
+   * @param dataType given TsDataType
+   */
+  public static TsPrimitiveType getByType(TSDataType dataType) {
+    switch (dataType) {
+      case BOOLEAN:
+        return new TsPrimitiveType.TsBoolean();
+      case INT32:
+        return new TsPrimitiveType.TsInt();
+      case INT64:
+        return new TsPrimitiveType.TsLong();
+      case FLOAT:
+        return new TsPrimitiveType.TsFloat();
+      case DOUBLE:
+        return new TsPrimitiveType.TsDouble();
+      case TEXT:
+        return new TsPrimitiveType.TsBinary();
+      case VECTOR:
+        return new TsPrimitiveType.TsVector();
+      default:
+        throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
+    }
+  }
+
+  /**
+   * get tsPrimitiveType by resultDataType and initial value.
+   *
+   * @param dataType given TsDataType
+   * @param v initial value
    */
   public static TsPrimitiveType getByType(TSDataType dataType, Object v) {
     switch (dataType) {
@@ -109,6 +135,10 @@ public abstract class TsPrimitiveType implements Serializable {
     throw new UnsupportedOperationException("setVector() is not supported for current sub-class");
   }
 
+  public abstract void setObject(Object val);
+
+  public abstract void reset();
+
   /**
    * get the size of one instance of current class.
    *
@@ -142,6 +172,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private boolean value;
 
+    public TsBoolean() {}
+
     public TsBoolean(boolean value) {
       this.value = value;
     }
@@ -156,6 +188,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Binary) {
+        setBinary((Binary) val);
+      }
+      throw new UnSupportedDataTypeException("TsBoolean can only be set Binary value");
+    }
+
+    @Override
+    public void reset() {
+      value = false;
+    }
+
     @Override
     public int getSize() {
       return 4 + 1;
@@ -198,6 +243,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private int value;
 
+    public TsInt() {}
+
     public TsInt(int value) {
       this.value = value;
     }
@@ -212,6 +259,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Integer) {
+        setInt((Integer) val);
+      }
+      throw new UnSupportedDataTypeException("TsInt can only be set Integer value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 4;
@@ -254,6 +314,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private long value;
 
+    public TsLong() {}
+
     public TsLong(long value) {
       this.value = value;
     }
@@ -268,6 +330,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Long) {
+        setLong((Long) val);
+      }
+      throw new UnSupportedDataTypeException("TsLong can only be set Long value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 8;
@@ -310,6 +385,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private float value;
 
+    public TsFloat() {}
+
     public TsFloat(float value) {
       this.value = value;
     }
@@ -324,6 +401,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Float) {
+        setFloat((Float) val);
+      }
+      throw new UnSupportedDataTypeException("TsFloat can only be set float value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 4;
@@ -366,6 +456,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private double value;
 
+    public TsDouble() {}
+
     public TsDouble(double value) {
       this.value = value;
     }
@@ -380,6 +472,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Double) {
+        setDouble((Double) val);
+      }
+      throw new UnSupportedDataTypeException("TsDouble can only be set Double value");
+    }
+
+    @Override
+    public void reset() {
+      value = 0.0;
+    }
+
     @Override
     public int getSize() {
       return 4 + 8;
@@ -422,6 +527,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private Binary value;
 
+    public TsBinary() {}
+
     public TsBinary(Binary value) {
       this.value = value;
     }
@@ -436,6 +543,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.value = val;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof Binary) {
+        setBinary((Binary) val);
+      }
+      throw new UnSupportedDataTypeException("TsBinary can only be set Binary value");
+    }
+
+    @Override
+    public void reset() {
+      value = null;
+    }
+
     @Override
     public int getSize() {
       return 4 + 4 + value.getLength();
@@ -478,6 +598,8 @@ public abstract class TsPrimitiveType implements Serializable {
 
     private TsPrimitiveType[] values;
 
+    public TsVector() {}
+
     public TsVector(TsPrimitiveType[] values) {
       this.values = values;
     }
@@ -492,6 +614,19 @@ public abstract class TsPrimitiveType implements Serializable {
       this.values = vals;
     }
 
+    @Override
+    public void setObject(Object val) {
+      if (val instanceof TsPrimitiveType[]) {
+        setVector((TsPrimitiveType[]) val);
+      }
+      throw new UnSupportedDataTypeException("TsVector can only be set TsPrimitiveType[] value");
+    }
+
+    @Override
+    public void reset() {
+      values = null;
+    }
+
     @Override
     public int getSize() {
       int size = 0;


[iotdb] 06/09: implement part accumulator 2

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit af1cc09402f3a15ea7ce47ce5af079fe2a0b3d3b
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Apr 29 20:04:46 2022 +0800

    implement part accumulator 2
---
 .../db/mpp/operator/aggregation/Accumulator.java   |  3 +-
 .../aggregation/FirstValueAccumulator.java         | 90 +++++++++++++++++++++-
 .../operator/aggregation/LastValueAccumulator.java | 90 +++++++++++++++++++++-
 .../operator/aggregation/MaxTimeAccumulator.java   | 80 ++++++++++++++++++-
 .../operator/aggregation/MinTimeAccumulator.java   | 80 ++++++++++++++++++-
 5 files changed, 338 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
index 5833a75959..eaa7dd99e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java
@@ -59,7 +59,8 @@ public interface Accumulator {
   void reset();
 
   /**
-   * For first_value or last_value in decreasing order, we can get final result by the first record.
+   * This method can only be used in seriesAggregateScanOperator. For first_value or last_value in
+   * decreasing order, we can get final result by the first record.
    */
   boolean hasFinalResult();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
index 49ad6a5133..6af6164b49 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
@@ -17,5 +17,93 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;public class FirstValueAccumulator {
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public class FirstValueAccumulator implements Accumulator {
+
+  private boolean hasCandidateResult;
+  private TsPrimitiveType firstValue;
+  private long minTime = Long.MAX_VALUE;
+
+  public FirstValueAccumulator(TSDataType seriesDataType) {
+    firstValue = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    long curTime = column[0].getLong(0);
+    if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
+      updateFirstValue(column[1].getObject(0), curTime);
+    }
+  }
+
+  // partialResult should be like: | FirstValue | MinTime |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 2) {
+      throw new IllegalArgumentException("partialResult of FirstValue should be 2");
+    }
+    updateFirstValue(partialResult[0].getObject(0), partialResult[1].getLong(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    updateFirstValue(statistics.getFirstValue(), statistics.getStartTime());
+  }
+
+  // finalResult should be single column, like: | finalFirstValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    reset();
+    firstValue.setObject(finalResult.getObject(0));
+  }
+
+  // columnBuilder should be double in FirstValueAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeObject(firstValue.getValue());
+    columnBuilders[1].writeLong(minTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeObject(firstValue.getValue());
+  }
+
+  @Override
+  public void reset() {
+    this.minTime = Long.MAX_VALUE;
+    this.firstValue.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {firstValue.getDataType(), TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return firstValue.getDataType();
+  }
+
+  private void updateFirstValue(Object value, long curTime) {
+    hasCandidateResult = true;
+    if (curTime < minTime) {
+      minTime = curTime;
+      firstValue.setObject(value);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
index 97183aae52..1ecd65ae61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
@@ -17,5 +17,93 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;public class LastValueAccumulator {
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
+
+public class LastValueAccumulator implements Accumulator {
+
+  private TsPrimitiveType lastValue;
+  private long maxTime = Long.MIN_VALUE;
+
+  public LastValueAccumulator(TSDataType seriesDataType) {
+    lastValue = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        updateLastValue(column[1].getObject(0), curTime);
+      }
+    }
+  }
+
+  // partialResult should be like: | LastValue | MaxTime |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 2) {
+      throw new IllegalArgumentException("partialResult of LastValue should be 2");
+    }
+    updateLastValue(partialResult[0].getObject(0), partialResult[1].getLong(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    updateLastValue(statistics.getLastValue(), statistics.getEndTime());
+  }
+
+  // finalResult should be single column, like: | finalLastValue |
+  @Override
+  public void setFinal(Column finalResult) {
+    reset();
+    lastValue.setObject(finalResult.getObject(0));
+  }
+
+  // columnBuilder should be double in LastValueAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeObject(lastValue.getValue());
+    columnBuilders[1].writeLong(maxTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeObject(lastValue.getValue());
+  }
+
+  @Override
+  public void reset() {
+    this.maxTime = Long.MIN_VALUE;
+    this.lastValue.reset();
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {lastValue.getDataType(), TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return lastValue.getDataType();
+  }
+
+  private void updateLastValue(Object value, long curTime) {
+    if (curTime > maxTime) {
+      maxTime = curTime;
+      lastValue.setObject(value);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
index c5d03f8442..3addbf26d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
@@ -17,5 +17,83 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;public class MaxTimeAccumulator {
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+public class MaxTimeAccumulator implements Accumulator {
+
+  private long maxTime = Long.MIN_VALUE;
+
+  public MaxTimeAccumulator() {}
+
+  // Column should be like: | Time |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        updateMaxTime(curTime);
+      }
+    }
+  }
+
+  // partialResult should be like: | partialMaxTimeValue |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of MaxTime should be 1");
+    }
+    updateMaxTime(partialResult[0].getLong(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    updateMaxTime(statistics.getEndTime());
+  }
+
+  // finalResult should be single column, like: | finalMaxTime |
+  @Override
+  public void setFinal(Column finalResult) {
+    maxTime = finalResult.getLong(0);
+  }
+
+  // columnBuilder should be single in maxTimeAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeLong(maxTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeLong(maxTime);
+  }
+
+  @Override
+  public void reset() {
+    this.maxTime = Long.MIN_VALUE;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.INT64;
+  }
+
+  private void updateMaxTime(long curTime) {
+    maxTime = Math.max(maxTime, curTime);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
index 95bf611acf..893d8436eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
@@ -17,5 +17,83 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.operator.aggregation;public class MinTimeAccumulator {
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+
+public class MinTimeAccumulator implements Accumulator {
+
+  private boolean hasCandidateResult;
+  private long minTime = Long.MAX_VALUE;
+
+  public MinTimeAccumulator() {}
+
+  // Column should be like: | Time |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    long curTime = column[0].getLong(0);
+    if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
+      updateMinTime(curTime);
+    }
+  }
+
+  // partialResult should be like: | partialMinTimeValue |
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    if (partialResult.length != 1) {
+      throw new IllegalArgumentException("partialResult of MinTime should be 1");
+    }
+    updateMinTime(partialResult[0].getLong(0));
+  }
+
+  @Override
+  public void addStatistics(Statistics statistics) {
+    updateMinTime(statistics.getStartTime());
+  }
+
+  // finalResult should be single column, like: | finalMinTime |
+  @Override
+  public void setFinal(Column finalResult) {
+    minTime = finalResult.getLong(0);
+  }
+
+  // columnBuilder should be single in minTimeAccumulator
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    columnBuilders[0].writeLong(minTime);
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    columnBuilder.writeLong(minTime);
+  }
+
+  @Override
+  public void reset() {
+    this.minTime = Long.MAX_VALUE;
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  @Override
+  public TSDataType[] getIntermediateType() {
+    return new TSDataType[] {TSDataType.INT64};
+  }
+
+  @Override
+  public TSDataType getFinalType() {
+    return TSDataType.INT64;
+  }
+
+  private void updateMinTime(long curTime) {
+    hasCandidateResult = true;
+    minTime = Math.min(minTime, curTime);
+  }
 }


[iotdb] 04/09: Merge branch 'master' into aggregator

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit be277e5a108557cc3bb6922ebfeb1a902c977d86
Merge: e9a1b58d1b 8f7fb57661
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Apr 29 09:55:36 2022 +0800

    Merge branch 'master' into aggregator

 LICENSE                                            |   9 +
 client-py/SessionAlignedTimeseriesExample.py       |  18 +
 client-py/SessionExample.py                        |  30 +
 client-py/iotdb/Session.py                         | 132 ++++-
 client-py/tests/tablet_performance_comparison.py   |   4 +-
 client-py/tests/test_one_device.py                 | 152 +++++
 .../iotdb/confignode/cli/TemporaryClient.java      | 202 -------
 .../iotdb/confignode/client/AsyncClientPool.java   | 113 ++++
 .../client/handlers/InitRegionHandler.java         |  56 ++
 .../consensus/request/ConfigRequest.java           |  29 +-
 .../consensus/request/ConfigRequestType.java       |   1 +
 ...rageGroupReq.java => CountStorageGroupReq.java} |  19 +-
 ...aPartitionReq.java => GetDataPartitionReq.java} |  10 +-
 .../request/read/GetOrCreateDataPartitionReq.java  | 119 +---
 .../read/GetOrCreateSchemaPartitionReq.java        |  76 +--
 ...artitionReq.java => GetSchemaPartitionReq.java} |  10 +-
 .../consensus/request/read/GetStorageGroupReq.java |  21 +-
 .../consensus/request/write/CreateRegionsReq.java  |  20 +-
 ...tStorageGroupReq.java => DeleteRegionsReq.java} |  48 +-
 .../request/write/DeleteStorageGroupReq.java       |  36 +-
 .../request/write/SetStorageGroupReq.java          |   4 -
 .../confignode/manager/ClusterSchemaManager.java   | 128 ++++-
 .../iotdb/confignode/manager/ConfigManager.java    |  17 +-
 .../iotdb/confignode/manager/DataNodeManager.java  |   4 +
 .../apache/iotdb/confignode/manager/Manager.java   |  10 +-
 .../iotdb/confignode/manager/PartitionManager.java |   6 +-
 .../confignode/persistence/ClusterSchemaInfo.java  |  34 +-
 .../confignode/persistence/PartitionInfo.java      |  57 +-
 .../service/executor/ConfigRequestExecutor.java    |  21 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  22 +-
 .../consensus/request/ConfigRequestSerDeTest.java  |  89 ++-
 docs/UserGuide/Process-Data/Select-Into.md         |   6 +-
 docs/zh/UserGuide/Process-Data/Select-Into.md      |   8 +-
 .../iotdb/db/integration/IoTDBSelectIntoIT.java    |   6 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  21 +-
 .../iotdb/db/localconfignode/LocalConfigNode.java  |   7 +-
 .../mtree/store/disk/CachedMNodeContainer.java     |  12 +-
 .../db/metadata/tree/AbstractTreeVisitor.java      |  62 +-
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |  14 +-
 .../apache/iotdb/db/mpp/buffer/ISinkHandle.java    |  19 +-
 .../apache/iotdb/db/mpp/buffer/ISourceHandle.java  |  13 +-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java | 104 ++--
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   | 114 ++--
 .../org/apache/iotdb/db/mpp/execution/Driver.java  |   2 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |   1 +
 .../iotdb/db/mpp/execution/QueryExecution.java     |   4 +-
 .../db/mpp/execution/config/ConfigTaskVisitor.java |   7 +
 .../org/apache/iotdb/db/mpp/memory/MemoryPool.java |  25 +-
 .../db/mpp/operator/source/ExchangeOperator.java   |   2 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analysis.java  |  10 +
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  |  16 +-
 .../mpp/sql/analyze/ClusterPartitionFetcher.java   | 177 +++---
 .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java  |   2 +
 .../iotdb/db/mpp/sql/analyze/SchemaValidator.java  |  14 +-
 .../iotdb/db/mpp/sql/analyze/TypeProvider.java     |  77 +++
 .../db/mpp/sql/planner/DistributionPlanner.java    |  10 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |  76 ++-
 .../db/mpp/sql/planner/LogicalPlanBuilder.java     | 226 ++++++++
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   | 194 +++----
 .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java | 364 ------------
 .../sql/planner/SimpleFragmentParallelPlanner.java |   1 +
 .../db/mpp/sql/planner/plan/PlanFragment.java      |  25 +-
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |   6 -
 .../db/mpp/sql/planner/plan/node/PlanNodeType.java |  33 +-
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |  33 +-
 .../metedata/read/AbstractSchemaMergeNode.java     |  18 -
 .../plan/node/metedata/read/DevicesCountNode.java  |  12 -
 .../node/metedata/read/DevicesSchemaScanNode.java  |  18 -
 .../metedata/read/LevelTimeSeriesCountNode.java    |  12 -
 .../plan/node/metedata/read/SchemaFetchNode.java   |  12 -
 .../node/metedata/read/TimeSeriesCountNode.java    |  12 -
 .../metedata/read/TimeSeriesSchemaScanNode.java    |  12 -
 .../node/metedata/write/AlterTimeSeriesNode.java   |  12 -
 .../write/CreateAlignedTimeSeriesNode.java         |  11 -
 .../node/metedata/write/CreateTimeSeriesNode.java  |  11 -
 .../planner/plan/node/process/AggregateNode.java   | 199 -------
 .../planner/plan/node/process/AggregationNode.java | 186 ++++++
 .../planner/plan/node/process/DeviceMergeNode.java | 160 ++----
 .../planner/plan/node/process/DeviceViewNode.java  | 166 ++++++
 .../planner/plan/node/process/ExchangeNode.java    |  12 -
 .../sql/planner/plan/node/process/FillNode.java    |  71 +--
 .../sql/planner/plan/node/process/FilterNode.java  | 103 +---
 .../planner/plan/node/process/FilterNullNode.java  |  57 +-
 .../plan/node/process/GroupByLevelNode.java        | 170 ++----
 .../planner/plan/node/process/GroupByTimeNode.java | 165 ++++++
 .../sql/planner/plan/node/process/LimitNode.java   |  59 +-
 .../sql/planner/plan/node/process/OffsetNode.java  |  35 +-
 .../process/{OffsetNode.java => ProjectNode.java}  |  93 ++-
 .../sql/planner/plan/node/process/SortNode.java    |  61 +-
 .../planner/plan/node/process/TimeJoinNode.java    | 156 +----
 .../planner/plan/node/sink/FragmentSinkNode.java   |  12 -
 .../source/AlignedSeriesAggregationScanNode.java   | 267 +++++++++
 ...iesScanNode.java => AlignedSeriesScanNode.java} | 269 ++++-----
 ...canNode.java => SeriesAggregationScanNode.java} | 251 ++++----
 .../planner/plan/node/source/SeriesScanNode.java   | 239 ++++----
 .../plan/node/write/BatchInsertNode.java}          |   9 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |  15 +-
 .../sql/planner/plan/node/write/InsertRowNode.java |  19 +-
 .../planner/plan/node/write/InsertRowsNode.java    |  21 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java | 111 +++-
 .../planner/plan/node/write/InsertTabletNode.java  |  11 -
 .../sql/planner/plan/parameter/Aggregation.java    |  43 --
 .../plan/parameter/AggregationDescriptor.java      | 102 ++++
 .../planner/plan/parameter/AggregationStep.java    |  24 +
 .../sql/planner/plan/parameter/FillDescriptor.java |  69 +++
 .../plan/parameter/FilterNullParameter.java        |  15 +-
 .../db/mpp/sql/rewriter/WildcardsRemover.java      |   8 +-
 .../db/mpp/sql/statement/StatementVisitor.java     |   4 +-
 .../db/mpp/sql/statement/component/FillPolicy.java |   4 +-
 .../db/mpp/sql/statement/component/OrderBy.java    |   4 +-
 .../db/qp/logical/crud/SelectIntoOperator.java     |  15 +
 .../apache/iotdb/db/mpp/buffer/SinkHandleTest.java |  69 +--
 .../iotdb/db/mpp/buffer/SourceHandleTest.java      |  76 ++-
 .../apache/iotdb/db/mpp/buffer/StubSinkHandle.java |  20 +-
 .../mpp/common/schematree/PathPatternTreeTest.java |   4 +-
 .../db/mpp/common/schematree/SchemaTreeTest.java   |  51 ++
 .../apache/iotdb/db/mpp/memory/MemoryPoolTest.java |  25 +
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java |  39 +-
 .../iotdb/db/mpp/sql/plan/LogicalPlanPrinter.java  | 228 --------
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  |   2 -
 .../db/mpp/sql/plan/QueryLogicalPlanUtil.java      | 631 +++++++++++++--------
 .../plan/node/process/AggregateNodeSerdeTest.java  |  69 ---
 ...erdeTest.java => AggregationNodeSerdeTest.java} |  82 +--
 .../DeviceViewNodeSerdeTest.java}                  |  44 +-
 .../plan/node/process/ExchangeNodeSerdeTest.java   |  53 +-
 .../sql/plan/node/process/FillNodeSerdeTest.java   |  60 +-
 .../sql/plan/node/process/FilterNodeSerdeTest.java |  66 +--
 .../plan/node/process/FilterNullNodeSerdeTest.java |  74 +--
 .../node/process/GroupByLevelNodeSerdeTest.java    | 119 ++--
 .../sql/plan/node/process/LimitNodeSerdeTest.java  |  86 +--
 .../sql/plan/node/process/OffsetNodeSerdeTest.java | 154 +----
 .../sql/plan/node/process/SortNodeSerdeTest.java   |  95 +---
 .../plan/node/process/TimeJoinNodeSerdeTest.java   | 118 +---
 ...ava => SeriesAggregationScanNodeSerdeTest.java} |  43 +-
 .../plan/node/source/SeriesScanNodeSerdeTest.java  |  15 +-
 .../write/InsertRowsOfOneDeviceNodeSerdeTest.java  |  19 +-
 tsfile/pom.xml                                     |   5 -
 .../apache/iotdb/tsfile/utils/BitConstructor.java  |   3 -
 .../apache/iotdb/tsfile/utils/ByteArrayList.java   | 135 +++++
 139 files changed, 4465 insertions(+), 4443 deletions(-)



[iotdb] 08/09: add desc accumulator

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aa5adcef267884faf6cf97c22caa3236d44210d1
Author: Alima777 <wx...@gmail.com>
AuthorDate: Sun May 1 14:49:58 2022 +0800

    add desc accumulator
---
 .../operator/aggregation/AccumulatorFactory.java   | 14 +++---
 .../db/mpp/operator/aggregation/Aggregator.java    |  9 +++-
 .../aggregation/FirstValueAccumulator.java         |  8 ++--
 .../aggregation/FirstValueDescAccumulator.java     | 47 +++++++++++++++++++
 .../operator/aggregation/LastValueAccumulator.java |  6 +--
 .../aggregation/LastValueDescAccumulator.java      | 52 ++++++++++++++++++++++
 .../operator/aggregation/MaxTimeAccumulator.java   |  4 +-
 .../aggregation/MaxTimeDescAccumulator.java        | 47 +++++++++++++++++++
 .../operator/aggregation/MinTimeAccumulator.java   |  6 +--
 .../aggregation/MinTimeDescAccumulator.java        | 42 +++++++++++++++++
 .../operator/SeriesAggregateScanOperatorTest.java  |  6 +--
 11 files changed, 220 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
index e389d543be..10619b9e35 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
@@ -26,7 +26,7 @@ public class AccumulatorFactory {
 
   // TODO: Are we going to create different seriesScanOperator based on order by sequence?
   public static Accumulator createAccumulator(
-      AggregationType aggregationType, TSDataType tsDataType) {
+      AggregationType aggregationType, TSDataType tsDataType, boolean ascending) {
     switch (aggregationType) {
       case COUNT:
         return new CountAccumulator();
@@ -37,17 +37,21 @@ public class AccumulatorFactory {
       case EXTREME:
         return new ExtremeAccumulator(tsDataType);
       case MAX_TIME:
-        return new MaxTimeAccumulator();
+        return ascending ? new MaxTimeAccumulator() : new MaxTimeDescAccumulator();
       case MIN_TIME:
-        return new MinTimeAccumulator();
+        return ascending ? new MinTimeAccumulator() : new MinTimeDescAccumulator();
       case MAX_VALUE:
         return new MaxValueAccumulator(tsDataType);
       case MIN_VALUE:
         return new MinValueAccumulator(tsDataType);
       case LAST_VALUE:
-        return new LastValueAccumulator(tsDataType);
+        return ascending
+            ? new LastValueAccumulator(tsDataType)
+            : new LastValueDescAccumulator(tsDataType);
       case FIRST_VALUE:
-        return new FirstValueAccumulator(tsDataType);
+        return ascending
+            ? new FirstValueAccumulator(tsDataType)
+            : new FirstValueDescAccumulator(tsDataType);
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
index a808c734bc..617d89d4ae 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
@@ -34,11 +34,18 @@ public class Aggregator {
 
   private final Accumulator accumulator;
   // In some intermediate result input, inputLocation[] should include two columns
-  private final List<InputLocation[]> inputLocationList;
+  private List<InputLocation[]> inputLocationList;
   private final AggregationStep step;
 
   private TimeRange timeRange = new TimeRange(0, Long.MAX_VALUE);
 
+  // Used for SeriesAggregateScanOperator
+  public Aggregator(Accumulator accumulator, AggregationStep step) {
+    this.accumulator = accumulator;
+    this.step = step;
+  }
+
+  // Used for aggregateOperator
   public Aggregator(
       Accumulator accumulator, AggregationStep step, List<InputLocation[]> inputLocationList) {
     this.accumulator = accumulator;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
index 6af6164b49..8fa0801faf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 public class FirstValueAccumulator implements Accumulator {
 
-  private boolean hasCandidateResult;
-  private TsPrimitiveType firstValue;
-  private long minTime = Long.MAX_VALUE;
+  protected boolean hasCandidateResult;
+  protected TsPrimitiveType firstValue;
+  protected long minTime = Long.MAX_VALUE;
 
   public FirstValueAccumulator(TSDataType seriesDataType) {
     firstValue = TsPrimitiveType.getByType(seriesDataType);
@@ -99,7 +99,7 @@ public class FirstValueAccumulator implements Accumulator {
     return firstValue.getDataType();
   }
 
-  private void updateFirstValue(Object value, long curTime) {
+  protected void updateFirstValue(Object value, long curTime) {
     hasCandidateResult = true;
     if (curTime < minTime) {
       minTime = curTime;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java
new file mode 100644
index 0000000000..87b939438e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public class FirstValueDescAccumulator extends FirstValueAccumulator {
+
+  public FirstValueDescAccumulator(TSDataType seriesDataType) {
+    super(seriesDataType);
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        updateFirstValue(column[1].getObject(0), curTime);
+      }
+    }
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
index 1ecd65ae61..901759b687 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 public class LastValueAccumulator implements Accumulator {
 
-  private TsPrimitiveType lastValue;
-  private long maxTime = Long.MIN_VALUE;
+  protected TsPrimitiveType lastValue;
+  protected long maxTime = Long.MIN_VALUE;
 
   public LastValueAccumulator(TSDataType seriesDataType) {
     lastValue = TsPrimitiveType.getByType(seriesDataType);
@@ -100,7 +100,7 @@ public class LastValueAccumulator implements Accumulator {
     return lastValue.getDataType();
   }
 
-  private void updateLastValue(Object value, long curTime) {
+  protected void updateLastValue(Object value, long curTime) {
     if (curTime > maxTime) {
       maxTime = curTime;
       lastValue.setObject(value);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java
new file mode 100644
index 0000000000..3d3f61644f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public class LastValueDescAccumulator extends LastValueAccumulator {
+
+  private boolean hasCandidateResult = false;
+
+  public LastValueDescAccumulator(TSDataType seriesDataType) {
+    super(seriesDataType);
+  }
+
+  // Column should be like: | Time | Value |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    long curTime = column[0].getLong(0);
+    if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
+      updateLastValue(column[1].getObject(0), curTime);
+    }
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  protected void updateLastValue(Object value, long curTime) {
+    hasCandidateResult = true;
+    super.updateLastValue(value, curTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
index 3addbf26d9..cda25af4a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 
 public class MaxTimeAccumulator implements Accumulator {
 
-  private long maxTime = Long.MIN_VALUE;
+  protected long maxTime = Long.MIN_VALUE;
 
   public MaxTimeAccumulator() {}
 
@@ -93,7 +93,7 @@ public class MaxTimeAccumulator implements Accumulator {
     return TSDataType.INT64;
   }
 
-  private void updateMaxTime(long curTime) {
+  protected void updateMaxTime(long curTime) {
     maxTime = Math.max(maxTime, curTime);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java
new file mode 100644
index 0000000000..01fb65f541
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public class MaxTimeDescAccumulator extends MaxTimeAccumulator {
+
+  private boolean hasCandidateResult = false;
+
+  // Column should be like: | Time |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    long curTime = column[0].getLong(0);
+    if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
+      updateMaxTime(curTime);
+    }
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  protected void updateMaxTime(long curTime) {
+    hasCandidateResult = true;
+    super.updateMaxTime(curTime);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
index 893d8436eb..b80adbe470 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 
 public class MinTimeAccumulator implements Accumulator {
 
-  private boolean hasCandidateResult;
-  private long minTime = Long.MAX_VALUE;
+  protected boolean hasCandidateResult;
+  protected long minTime = Long.MAX_VALUE;
 
   public MinTimeAccumulator() {}
 
@@ -92,7 +92,7 @@ public class MinTimeAccumulator implements Accumulator {
     return TSDataType.INT64;
   }
 
-  private void updateMinTime(long curTime) {
+  protected void updateMinTime(long curTime) {
     hasCandidateResult = true;
     minTime = Math.min(minTime, curTime);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeDescAccumulator.java
new file mode 100644
index 0000000000..cb9136eca9
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeDescAccumulator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public class MinTimeDescAccumulator extends MinTimeAccumulator {
+
+  // Column should be like: | Time |
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        updateMinTime(curTime);
+      }
+    }
+  }
+
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index c21314b5d9..2b4d7c4c7c 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -93,9 +93,9 @@ public class SeriesAggregateScanOperatorTest {
         initSeriesAggregateScanOperator(
             Collections.singletonList(
                 new Aggregator(
-                    AccumulatorFactory.createAccumulator(AggregationType.COUNT, TSDataType.INT32),
-                    AggregationStep.SINGLE,
-                    null)),
+                    AccumulatorFactory.createAccumulator(
+                        AggregationType.COUNT, TSDataType.INT32, true),
+                    AggregationStep.SINGLE)),
             null,
             true,
             null);


[iotdb] 03/09: add processTsBlocks

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e9a1b58d1b32fee0924e5a8e8f0a519918f0e612
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Apr 29 09:55:01 2022 +0800

    add processTsBlocks
---
 .../db/mpp/operator/aggregation/Aggregator.java    | 28 ++++++++++++++++++++--
 .../operator/aggregation/AggregatorFactory.java    | 14 +----------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
index 94fe9080cb..8a05f460c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 
 import java.util.List;
@@ -32,7 +33,8 @@ import java.util.List;
 public class Aggregator {
 
   private final Accumulator accumulator;
-  private final List<InputLocation> inputLocationList;
+  // In some intermediate result input, inputLocation[] should include two columns
+  private final List<InputLocation[]> inputLocationList;
   private final AggregationStep step;
   private final TSDataType intermediateType;
   private final TSDataType finalType;
@@ -42,7 +44,7 @@ public class Aggregator {
   public Aggregator(
       Accumulator accumulator,
       AggregationStep step,
-      List<InputLocation> inputLocationList,
+      List<InputLocation[]> inputLocationList,
       TSDataType intermediateType,
       TSDataType finalType) {
     this.accumulator = accumulator;
@@ -52,6 +54,7 @@ public class Aggregator {
     this.finalType = finalType;
   }
 
+  // Used for SeriesAggregateScanOperator
   public void processTsBlock(TsBlock tsBlock) {
     if (step.isInputRaw()) {
       accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange);
@@ -60,6 +63,27 @@ public class Aggregator {
     }
   }
 
+  // Used for aggregateOperator
+  public void processTsBlocks(TsBlock[] tsBlock) {
+    for (InputLocation[] inputLocations : inputLocationList) {
+      if (step.isInputRaw()) {
+        TsBlock rawTsBlock = tsBlock[inputLocations[0].getTsBlockIndex()];
+        Column[] timeValueColumn = new Column[2];
+        timeValueColumn[0] = rawTsBlock.getTimeColumn();
+        timeValueColumn[1] = rawTsBlock.getColumn(inputLocations[0].getValueColumnIndex());
+        accumulator.addInput(timeValueColumn, timeRange);
+      } else {
+        Column[] columns = new Column[inputLocations.length];
+        for (int i = 0; i < inputLocations.length; i++) {
+          columns[i] =
+              tsBlock[inputLocations[i].getTsBlockIndex()].getColumn(
+                  inputLocations[i].getValueColumnIndex());
+        }
+        accumulator.addIntermediate(columns);
+      }
+    }
+  }
+
   public void outputResult(ColumnBuilder[] columnBuilder) {
     if (step.isOutputPartial()) {
       accumulator.outputIntermediate(columnBuilder);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
index ce493b3256..fb49e8c8c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java
@@ -19,16 +19,4 @@
 
 package org.apache.iotdb.db.mpp.operator.aggregation;
 
-public class AggregatorFactory {
-
-  public static Aggregator createAggregator() {
-    Accumulator accumulator;
-    if (step.isInputRaw()) {
-      accumulator = accumulatorFactory.createAccumulator(lambdaProviders);
-    } else {
-      accumulator = accumulatorFactory.createIntermediateAccumulator(lambdaProviders);
-    }
-    return new Aggregator(
-        accumulator, step, intermediateType, finalType, inputChannels, maskChannel);
-  }
-}
+public class AggregatorFactory {}