You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/21 02:33:26 UTC

[iotdb] branch new_mpp updated (50d3bd9 -> 0b5356f)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a change to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


 discard 50d3bd9  [To_new_mpp] data block manager interface (#5280)
    omit 1feea78  Sync with master
    omit 0ef2b42  Fragment schedule develop
    omit 6e4e2cb  [To_new_mpp] add IDataBlockManager (#5240)
    omit 4a85d31  [To_new_mpp] add common classes & thrift files (#5239)
    omit d31733f  [To_new_mpp] Basic query memory control (#5216)
    omit 678f712  Basic implementation of FragmentInstanceManager
     add 3c22c12  [IOTDB-2675][IOTDB-2524] Manage Metadata by Storage Group && Support alias, tag and attributes on aligned timeseries (#5205)
     add 3deadae  [IOTDB-2752] reconstructing the start and stop command (#5261)
     add 60dc2d4  Open idtable configuration (#5269)
     add 2058744  [IOTDB-2675] Rename MManager to SchemaEngine and SGMManager to SchemaRegion (#5265)
     add 31108de  [IOTDB-2755] Fix tests of flink-tsfile-connector not executed in CI (#5268)
     add 842b868  Add Concurrency Control On CI (#5262)
     add d29bc89  [IOTDB-2754] SessionPool auto redirect IoTDB instance (#5267)
     add dc38c7a  [IOTDB-2609] A new lossy encoding method based on frequency domain (#5118)
     add e06b9e1  [IOTDB-2733] Supplement the doc of compiling source code (#5276)
     add b3fff9f  [IOTDB-2764] Refine the consensus layer framework and add examples (#5277)
     add 95164c1  [IOTDB-2765] Fix grafana plugin compile error on lastest arm macos (#5282)
     add 5a699d0  Fix sonar-coveralls aways be cancelled (#5283)
     add d8fd5b3  [IOTDB-2730] Config node server (#5284)
     add 00f1005  [IOTDB-2713] Generate statement from AST for MPP query - Part 1 (#5288)
     add c83ccfa  [IOTDB-2767] Add Operators and PlanVisitor definitions for mpp (#5286)
     add a389890  Update 0.13.0 download links (#5290)
     add fb08f7b  [IOTDB-1808] Compatibility of Apache IoTDB with InfluxDB - Query Func (#5104)
     add 4e7687b  [IOTDB-2730] start config service (#5287)
     add 49938f4  [IOTDB-2772] Fix influxdb build error (#5292)
     new 736bd02  Basic implementation of FragmentInstanceManager
     new 199930b  [To_new_mpp] Basic query memory control (#5216)
     new 333c9f0  [To_new_mpp] add common classes & thrift files (#5239)
     new 93722e8  [To_new_mpp] add IDataBlockManager (#5240)
     new 8fb27a2  Fragment schedule develop
     new 4da5757  Sync with master
     new 0b5356f  Implement phase 1

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (50d3bd9)
            \
             N -- N -- N   refs/heads/new_mpp (0b5356f)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/client-go.yml                    |     4 +
 .github/workflows/client.yml                       |     4 +
 .github/workflows/cluster.yml                      |     4 +
 .github/workflows/e2e.yml                          |     4 +
 .github/workflows/grafana-plugin.yml               |     7 +-
 .github/workflows/influxdb-protocol.yml            |     4 +
 .github/workflows/main-unix.yml                    |     4 +
 .github/workflows/main-win.yml                     |     4 +
 .github/workflows/sonar-coveralls.yml              |    11 +-
 .../apache/iotdb/db/qp/sql/InfluxDBSqlParser.g4    |     2 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |    16 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |    11 +-
 client-cpp/src/main/Session.h                      |     3 +-
 client-py/iotdb/utils/IoTDBConstants.py            |     1 +
 .../java/org/apache/iotdb/cluster/ClientMain.java  |     2 +-
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |    81 +-
 .../cluster/ClusterIoTDBServerCommandLine.java     |    94 +
 .../cluster/client/async/AsyncDataClient.java      |     2 +-
 .../cluster/client/async/AsyncMetaClient.java      |     2 +-
 .../iotdb/cluster/client/sync/SyncDataClient.java  |     2 +-
 .../iotdb/cluster/client/sync/SyncMetaClient.java  |     2 +-
 .../iotdb/cluster/config/ClusterConstant.java      |     2 +-
 .../iotdb/cluster/config/ClusterDescriptor.java    |     2 +-
 .../iotdb/cluster/coordinator/Coordinator.java     |    14 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |     6 +-
 .../cluster/log/applier/AsyncDataLogApplier.java   |     8 +-
 .../iotdb/cluster/log/applier/BaseApplier.java     |     2 +-
 .../iotdb/cluster/log/applier/DataLogApplier.java  |     6 +-
 .../iotdb/cluster/log/catchup/CatchUpTask.java     |     2 +-
 .../iotdb/cluster/log/catchup/LogCatchUpTask.java  |     4 +-
 .../cluster/log/manage/CommittedEntryManager.java  |     2 +-
 .../log/manage/MetaSingleSnapshotLogManager.java   |     2 +-
 .../log/manage/PartitionedSnapshotLogManager.java  |     4 +-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |     4 +-
 .../log/manage/UnCommittedEntryManager.java        |     2 +-
 .../serializable/SyncLogDequeSerializer.java       |     4 +-
 .../cluster/log/snapshot/MetaSimpleSnapshot.java   |     4 +-
 .../{CMManager.java => CSchemaEngine.java}         |    22 +-
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |    10 +-
 .../iotdb/cluster/partition/PartitionTable.java    |     4 +-
 .../cluster/query/ClusterPhysicalGenerator.java    |     8 +-
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |    26 +-
 .../iotdb/cluster/query/ClusterPlanRouter.java     |    18 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |    31 +-
 .../iotdb/cluster/query/filter/SlotSgFilter.java   |     2 +-
 .../query/last/ClusterLastQueryExecutor.java       |     2 +-
 .../cluster/query/reader/ClusterTimeGenerator.java |     6 +-
 .../iotdb/cluster/server/ClusterRPCService.java    |    10 +-
 .../cluster/server/ClusterRPCServiceMBean.java     |     2 +-
 .../cluster/server/PullSnapshotHintService.java    |     2 +-
 .../server/clusterinfo/ClusterInfoServer.java      |    10 +-
 .../cluster/server/member/DataGroupMember.java     |    14 +-
 .../cluster/server/member/MetaGroupMember.java     |    10 +-
 .../iotdb/cluster/server/member/RaftMember.java    |    10 +-
 .../cluster/server/monitor/NodeStatusManager.java  |     2 +-
 .../cluster/server/raft/AbstractRaftService.java   |     6 +-
 .../server/raft/DataRaftHeartBeatService.java      |     8 +-
 .../iotdb/cluster/server/raft/DataRaftService.java |     8 +-
 .../server/raft/MetaRaftHeartBeatService.java      |     8 +-
 .../iotdb/cluster/server/raft/MetaRaftService.java |     8 +-
 .../cluster/server/service/DataAsyncService.java   |    14 +-
 .../cluster/server/service/DataGroupEngine.java    |     8 +-
 .../cluster/server/service/DataSyncService.java    |    12 +-
 .../iotdb/cluster/utils/ClusterQueryUtils.java     |     2 +-
 .../apache/iotdb/cluster/utils/ClusterUtils.java   |     4 +-
 .../apache/iotdb/cluster/utils/PartitionUtils.java |     2 -
 .../apache/iotdb/cluster/utils/PlanSerializer.java |     4 +-
 .../cluster/utils/nodetool/ClusterMonitor.java     |    16 +-
 .../org/apache/iotdb/cluster/common/IoTDBTest.java |     4 +-
 .../iotdb/cluster/integration/SingleNodeTest.java  |     2 +-
 .../iotdb/cluster/log/LogDispatcherTest.java       |     2 +-
 .../log/applier/AsyncDataLogApplierTest.java       |     6 +-
 .../cluster/log/applier/DataLogApplierTest.java    |    20 +-
 .../cluster/log/applier/MetaLogApplierTest.java    |    16 +-
 .../iotdb/cluster/log/catchup/CatchUpTaskTest.java |     4 +-
 .../cluster/log/catchup/LogCatchUpTaskTest.java    |     2 +-
 .../manage/MetaSingleSnapshotLogManagerTest.java   |     2 +-
 .../serializable/SyncLogDequeSerializerTest.java   |     2 +-
 .../cluster/log/snapshot/DataSnapshotTest.java     |     4 +-
 .../cluster/log/snapshot/FileSnapshotTest.java     |     8 +-
 .../log/snapshot/MetaSimpleSnapshotTest.java       |     9 +-
 .../log/snapshot/PartitionedSnapshotTest.java      |     5 +-
 .../cluster/log/snapshot/PullSnapshotTaskTest.java |     4 +-
 ...agerWhiteBox.java => SchemaEngineWhiteBox.java} |    20 +-
 .../cluster/partition/SlotPartitionTableTest.java  |    28 +-
 .../cluster/query/ClusterPlanExecutorTest.java     |     2 +-
 .../query/ClusterUDTFQueryExecutorTest.java        |     2 +-
 .../clusterinfo/ClusterInfoServiceImplTest.java    |     4 +-
 .../iotdb/cluster/server/member/BaseMember.java    |    12 +-
 .../cluster/server/member/DataGroupMemberTest.java |     4 +-
 .../cluster/server/member/MetaGroupMemberTest.java |    22 +-
 confignode/pom.xml                                 |     6 -
 .../resources/conf/iotdb-confignode.properties     |    69 +-
 .../assembly/resources/sbin/start-confignode.bat   |     3 +-
 .../assembly/resources/sbin/start-confignode.sh    |     2 +-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |   144 +-
 .../iotdb/confignode/conf/ConfigNodeConfCheck.java |    39 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |   117 +-
 .../conf/RepeatConfigurationException.java         |    32 -
 .../exception/startup/StartupException.java        |    33 -
 .../iotdb/confignode/manager/ConfigManager.java    |     2 +-
 .../iotdb/confignode/service/ConfigNode.java       |    56 +-
 .../confignode/service/ConfigNodeCommandLine.java  |    81 +
 .../iotdb/confignode/service/ConfigNodeMBean.java  |     6 +-
 .../confignode/service/register/IService.java      |    51 -
 .../confignode/service/register/JMXService.java    |   105 -
 .../service/register/RegisterManager.java          |    82 -
 .../confignode/service/startup/StartupCheck.java   |    28 -
 .../confignode/service/startup/StartupChecks.java  |    89 -
 .../service/thrift/server/ConfigNodeRPCServer.java |    88 +-
 .../thrift/server/ConfigNodeRPCServerMBean.java    |     4 +-
 ...rver.java => ConfigNodeRPCServerProcessor.java} |    28 +-
 .../thrift/server/ConfigNodeRPCServiceHandler.java |    52 +
 .../utils/ConfigNodeEnvironmentUtils.java          |    11 +-
 .../org/apache/iotdb/consensus/IConsensus.java     |     1 +
 ...equest.java => ByteBufferConsensusRequest.java} |    21 +-
 .../common/request/IConsensusRequest.java          |     2 -
 .../consensus/standalone/StandAloneServerImpl.java |     8 +-
 .../consensus/statemachine/EmptyStateMachine.java  |     2 +-
 .../standalone/StandAloneConsensusTest.java        |    30 +-
 docs/Download/README.md                            |    22 +-
 docs/UserGuide/API/Programming-Java-Native-API.md  |     1 +
 docs/UserGuide/Data-Concept/Encoding.md            |    14 +-
 .../Maintenance-Tools/Maintenance-Command.md       |     8 -
 docs/UserGuide/Operate-Metadata/Timeseries.md      |     2 +-
 docs/UserGuide/Query-Data/Overview.md              |    27 +-
 docs/UserGuide/QuickStart/WayToGetIoTDB.md         |    17 +-
 docs/UserGuide/Reference/Config-Manual.md          |    18 +
 docs/UserGuide/Reference/SQL-Reference.md          |     5 -
 docs/zh/Download/README.md                         |    22 +-
 .../UserGuide/API/Programming-Java-Native-API.md   |     3 +-
 docs/zh/UserGuide/Data-Concept/Encoding.md         |    14 +-
 .../Maintenance-Tools/Maintenance-Command.md       |     7 -
 docs/zh/UserGuide/Operate-Metadata/Timeseries.md   |     2 +-
 docs/zh/UserGuide/Query-Data/Overview.md           |    27 +-
 docs/zh/UserGuide/QuickStart/WayToGetIoTDB.md      |    17 +-
 docs/zh/UserGuide/Reference/Config-Manual.md       |    22 +-
 docs/zh/UserGuide/Reference/SQL-Reference.md       |     6 -
 .../java/org/apache/iotdb/SessionPoolExample.java  |    42 +-
 ...=> RowTSRecordOutputFormatIntegrationTest.java} |     2 +-
 ...va => RowTsFileInputFormatIntegrationTest.java} |    58 +-
 .../util/TSFileConfigUtilCompletenessTest.java     |     4 +-
 grafana-plugin/package.json                        |     4 +-
 grafana-plugin/src/componments/ControlValue.tsx    |     5 +-
 grafana-plugin/src/componments/FromValue.tsx       |     8 +-
 grafana-plugin/src/componments/SelectValue.tsx     |     8 +-
 grafana-plugin/src/componments/WhereValue.tsx      |     5 +-
 grafana-plugin/src/datasource.ts                   |    16 +-
 grafana-plugin/yarn.lock                           | 10529 +++++++++----------
 .../org/apache/iotdb/influxdb/IoTDBInfluxDB.java   |     4 +-
 .../iotdb/influxdb/example/InfluxDBExample.java    |    38 +-
 .../protocol/impl/IoTDBInfluxDBService.java        |    22 +-
 .../iotdb/influxdb/session/InfluxDBSession.java    |    29 +
 .../influxdb/integration/IoTDBInfluxDBIT.java      |    77 +-
 .../iotdb/db/integration/IoTDBArithmeticIT.java    |    18 +-
 .../iotdb/db/integration/IoTDBCheckConfigIT.java   |     6 +-
 .../db/integration/IoTDBCreateSnapshotIT.java      |   180 -
 .../apache/iotdb/db/integration/IoTDBDaemonIT.java |     2 +-
 .../iotdb/db/integration/IoTDBEncodingIT.java      |    76 +
 .../iotdb/db/integration/IoTDBLargeDataIT.java     |     2 +-
 .../apache/iotdb/db/integration/IoTDBLastIT.java   |    14 +-
 .../iotdb/db/integration/IoTDBMetadataFetchIT.java |    47 +-
 .../iotdb/db/integration/IoTDBMultiSeriesIT.java   |     2 +-
 .../iotdb/db/integration/IoTDBNestedQueryIT.java   |    12 +-
 .../db/integration/IoTDBRecoverUnclosedIT.java     |     2 +-
 .../iotdb/db/integration/IoTDBSelectIntoIT.java    |    18 +-
 .../iotdb/db/integration/IoTDBSimpleQueryIT.java   |     8 +-
 .../db/integration/IoTDBTriggerExecutionIT.java    |    26 +-
 .../db/integration/IoTDBTriggerManagementIT.java   |     8 +-
 .../iotdb/db/integration/IoTDBUDFManagementIT.java |    12 +-
 .../iotdb/session/IoTDBSessionComplexIT.java       |     2 +-
 .../iotdb/session/IoTDBSessionIteratorIT.java      |     2 +-
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java |     6 +-
 .../session/IoTDBSessionSyntaxConventionIT.java    |     2 +-
 integration/src/test/resources/logback.xml         |     4 +-
 iotdb-commons/pom.xml                              |    32 -
 {tsfile => node-commons}/pom.xml                   |    45 +-
 .../apache/iotdb/commons/ServerCommandLine.java    |    67 +
 .../apache/iotdb/commons}/concurrent/HashLock.java |     2 +-
 .../concurrent/IoTDBDaemonThreadFactory.java       |     2 +-
 .../IoTDBDefaultThreadExceptionHandler.java        |     2 +-
 .../concurrent/IoTDBThreadPoolFactory.java         |    10 +-
 .../commons}/concurrent/IoTThreadFactory.java      |     2 +-
 .../iotdb/commons}/concurrent/ThreadName.java      |     9 +-
 .../iotdb/commons}/concurrent/WrappedRunnable.java |     2 +-
 .../concurrent/threadpool/IThreadPoolMBean.java    |     2 +-
 .../WrappedScheduledExecutorService.java           |     6 +-
 .../WrappedScheduledExecutorServiceMBean.java      |     2 +-
 .../WrappedSingleThreadExecutorService.java        |     6 +-
 .../WrappedSingleThreadExecutorServiceMBean.java   |     2 +-
 .../WrappedSingleThreadScheduledExecutor.java      |     6 +-
 .../WrappedSingleThreadScheduledExecutorMBean.java |     2 +-
 .../threadpool/WrappedThreadPoolExecutor.java      |     8 +-
 .../threadpool/WrappedThreadPoolExecutorMBean.java |     2 +-
 .../apache/iotdb/commons}/conf/IoTDBConstant.java  |     2 +-
 .../commons}/exception/ConfigurationException.java |     2 +-
 .../iotdb/commons}/exception/IoTDBException.java   |     2 +-
 .../commons}/exception/ShutdownException.java      |     2 +-
 .../iotdb/commons}/exception/StartupException.java |     2 +-
 .../exception/runtime/RPCServiceException.java     |     2 +-
 .../apache/iotdb/commons/hash/APHashExecutor.java  |     0
 .../iotdb/commons/hash/BKDRHashExecutor.java       |     0
 .../commons/hash/DeviceGroupHashExecutor.java      |     0
 .../apache/iotdb/commons/hash/JSHashExecutor.java  |     0
 .../iotdb/commons/hash/SDBMHashExecutor.java       |     0
 .../service/AbstractThriftServiceThread.java       |    35 +-
 .../apache/iotdb/commons}/service/IService.java    |     6 +-
 .../apache/iotdb/commons}/service/JMXService.java  |     4 +-
 .../iotdb/commons}/service/RegisterManager.java    |     8 +-
 .../apache/iotdb/commons}/service/ServiceType.java |    10 +-
 .../iotdb/commons}/service/StartupCheck.java       |     4 +-
 .../iotdb/commons}/service/StartupChecks.java      |    10 +-
 .../iotdb/commons/service}/ThriftService.java      |    12 +-
 .../iotdb/commons/service/ThriftServiceThread.java |    89 +
 .../apache/iotdb/commons/utils/JVMCommonUtils.java |    81 +
 .../org/apache/iotdb/commons}/utils/TestOnly.java  |     2 +-
 .../IoTDBDefaultThreadExceptionHandlerTest.java    |     2 +-
 .../iotdb/commons}/IoTDBThreadPoolFactoryTest.java |     5 +-
 pom.xml                                            |     2 +-
 .../resources/conf/iotdb-engine.properties         |    31 +-
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |     2 +-
 .../iotdb/db/auth/authorizer/BasicAuthorizer.java  |     8 +-
 .../iotdb/db/auth/role/BasicRoleManager.java       |     2 +-
 .../iotdb/db/auth/role/LocalFileRoleAccessor.java  |     2 +-
 .../iotdb/db/auth/user/BasicUserManager.java       |     2 +-
 .../iotdb/db/auth/user/LocalFileUserAccessor.java  |     2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |    55 +-
 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java |     7 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |    49 +-
 .../db/conf/directories/DirectoryManager.java      |     2 +-
 .../directories/strategy/DirectoryStrategy.java    |     4 +-
 .../strategy/MaxDiskUsableSpaceFirstStrategy.java  |     6 +-
 .../MinFolderOccupiedSpaceFirstStrategy.java       |     6 +-
 .../strategy/RandomOnDiskUsableSpaceStrategy.java  |     4 +-
 .../directories/strategy/SequenceStrategy.java     |     6 +-
 .../db/conf/rest/IoTDBRestServiceDescriptor.java   |     2 +-
 .../{ConsensusMain.java => ConsensusExample.java}  |    32 +-
 .../consensus/statemachine/BaseStateMachine.java   |    74 +
 .../DataRegionStateMachine.java}                   |    23 +-
 .../SchemaRegionStateMachine.java}                 |    23 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |    32 +-
 .../iotdb/db/engine/cache/BloomFilterCache.java    |     2 +-
 .../db/engine/cache/CacheHitRatioMonitor.java      |    10 +-
 .../apache/iotdb/db/engine/cache/ChunkCache.java   |     2 +-
 .../db/engine/cache/TimeSeriesMetadataCache.java   |     4 +-
 .../engine/compaction/CompactionTaskManager.java   |    14 +-
 .../db/engine/compaction/CompactionUtils.java      |     6 +-
 .../db/engine/compaction/TsFileIdentifier.java     |     2 +-
 .../CrossSpaceCompactionExceptionHandler.java      |     2 +-
 .../RewriteCrossSpaceCompactionSelector.java       |     2 +-
 .../task/RewriteCrossCompactionRecoverTask.java    |     2 +-
 .../task/RewriteCrossSpaceCompactionTask.java      |     2 +-
 .../inner/AbstractInnerSpaceCompactionTask.java    |     2 +-
 .../InnerSpaceCompactionExceptionHandler.java      |     2 +-
 .../SizeTieredCompactionRecoverTask.java           |     2 +-
 .../sizetiered/SizeTieredCompactionSelector.java   |     2 +-
 .../inner/sizetiered/SizeTieredCompactionTask.java |     2 +-
 .../inner/utils/InnerSpaceCompactionUtils.java     |     4 +-
 .../compaction/task/AbstractCompactionTask.java    |     2 +-
 .../compaction/task/CompactionRecoverTask.java     |     2 +-
 .../utils/log/CompactionLogAnalyzer.java           |     2 +-
 .../engine/cq/ContinuousQuerySchemaCheckTask.java  |     2 +-
 .../iotdb/db/engine/cq/ContinuousQueryService.java |    10 +-
 .../iotdb/db/engine/cq/ContinuousQueryTask.java    |     4 +-
 .../engine/cq/ContinuousQueryTaskPoolManager.java  |     4 +-
 .../apache/iotdb/db/engine/flush/FlushManager.java |    10 +-
 .../engine/flush/pool/FlushSubTaskPoolManager.java |     4 +-
 .../db/engine/flush/pool/FlushTaskPoolManager.java |     4 +-
 .../apache/iotdb/db/engine/settle/SettleTask.java  |     2 +-
 .../db/engine/storagegroup/StorageGroupInfo.java   |     2 +-
 .../db/engine/storagegroup/TsFileManager.java      |     2 +-
 .../engine/storagegroup/TsFileNameGenerator.java   |     4 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |     2 +-
 .../db/engine/storagegroup/TsFileResource.java     |     4 +-
 .../db/engine/storagegroup/TsFileResourceList.java |     2 +-
 .../storagegroup/VirtualStorageGroupProcessor.java |    24 +-
 .../virtualSg/StorageGroupManager.java             |     2 +-
 .../engine/trigger/executor/TriggerExecutor.java   |     2 +-
 .../service/TriggerRegistrationService.java        |    26 +-
 .../trigger/sink/local/LocalIoTDBHandler.java      |     6 +-
 .../iotdb/db/engine/upgrade/UpgradeTask.java       |     2 +-
 .../iotdb/db/exception/LoadFileException.java      |     1 +
 .../apache/iotdb/db/exception/MergeException.java  |     1 +
 .../db/exception/QueryIdNotExsitException.java     |     1 +
 .../exception/QueryInBatchStatementException.java  |     1 +
 .../iotdb/db/exception/StorageEngineException.java |     1 +
 .../exception/StorageGroupProcessorException.java  |     1 +
 .../db/exception/SyncConnectionException.java      |     1 +
 .../SyncDeviceOwnerConflictException.java          |     1 +
 .../iotdb/db/exception/SystemCheckException.java   |     1 +
 .../db/exception/TsFileProcessorException.java     |     1 +
 .../iotdb/db/exception/WriteProcessException.java  |     1 +
 .../db/exception/index/IndexManagerException.java  |     2 +-
 .../db/exception/metadata/MetadataException.java   |     2 +-
 .../SchemaDirCreationFailureException.java}        |    10 +-
 .../exception/query/LogicalOperatorException.java  |     2 +-
 .../exception/query/LogicalOptimizeException.java  |     2 +-
 .../db/exception/query/QueryProcessException.java  |     2 +-
 .../{runtime => sql}/SQLParserException.java       |     2 +-
 .../StatementAnalyzeException.java}                |    21 +-
 .../db/metadata/IStorageGroupSchemaManager.java    |   232 +
 .../apache/iotdb/db/metadata/MetadataConstant.java |     5 +-
 .../org/apache/iotdb/db/metadata/SchemaEngine.java |  1736 +++
 .../metadata/{MManager.java => SchemaRegion.java}  |  1238 +--
 .../db/metadata/StorageGroupSchemaManager.java     |   251 +
 .../idtable/AppendOnlyDiskSchemaManager.java       |     2 +-
 .../apache/iotdb/db/metadata/idtable/IDTable.java  |     2 +-
 .../db/metadata/idtable/IDTableHashmapImpl.java    |    13 +-
 .../iotdb/db/metadata/idtable/IDTableManager.java  |     2 +-
 .../db/metadata/idtable/IDiskSchemaManager.java    |     2 +-
 .../db/metadata/idtable/entry/DeviceEntry.java     |     2 +-
 .../db/metadata/idtable/entry/DeviceIDFactory.java |     2 +-
 .../idtable/entry/InsertMeasurementMNode.java      |     9 +-
 .../db/metadata/idtable/entry/SchemaEntry.java     |     2 +-
 .../db/metadata/lastCache/LastCacheManager.java    |     6 +-
 .../iotdb/db/metadata/logfile/MLogReader.java      |     2 +-
 .../iotdb/db/metadata/logfile/MLogTxtReader.java   |     2 +-
 .../iotdb/db/metadata/logfile/MLogUpgrader.java    |   290 -
 .../iotdb/db/metadata/mnode/EntityMNode.java       |    13 +
 .../org/apache/iotdb/db/metadata/mnode/IMNode.java |     7 +-
 .../db/metadata/mnode/IStorageGroupMNode.java      |     6 +
 .../iotdb/db/metadata/mnode/InternalMNode.java     |    38 +-
 .../org/apache/iotdb/db/metadata/mnode/MNode.java  |    16 +-
 .../iotdb/db/metadata/mnode/MeasurementMNode.java  |     3 +-
 .../db/metadata/mnode/StorageGroupEntityMNode.java |    23 +
 .../iotdb/db/metadata/mnode/StorageGroupMNode.java |    23 +
 .../iotdb/db/metadata/mtree/MTreeAboveSG.java      |   559 +
 .../mtree/{MTree.java => MTreeBelowSG.java}        |  1101 +-
 .../db/metadata/mtree/traverser/Traverser.java     |    90 +-
 .../MNodeAboveSGCollector.java}                    |    48 +-
 .../mtree/traverser/collector/MNodeCollector.java  |     2 +-
 .../traverser/collector/MeasurementCollector.java  |    20 +-
 ...lCounter.java => MNodeAboveSGLevelCounter.java} |    47 +-
 .../mtree/traverser/counter/MNodeLevelCounter.java |    10 +-
 .../counter/MeasurementGroupByLevelCounter.java    |    26 +
 .../apache/iotdb/db/metadata/path/AlignedPath.java |     2 +-
 .../iotdb/db/metadata/path/MeasurementPath.java    |     4 +-
 .../apache/iotdb/db/metadata/path/PartialPath.java |     6 +-
 .../db/metadata/rescon/TimeseriesStatistics.java   |   104 +
 .../apache/iotdb/db/metadata/tag/TagManager.java   |    27 +-
 .../template/TemplateLogReader.java}               |    33 +-
 .../db/metadata/template/TemplateLogWriter.java    |    64 +
 .../db/metadata/template/TemplateManager.java      |   125 +-
 .../db/metadata/upgrade/MetadataUpgrader.java      |   429 +
 .../iotdb/db/metadata/utils/MetaFormatUtils.java   |    10 +-
 .../apache/iotdb/db/metadata/utils/MetaUtils.java  |     4 +-
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |   116 -
 .../db/mpp/buffer/DataBlockManagerService.java     |    90 -
 .../DataBlockManagerServiceThriftHandler.java      |    44 -
 .../mpp/buffer/DataBlockServiceClientFactory.java  |    44 -
 .../iotdb/db/mpp/buffer/DataBlockServiceImpl.java  |    50 -
 .../iotdb/db/mpp/buffer/IDataBlockManager.java     |    80 +
 .../buffer/{ISinkHandle.java => SinkHandle.java}   |    35 +-
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |    70 +-
 .../org/apache/iotdb/db/mpp/common/Analysis.java   |    13 +-
 .../{WithoutPolicy.java => FilterNullPolicy.java}  |     2 +-
 .../mpp/common/{TreeNode.java => FragmentId.java}  |    40 +-
 .../{QueryId.java => FragmentInstanceId.java}      |    11 +-
 .../org/apache/iotdb/db/mpp/common/QueryId.java    |    90 +-
 .../org/apache/iotdb/db/mpp/common/TsBlock.java    |    25 +
 .../FragmentInfo.java}                             |    33 +-
 .../db/mpp/execution/FragmentInstanceContext.java  |    67 +
 .../db/mpp/execution/FragmentInstanceState.java    |    68 +
 .../iotdb/db/mpp/execution/FragmentState.java      |    71 +
 .../iotdb/db/mpp/execution/QueryExecution.java     |    10 +-
 .../ClusterScheduler.java}                         |    34 +-
 .../scheduler/IScheduler.java}                     |    30 +-
 .../execution/scheduler/StandaloneScheduler.java   |    56 +
 .../iotdb/db/mpp/operator/OperatorContext.java     |     6 +-
 .../process/AggregateOperator.java}                |    33 +-
 .../process/DeviceMergeOperator.java}              |    34 +-
 .../process/FillOperator.java}                     |    34 +-
 .../process/FilterNullOperator.java}               |    33 +-
 .../process/GroupByLevelOperator.java}             |    33 +-
 .../db/mpp/operator/process/LimitOperator.java     |    75 +
 .../process/OffsetOperator.java}                   |    33 +-
 .../process/ProcessOperator.java}                  |    15 +-
 .../process/SortOperator.java}                     |    33 +-
 .../process/TimeJoinOperator.java}                 |    33 +-
 .../sink/FragmentSinkOperator.java}                |    42 +-
 .../sink/SinkOperator.java}                        |    42 +-
 .../source/SeriesAggregateScanOperator.java}       |    38 +-
 .../source/SeriesScanOperator.java}                |    39 +-
 .../source/SourceOperator.java}                    |    14 +-
 .../apache/iotdb/db/mpp/plan/node/PlanNodeId.java  |    34 -
 .../db/mpp/plan/node/process/FilterNullNode.java   |    38 -
 ...anceTaskExecutor.java => AbstractExecutor.java} |    40 +-
 .../iotdb/db/mpp/schedule/ExecutionContext.java    |    24 +-
 .../db/mpp/schedule/FragmentInstanceManager.java   |   270 +-
 .../mpp/schedule/FragmentInstanceTaskCallback.java |    27 -
 .../mpp/schedule/FragmentInstanceTaskExecutor.java |    66 +-
 .../schedule/FragmentInstanceTimeoutSentinel.java  |    66 +-
 .../db/mpp/schedule/IFragmentInstanceManager.java  |    23 +-
 .../iotdb/db/mpp/schedule/ITaskScheduler.java      |    77 +
 .../db/mpp/schedule/task/FragmentInstanceTask.java |    83 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |   132 +
 .../planner/optimization}/PlanOptimizer.java       |     7 +-
 .../planner}/plan/DistributedQueryPlan.java        |     7 +-
 .../planner}/plan/DistributionPlanner.java         |     2 +-
 .../{ => sql/planner}/plan/FragmentInstance.java   |     4 +-
 .../mpp/{ => sql/planner}/plan/LogicalPlanner.java |     4 +-
 .../{ => sql/planner}/plan/LogicalQueryPlan.java   |     7 +-
 .../mpp/{ => sql/planner}/plan/PlanFragment.java   |     7 +-
 .../mpp/{ => sql/planner}/plan/PlanFragmentId.java |     2 +-
 .../mpp/{ => sql/planner}/plan/node/PlanNode.java  |    31 +-
 .../planner/plan/node/PlanNodeId.java}             |    16 +-
 .../planner}/plan/node/PlanNodeIdAllocator.java    |     2 +-
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |    76 +
 .../planner/plan/node/process/AggregateNode.java}  |    49 +-
 .../plan/node/process/DeviceMergeNode.java         |    38 +-
 .../planner}/plan/node/process/FillNode.java       |    27 +-
 .../sql/planner/plan/node/process/FilterNode.java  |    66 +
 .../planner/plan/node/process/FilterNullNode.java  |    69 +
 .../plan/node/process/GroupByLevelNode.java        |    48 +-
 .../planner}/plan/node/process/LimitNode.java      |    41 +-
 .../planner}/plan/node/process/OffsetNode.java     |    39 +-
 .../planner}/plan/node/process/ProcessNode.java    |    10 +-
 .../planner}/plan/node/process/SortNode.java       |    35 +-
 .../planner}/plan/node/process/TimeJoinNode.java   |    55 +-
 .../planner}/plan/node/sink/CsvSinkNode.java       |    17 +-
 .../planner}/plan/node/sink/FragmentSinkNode.java  |    17 +-
 .../{ => sql/planner}/plan/node/sink/SinkNode.java |     9 +-
 .../planner}/plan/node/sink/ThriftSinkNode.java    |    17 +-
 .../planner}/plan/node/source/CsvSourceNode.java   |    17 +-
 .../plan/node/source/SeriesAggregateScanNode.java} |    35 +-
 .../planner}/plan/node/source/SeriesScanNode.java  |    27 +-
 .../planner}/plan/node/source/SourceNode.java      |     9 +-
 .../tree/Expression.java}                          |     7 +-
 .../{InfluxDBConstant.java => InfluxConstant.java} |     6 +-
 .../influxdb/constant/InfluxSQLConstant.java       |     7 +
 .../protocol/influxdb/function/InfluxFunction.java |    58 +
 .../influxdb/function/InfluxFunctionFactory.java   |    62 +
 .../influxdb/function/InfluxFunctionValue.java}    |    31 +-
 .../function/aggregator/InfluxAggregator.java      |    25 +-
 .../function/aggregator/InfluxCountFunction.java   |    59 +
 .../function/aggregator/InfluxMeanFunction.java    |    72 +
 .../function/aggregator/InfluxMedianFunction.java  |    75 +
 .../function/aggregator/InfluxModeFunction.java    |    88 +
 .../function/aggregator/InfluxSpreadFunction.java  |    88 +
 .../function/aggregator/InfluxStddevFunction.java  |    68 +
 .../function/aggregator/InfluxSumFunction.java     |    68 +
 .../function/selector/InfluxFirstFunction.java     |    73 +
 .../function/selector/InfluxLastFunction.java      |    73 +
 .../function/selector/InfluxMaxFunction.java       |   102 +
 .../function/selector/InfluxMinFunction.java       |   102 +
 .../influxdb/function/selector/InfluxSelector.java |    57 +
 .../db/protocol/influxdb/handler/QueryHandler.java |   956 ++
 .../influxdb/meta/InfluxDBMetaManager.java         |    10 +-
 .../influxdb/operator/InfluxSelectComponent.java   |     8 +-
 .../db/protocol/influxdb/util/CommonUtils.java     |    36 +-
 .../influxdb/util/FieldUtils.java}                 |    52 +-
 .../db/protocol/influxdb/util/FilterUtils.java     |    83 +
 .../protocol/influxdb/util/QueryResultUtils.java   |   291 +
 .../db/protocol/influxdb/util/StringUtils.java     |    97 +
 .../apache/iotdb/db/protocol/rest/RestService.java |     6 +-
 .../db/protocol/rest/handler/ExceptionHandler.java |     2 +-
 .../protocol/rest/impl/GrafanaApiServiceImpl.java  |     2 +-
 .../db/protocol/rest/impl/PingApiServiceImpl.java  |     2 +-
 .../db/protocol/rest/impl/RestApiServiceImpl.java  |     2 +-
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |     2 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   173 +-
 .../db/qp/logical/crud/BasicFunctionOperator.java  |     2 +-
 .../db/qp/logical/crud/BasicOperatorType.java      |     4 +-
 .../db/qp/logical/crud/DeleteDataOperator.java     |     2 +-
 .../db/qp/logical/crud/FillQueryOperator.java      |     2 +-
 .../iotdb/db/qp/logical/crud/InsertOperator.java   |     2 +-
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |     4 +-
 .../sys/CreateAlignedTimeSeriesOperator.java       |     4 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |    15 +-
 .../iotdb/db/qp/physical/crud/InsertPlan.java      |     2 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |     4 +-
 .../iotdb/db/qp/physical/crud/QueryPlan.java       |     2 +-
 .../physical/sys/CreateAlignedTimeSeriesPlan.java  |    82 +-
 .../db/qp/physical/sys/CreateSnapshotPlan.java     |    56 -
 .../db/qp/physical/sys/CreateTemplatePlan.java     |     2 +-
 .../iotdb/db/qp/physical/sys/SetTemplatePlan.java  |     2 +-
 .../db/qp/physical/sys/UnsetTemplatePlan.java      |     2 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |    31 +-
 .../iotdb/db/qp/strategy/LogicalGenerator.java     |     6 +-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |     4 +-
 .../apache/iotdb/db/qp/utils/DatetimeUtils.java    |     2 +-
 .../iotdb/db/qp/utils/GroupByLevelController.java  |     2 +-
 .../apache/iotdb/db/qp/utils/WildcardsRemover.java |     4 +-
 .../iotdb/db/query/control/QueryTimeManager.java   |     6 +-
 .../iotdb/db/query/control/SessionManager.java     |     2 +-
 .../db/query/control/SessionTimeoutManager.java    |     2 +-
 .../db/query/dataset/AlignByDeviceDataSet.java     |     4 +
 .../db/query/dataset/NonAlignEngineDataSet.java    |     2 +-
 .../dataset/RawQueryDataSetWithoutValueFilter.java |     2 +-
 .../iotdb/db/query/dataset/ShowDevicesDataSet.java |     8 +-
 .../db/query/dataset/ShowTimeseriesDataSet.java    |    18 +-
 .../query/dataset/groupby/GroupByTimeDataSet.java  |     2 +-
 .../groupby/GroupByWithValueFilterDataSet.java     |     2 +-
 .../db/query/executor/AggregationExecutor.java     |     2 +-
 .../iotdb/db/query/executor/LastQueryExecutor.java |    22 +-
 .../query/expression/unary/FunctionExpression.java |     2 +-
 .../iotdb/db/query/pool/QueryTaskManager.java      |     4 +-
 .../db/query/pool/RawQueryReadTaskPoolManager.java |     4 +-
 .../query/reader/series/AlignedSeriesReader.java   |     2 +-
 .../query/reader/series/SeriesAggregateReader.java |     2 +-
 .../reader/series/SeriesRawDataBatchReader.java    |     2 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |     2 +-
 .../reader/series/SeriesReaderByTimestamp.java     |     2 +-
 .../row/SerializableRowRecordList.java             |     2 +-
 .../datastructure/tv/SerializableBinaryTVList.java |     2 +-
 .../tv/SerializableBooleanTVList.java              |     2 +-
 .../datastructure/tv/SerializableDoubleTVList.java |     2 +-
 .../datastructure/tv/SerializableFloatTVList.java  |     2 +-
 .../datastructure/tv/SerializableIntTVList.java    |     2 +-
 .../datastructure/tv/SerializableLongTVList.java   |     2 +-
 .../udf/service/TemporaryQueryDataFileService.java |     6 +-
 .../query/udf/service/UDFClassLoaderManager.java   |     6 +-
 .../query/udf/service/UDFRegistrationService.java  |     8 +-
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |     2 +-
 .../iotdb/db/rescon/TsFileResourceManager.java     |     2 +-
 .../iotdb/db/service/InfluxDBRPCService.java       |     9 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |    25 +-
 .../org/apache/iotdb/db/service/IoTDBMBean.java    |     4 +-
 .../org/apache/iotdb/db/service/MQTTService.java   |     2 +
 .../org/apache/iotdb/db/service/RPCService.java    |     9 +-
 .../apache/iotdb/db/service/RPCServiceMBean.java   |     2 +-
 .../org/apache/iotdb/db/service/SettleService.java |     6 +-
 .../org/apache/iotdb/db/service/StaticResps.java   |     6 +-
 .../org/apache/iotdb/db/service/UpgradeSevice.java |     4 +-
 .../db/service/basic/QueryFrequencyRecorder.java   |     2 +-
 .../iotdb/db/service/basic/ServiceProvider.java    |     2 +-
 .../iotdb/db/service/metrics/MetricsService.java   |    10 +-
 .../db/service/metrics/MetricsServiceMBean.java    |     2 +-
 .../service/thrift/impl/InfluxDBServiceImpl.java   |    14 +
 .../db/service/thrift/impl/TSServiceImpl.java      |    24 +-
 .../iotdb/db/sql/constant/FilterConstant.java      |   102 +
 .../iotdb/db/sql/constant/StatementType.java       |   134 +
 .../org/apache/iotdb/db/sql/parser/ASTVisitor.java |  1368 +++
 .../iotdb/db/sql/parser/StatementGenerator.java    |   184 +
 .../statement/AggregationQueryStatement.java}      |    35 +-
 .../statement/FillQueryStatement.java}             |    20 +-
 .../statement/GroupByFillQueryStatement.java}      |    28 +-
 .../db/sql/statement/GroupByQueryStatement.java    |    29 +-
 .../iotdb/db/sql/statement/LastQueryStatement.java |    14 +-
 .../iotdb/db/sql/statement/QueryStatement.java     |   211 +
 .../statement/ShowDevicesStatement.java}           |    16 +-
 .../statement/ShowTimeSeriesStatement.java}        |    16 +-
 .../statement/Statement.java}                      |    40 +-
 .../statement/UDAFQueryStatement.java}             |    10 +-
 .../statement/UDTFQueryStatement.java}             |     9 +-
 .../db/sql/statement/component/FillComponent.java  |    36 +-
 .../db/sql/statement/component/FromComponent.java  |    29 +-
 .../statement/component/GroupByLevelComponent.java |    31 +-
 .../statement/component/GroupByTimeComponent.java  |    99 +
 .../statement/component/OrderBy.java}              |     8 +-
 .../db/sql/statement/component/ResultColumn.java   |   168 +
 .../statement/component/ResultSetFormat.java}      |     9 +-
 .../sql/statement/component/SelectComponent.java   |    99 +
 .../statement/component/WhereCondition.java}       |    27 +-
 .../db/sql/statement/component/WithoutPolicy.java  |    59 +
 .../statement/filter/BasicFilterType.java}         |    18 +-
 .../statement/filter/BasicFunctionFilter.java}     |    35 +-
 .../statement/filter/FunctionFilter.java}          |    34 +-
 .../iotdb/db/sql/statement/filter/InFilter.java    |   201 +
 .../iotdb/db/sql/statement/filter/LikeFilter.java  |   134 +
 .../iotdb/db/sql/statement/filter/QueryFilter.java |   295 +
 .../db/sql/statement/filter/RegexpFilter.java      |   134 +
 .../iotdb/db/sync/conf/SyncSenderDescriptor.java   |     2 +-
 .../iotdb/db/sync/receiver/SyncServerManager.java  |    10 +-
 .../db/sync/receiver/SyncServerManagerMBean.java   |     2 +-
 .../db/sync/receiver/load/FileLoaderManager.java   |     4 +-
 .../db/sync/receiver/transfer/SyncServiceImpl.java |     4 +-
 .../db/sync/sender/manage/SyncFileManager.java     |     4 +-
 .../iotdb/db/sync/sender/transfer/SyncClient.java  |     4 +-
 .../org/apache/iotdb/db/tools/TsFileSplitTool.java |     2 +-
 .../db/tools/virtualsg/DeviceMappingViewer.java    |    12 +-
 .../java/org/apache/iotdb/db/utils/AuthUtils.java  |     2 +-
 .../org/apache/iotdb/db/utils/CommonUtils.java     |    58 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |     4 +-
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  |     4 +-
 .../java/org/apache/iotdb/db/utils/MathUtils.java  |    51 +
 .../java/org/apache/iotdb/db/utils/MemUtils.java   |     2 +-
 .../org/apache/iotdb/db/utils/OpenFileNumUtil.java |     2 +-
 .../org/apache/iotdb/db/utils/SchemaTestUtils.java |     2 +-
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |     5 +-
 .../org/apache/iotdb/db/utils/ThreadUtils.java     |     2 +-
 .../db/utils/datastructure/AlignedTVList.java      |     2 +-
 .../iotdb/db/utils/datastructure/TVList.java       |     2 +-
 .../org/apache/iotdb/db/utils/stats/CpuTimer.java  |   156 +
 .../windowing/runtime/WindowEvaluationTask.java    |     2 +-
 .../runtime/WindowEvaluationTaskPoolManager.java   |     6 +-
 .../utils/windowing/window/EvictableBatchList.java |     2 +-
 .../org/apache/iotdb/db/writelog/io/LogWriter.java |     2 +-
 .../writelog/manager/MultiFileLogNodeManager.java  |     8 +-
 .../db/writelog/node/ExclusiveWriteLogNode.java    |     4 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |     6 +-
 .../apache/iotdb/db/conf/IoTDBDescriptorTest.java  |     2 +
 .../strategy/DirectoryStrategyTest.java            |    24 +-
 .../iotdb/db/engine/MetadataManagerHelper.java     |    48 +-
 .../iotdb/db/engine/cache/ChunkCacheTest.java      |    10 +-
 .../engine/compaction/AbstractCompactionTest.java  |    10 +-
 .../engine/compaction/CompactionSchedulerTest.java |    64 +-
 .../db/engine/compaction/CompactionUtilsTest.java  |     2 +-
 .../compaction/TestUtilsForAlignedSeries.java      |     6 +-
 .../cross/CrossSpaceCompactionExceptionTest.java   |     2 +-
 .../compaction/cross/CrossSpaceCompactionTest.java |     8 +-
 .../db/engine/compaction/cross/MergeTest.java      |    10 +-
 .../engine/compaction/cross/MergeUpgradeTest.java  |     2 +-
 .../cross/RewriteCompactionFileSelectorTest.java   |     2 +-
 .../RewriteCrossSpaceCompactionRecoverTest.java    |     2 +-
 .../cross/RewriteCrossSpaceCompactionTest.java     |     4 +-
 .../inner/AbstractInnerSpaceCompactionTest.java    |    12 +-
 .../inner/InnerCompactionMoreDataTest.java         |     8 +-
 .../compaction/inner/InnerCompactionTest.java      |    12 +-
 .../compaction/inner/InnerSeqCompactionTest.java   |     8 +-
 .../InnerSpaceCompactionUtilsAlignedTest.java      |     4 +-
 .../InnerSpaceCompactionUtilsNoAlignedTest.java    |     6 +-
 .../inner/InnerSpaceCompactionUtilsOldTest.java    |     2 +-
 .../compaction/inner/InnerUnseqCompactionTest.java |     8 +-
 .../SizeTieredCompactionRecoverTest.java           |     2 +-
 .../inner/sizetiered/SizeTieredCompactionTest.java |    12 +-
 ...eCrossSpaceCompactionRecoverCompatibleTest.java |     2 +-
 .../recover/SizeTieredCompactionRecoverTest.java   |    14 +-
 .../compaction/utils/CompactionClearUtils.java     |     2 +-
 .../utils/CompactionFileGeneratorUtils.java        |     2 +-
 .../engine/modification/DeletionFileNodeTest.java  |     4 +-
 .../db/engine/modification/DeletionQueryTest.java  |     4 +-
 .../storagegroup/FileNodeManagerBenchmark.java     |     8 +-
 .../storagegroup/StorageGroupProcessorTest.java    |     2 +-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |    18 +-
 .../db/engine/storagegroup/TsFileManagerTest.java  |     2 +-
 .../org/apache/iotdb/db/metadata/MTreeTest.java    |  1060 --
 ...ncedTest.java => SchemaEngineAdvancedTest.java} |    72 +-
 ...erBasicTest.java => SchemaEngineBasicTest.java} |   996 +-
 ...proveTest.java => SchemaEngineImproveTest.java} |    36 +-
 .../org/apache/iotdb/db/metadata/TemplateTest.java |   112 +-
 .../iotdb/db/metadata/idtable/IDTableTest.java     |    66 +-
 .../db/metadata/idtable/InsertWithIDTableTest.java |    18 +-
 .../iotdb/db/metadata/mlog/MLogUpgraderTest.java   |   176 -
 .../iotdb/db/metadata/mtree/MTreeAboveSGTest.java  |   292 +
 .../iotdb/db/metadata/mtree/MTreeBelowSGTest.java  |   796 ++
 .../db/metadata/upgrade/MetadataUpgradeTest.java   |   304 +
 .../iotdb/db/protocol/rest/IoTDBRestServiceIT.java |     2 +-
 .../java/org/apache/iotdb/db/qp/PlannerTest.java   |    38 +-
 .../iotdb/db/qp/logical/LogicalPlanSmallTest.java  |     6 +-
 .../iotdb/db/qp/physical/ConcatOptimizerTest.java  |    18 +-
 .../iotdb/db/qp/physical/InsertRowPlanTest.java    |    12 +-
 .../iotdb/db/qp/physical/InsertTabletPlanTest.java |    10 +-
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     |    14 +-
 .../iotdb/db/qp/physical/SerializationTest.java    |    14 +-
 .../apache/iotdb/db/qp/sql/ASTVisitorTest.java}    |    24 +-
 .../iotdb/db/qp/sql/IoTDBsqlVisitorTest.java       |     2 +-
 .../dataset/EngineDataSetWithValueFilterTest.java  |     2 +-
 .../query/dataset/UDTFAlignByTimeDataSetTest.java  |    14 +-
 .../query/dataset/groupby/GroupByDataSetTest.java  |     2 +-
 .../dataset/groupby/GroupByFillDataSetTest.java    |     2 +-
 .../dataset/groupby/GroupByLevelDataSetTest.java   |     2 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |    10 +-
 .../iotdb/db/rescon/ResourceManagerTest.java       |    12 +-
 .../iotdb/db/sql/StatementGeneratorTest.java       |    75 +
 .../db/sync/receiver/load/FileLoaderTest.java      |    14 +-
 .../recover/SyncReceiverLogAnalyzerTest.java       |    14 +-
 .../db/sync/sender/manage/SyncFileManagerTest.java |     4 +-
 .../sender/recover/SyncSenderLogAnalyzerTest.java  |     6 +-
 .../sync/sender/recover/SyncSenderLoggerTest.java  |     2 +-
 .../db/sync/sender/transfer/SyncClientTest.java    |     2 +-
 .../apache/iotdb/db/tools/IoTDBWatermarkTest.java  |     2 +-
 .../org/apache/iotdb/db/tools/MLogParserTest.java  |   112 +-
 .../org/apache/iotdb/db/utils/SchemaUtilsTest.java |     8 +-
 .../iotdb/db/writelog/IoTDBLogFileSizeTest.java    |     2 +-
 .../apache/iotdb/db/writelog/PerformanceTest.java  |    10 +-
 .../db/writelog/recover/DeviceStringTest.java      |    12 +-
 .../iotdb/db/writelog/recover/LogReplayerTest.java |     4 +-
 .../recover/RecoverResourceFromReaderTest.java     |    10 +-
 .../db/writelog/recover/SeqTsFileRecoverTest.java  |     8 +-
 .../writelog/recover/UnseqTsFileRecoverTest.java   |    10 +-
 server/src/test/resources/logback.xml              |     4 +-
 .../java/org/apache/iotdb/session/Session.java     |    88 +-
 .../org/apache/iotdb/session/pool/SessionPool.java |   178 +-
 .../session/IoTDBSessionDisableMemControlIT.java   |     2 +-
 .../iotdb/session/IoTDBSessionVectorInsertIT.java  |     2 +-
 .../java/org/apache/iotdb/session/SessionTest.java |     2 +-
 .../apache/iotdb/session/pool/SessionPoolTest.java |     2 +-
 .../apache/iotdb/session/template/TemplateUT.java  |     2 +-
 site/src/main/.vuepress/config.js                  |     3 +-
 .../apache/iotdb/spark/db/EnvironmentUtils.java    |     6 +-
 .../org/apache/iotdb/spark/db/IoTDBTest.scala      |     3 +-
 .../org/apache/iotdb/spark/db/IoTDBWriteTest.scala |     3 +-
 .../iotdb/spark/db/unit/DataFrameToolsTest.scala   |     4 +-
 thrift-confignode/pom.xml                          |     2 +-
 .../src/main/thrift/confignode.thrift              |    29 +-
 thrift-influxdb/src/main/thrift/influxdb.thrift    |    19 +
 .../src/main/thrift/common.thrift                  |     9 +-
 thrift/src/main/thrift/mpp.thrift                  |    55 -
 tsfile/pom.xml                                     |    14 +-
 .../iotdb/tsfile/common/conf/TSFileConfig.java     |    20 +
 .../iotdb/tsfile/common/conf/TSFileDescriptor.java |     6 +
 .../iotdb/tsfile/encoding/decoder/Decoder.java     |     3 +-
 .../iotdb/tsfile/encoding/decoder/FreqDecoder.java |   140 +
 .../iotdb/tsfile/encoding/encoder/FreqEncoder.java |   313 +
 .../tsfile/encoding/encoder/TSEncodingBuilder.java |    65 +-
 .../tsfile/file/metadata/enums/TSEncoding.java     |     5 +-
 .../apache/iotdb/tsfile/utils/BitConstructor.java  |    93 +
 .../org/apache/iotdb/tsfile/utils/BitReader.java   |    70 +
 .../iotdb/tsfile/utils/MeasurementGroup.java       |     3 +-
 .../tsfile/encoding/decoder/FreqDecoderTest.java   |   161 +
 .../zeppelin/iotdb/IoTDBInterpreterTest.java       |     2 +-
 702 files changed, 23958 insertions(+), 13973 deletions(-)
 create mode 100644 cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDBServerCommandLine.java
 rename cluster/src/main/java/org/apache/iotdb/cluster/metadata/{CMManager.java => CSchemaEngine.java} (99%)
 rename cluster/src/test/java/org/apache/iotdb/cluster/partition/{MManagerWhiteBox.java => SchemaEngineWhiteBox.java} (70%)
 delete mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/exception/conf/RepeatConfigurationException.java
 delete mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/exception/startup/StartupException.java
 create mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeCommandLine.java
 delete mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/service/register/IService.java
 delete mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/service/register/JMXService.java
 delete mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/service/register/RegisterManager.java
 delete mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/service/startup/StartupCheck.java
 delete mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/service/startup/StartupChecks.java
 copy server/src/main/java/org/apache/iotdb/db/concurrent/threadpool/WrappedSingleThreadExecutorServiceMBean.java => confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServerMBean.java (87%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/{ConfigNodeRPCServer.java => ConfigNodeRPCServerProcessor.java} (71%)
 create mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/server/ConfigNodeRPCServiceHandler.java
 copy consensus/src/main/java/org/apache/iotdb/consensus/common/request/{IConsensusRequest.java => ByteBufferConsensusRequest.java} (57%)
 rename flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/{RowTSRecordOutputFormatIT.java => RowTSRecordOutputFormatIntegrationTest.java} (95%)
 rename flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/tsfile/{RowTsFileInputFormatIT.java => RowTsFileInputFormatIntegrationTest.java} (67%)
 delete mode 100644 integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCreateSnapshotIT.java
 delete mode 100644 iotdb-commons/pom.xml
 copy {tsfile => node-commons}/pom.xml (83%)
 create mode 100644 node-commons/src/main/java/org/apache/iotdb/commons/ServerCommandLine.java
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/HashLock.java (97%)
 copy {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/IoTDBDaemonThreadFactory.java (96%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/IoTDBDefaultThreadExceptionHandler.java (96%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/IoTDBThreadPoolFactory.java (96%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/IoTThreadFactory.java (98%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/ThreadName.java (93%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/WrappedRunnable.java (97%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/threadpool/IThreadPoolMBean.java (95%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/threadpool/WrappedScheduledExecutorService.java (97%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/threadpool/WrappedScheduledExecutorServiceMBean.java (94%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/threadpool/WrappedSingleThreadExecutorService.java (95%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/threadpool/WrappedSingleThreadExecutorServiceMBean.java (93%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/threadpool/WrappedSingleThreadScheduledExecutor.java (96%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/threadpool/WrappedSingleThreadScheduledExecutorMBean.java (93%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/threadpool/WrappedThreadPoolExecutor.java (91%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/concurrent/threadpool/WrappedThreadPoolExecutorMBean.java (94%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/conf/IoTDBConstant.java (99%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/exception/ConfigurationException.java (97%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/exception/IoTDBException.java (97%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/exception/ShutdownException.java (96%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/exception/StartupException.java (97%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/exception/runtime/RPCServiceException.java (95%)
 rename {iotdb-commons => node-commons}/src/main/java/org/apache/iotdb/commons/hash/APHashExecutor.java (100%)
 copy {iotdb-commons => node-commons}/src/main/java/org/apache/iotdb/commons/hash/BKDRHashExecutor.java (100%)
 rename {iotdb-commons => node-commons}/src/main/java/org/apache/iotdb/commons/hash/DeviceGroupHashExecutor.java (100%)
 copy {iotdb-commons => node-commons}/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java (100%)
 rename {iotdb-commons => node-commons}/src/main/java/org/apache/iotdb/commons/hash/SDBMHashExecutor.java (100%)
 rename server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java => node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java (91%)
 copy {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/service/IService.java (89%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/service/JMXService.java (97%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/service/RegisterManager.java (92%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/service/ServiceType.java (93%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/service/StartupCheck.java (90%)
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/service/StartupChecks.java (90%)
 rename {server/src/main/java/org/apache/iotdb/db/service/thrift => node-commons/src/main/java/org/apache/iotdb/commons/service}/ThriftService.java (94%)
 create mode 100644 node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
 create mode 100644 node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java
 rename {server/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/utils/TestOnly.java (96%)
 rename {server/src/test/java/org/apache/iotdb/db/concurrent => node-commons/src/test/java/org/apache/iotdb/commons}/IoTDBDefaultThreadExceptionHandlerTest.java (98%)
 rename {server/src/test/java/org/apache/iotdb/db/concurrent => node-commons/src/test/java/org/apache/iotdb/commons}/IoTDBThreadPoolFactoryTest.java (97%)
 rename server/src/main/java/org/apache/iotdb/db/consensus/{ConsensusMain.java => ConsensusExample.java} (75%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
 rename server/src/main/java/org/apache/iotdb/db/consensus/{ratis/RatisDataRegionStateMachine.java => statemachine/DataRegionStateMachine.java} (61%)
 rename server/src/main/java/org/apache/iotdb/db/consensus/{ratis/RatisSchemaRegionStateMachine.java => statemachine/SchemaRegionStateMachine.java} (61%)
 copy server/src/main/java/org/apache/iotdb/db/exception/{QueryIdNotExsitException.java => metadata/SchemaDirCreationFailureException.java} (75%)
 rename server/src/main/java/org/apache/iotdb/db/exception/{runtime => sql}/SQLParserException.java (96%)
 copy server/src/main/java/org/apache/iotdb/db/exception/{query/LogicalOptimizeException.java => sql/StatementAnalyzeException.java} (64%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/IStorageGroupSchemaManager.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/SchemaEngine.java
 rename server/src/main/java/org/apache/iotdb/db/metadata/{MManager.java => SchemaRegion.java} (59%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/StorageGroupSchemaManager.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogUpgrader.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeAboveSG.java
 rename server/src/main/java/org/apache/iotdb/db/metadata/mtree/{MTree.java => MTreeBelowSG.java} (53%)
 copy server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/{counter/MNodeLevelCounter.java => collector/MNodeAboveSGCollector.java} (54%)
 copy server/src/main/java/org/apache/iotdb/db/metadata/mtree/traverser/counter/{MNodeLevelCounter.java => MNodeAboveSGLevelCounter.java} (57%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java
 copy server/src/main/java/org/apache/iotdb/db/{mpp/common/TreeNode.java => metadata/template/TemplateLogReader.java} (58%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/template/TemplateLogWriter.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/upgrade/MetadataUpgrader.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManager.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManagerService.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockManagerServiceThriftHandler.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceClientFactory.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/buffer/DataBlockServiceImpl.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
 rename server/src/main/java/org/apache/iotdb/db/mpp/buffer/{ISinkHandle.java => SinkHandle.java} (53%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/common/{WithoutPolicy.java => FilterNullPolicy.java} (96%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/common/{TreeNode.java => FragmentId.java} (50%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/common/{QueryId.java => FragmentInstanceId.java} (86%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{plan/LogicalPlanner.java => execution/FragmentInfo.java} (57%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceContext.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentInstanceState.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/FragmentState.java
 rename server/src/main/java/org/apache/iotdb/db/mpp/execution/{QueryScheduler.java => scheduler/ClusterScheduler.java} (65%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{plan/node/process/FilterNode.java => execution/scheduler/IScheduler.java} (61%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
 copy server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/process/AggregateOperator.java} (62%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/process/DeviceMergeOperator.java} (62%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/process/FillOperator.java} (62%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/process/FilterNullOperator.java} (62%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/process/GroupByLevelOperator.java} (62%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
 copy server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/process/OffsetOperator.java} (62%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{common/QueryId.java => operator/process/ProcessOperator.java} (78%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/process/SortOperator.java} (62%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/process/TimeJoinOperator.java} (62%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/sink/FragmentSinkOperator.java} (57%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/sink/SinkOperator.java} (54%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/source/SeriesAggregateScanOperator.java} (57%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{buffer/ISourceHandle.java => operator/source/SeriesScanOperator.java} (57%)
 copy server/src/main/java/org/apache/iotdb/db/mpp/{common/QueryId.java => operator/source/SourceOperator.java} (77%)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java
 copy server/src/main/java/org/apache/iotdb/db/mpp/schedule/{FragmentInstanceTaskExecutor.java => AbstractExecutor.java} (54%)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskCallback.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
 rename server/src/main/java/org/apache/iotdb/db/mpp/{plan/optimzation => sql/planner/optimization}/PlanOptimizer.java (80%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/DistributedQueryPlan.java (86%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/DistributionPlanner.java (95%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/FragmentInstance.java (91%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/LogicalPlanner.java (91%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/LogicalQueryPlan.java (85%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/PlanFragment.java (85%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/PlanFragmentId.java (94%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/PlanNode.java (55%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{plan/FragmentInstanceId.java => sql/planner/plan/node/PlanNodeId.java} (78%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/PlanNodeIdAllocator.java (94%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
 rename server/src/main/java/org/apache/iotdb/db/mpp/{plan/node/process/RowBasedSeriesAggregateNode.java => sql/planner/plan/node/process/AggregateNode.java} (53%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/process/DeviceMergeNode.java (69%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/process/FillNode.java (64%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/process/GroupByLevelNode.java (59%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/process/LimitNode.java (53%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/process/OffsetNode.java (56%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/process/ProcessNode.java (77%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/process/SortNode.java (57%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/process/TimeJoinNode.java (55%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/sink/CsvSinkNode.java (73%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/sink/FragmentSinkNode.java (73%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/sink/SinkNode.java (76%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/sink/ThriftSinkNode.java (74%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/source/CsvSourceNode.java (74%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{plan/node/source/SeriesAggregateNode.java => sql/planner/plan/node/source/SeriesAggregateScanNode.java} (75%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/source/SeriesScanNode.java (79%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{ => sql/planner}/plan/node/source/SourceNode.java (76%)
 rename server/src/main/java/org/apache/iotdb/db/mpp/{common/WithoutPolicy.java => sql/tree/Expression.java} (89%)
 copy server/src/main/java/org/apache/iotdb/db/protocol/influxdb/constant/{InfluxDBConstant.java => InfluxConstant.java} (86%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/InfluxFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/InfluxFunctionFactory.java
 copy server/src/main/java/org/apache/iotdb/db/{concurrent/IoTDBDaemonThreadFactory.java => protocol/influxdb/function/InfluxFunctionValue.java} (62%)
 copy iotdb-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java => server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxAggregator.java (61%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxCountFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxMeanFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxMedianFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxModeFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxSpreadFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxStddevFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/aggregator/InfluxSumFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxFirstFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxLastFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxMaxFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxMinFunction.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/function/selector/InfluxSelector.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/handler/QueryHandler.java
 rename iotdb-commons/src/main/java/org/apache/iotdb/commons/hash/BKDRHashExecutor.java => server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/CommonUtils.java (62%)
 rename server/src/main/java/org/apache/iotdb/db/{service/IService.java => protocol/influxdb/util/FieldUtils.java} (55%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/FilterUtils.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/QueryResultUtils.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/protocol/influxdb/util/StringUtils.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateSnapshotPlan.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/constant/FilterConstant.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/constant/StatementType.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/parser/ASTVisitor.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/parser/StatementGenerator.java
 copy server/src/main/java/org/apache/iotdb/db/{utils/windowing/runtime/WindowEvaluationTask.java => sql/statement/AggregationQueryStatement.java} (52%)
 copy server/src/main/java/org/apache/iotdb/db/{exception/LoadFileException.java => sql/statement/FillQueryStatement.java} (66%)
 copy server/src/main/java/org/apache/iotdb/db/{concurrent/IoTDBDaemonThreadFactory.java => sql/statement/GroupByFillQueryStatement.java} (60%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/service/register/ServiceType.java => server/src/main/java/org/apache/iotdb/db/sql/statement/GroupByQueryStatement.java (57%)
 copy consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java => server/src/main/java/org/apache/iotdb/db/sql/statement/LastQueryStatement.java (77%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/statement/QueryStatement.java
 copy server/src/main/java/org/apache/iotdb/db/{exception/QueryIdNotExsitException.java => sql/statement/ShowDevicesStatement.java} (75%)
 copy server/src/main/java/org/apache/iotdb/db/{exception/QueryIdNotExsitException.java => sql/statement/ShowTimeSeriesStatement.java} (75%)
 copy server/src/main/java/org/apache/iotdb/db/{exception/StorageEngineException.java => sql/statement/Statement.java} (50%)
 copy server/src/main/java/org/apache/iotdb/db/{service/IoTDBMBean.java => sql/statement/UDAFQueryStatement.java} (80%)
 copy server/src/main/java/org/apache/iotdb/db/{protocol/influxdb/constant/InfluxDBConstant.java => sql/statement/UDTFQueryStatement.java} (80%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/service/register/ServiceType.java => server/src/main/java/org/apache/iotdb/db/sql/statement/component/FillComponent.java (54%)
 rename iotdb-commons/src/main/java/org/apache/iotdb/commons/hash/JSHashExecutor.java => server/src/main/java/org/apache/iotdb/db/sql/statement/component/FromComponent.java (62%)
 rename confignode/src/main/java/org/apache/iotdb/confignode/service/register/ServiceType.java => server/src/main/java/org/apache/iotdb/db/sql/statement/component/GroupByLevelComponent.java (55%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/statement/component/GroupByTimeComponent.java
 copy server/src/main/java/org/apache/iotdb/db/{protocol/influxdb/constant/InfluxDBConstant.java => sql/statement/component/OrderBy.java} (83%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/statement/component/ResultColumn.java
 copy server/src/main/java/org/apache/iotdb/db/{protocol/influxdb/constant/InfluxDBConstant.java => sql/statement/component/ResultSetFormat.java} (82%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/statement/component/SelectComponent.java
 rename server/src/main/java/org/apache/iotdb/db/{concurrent/IoTDBDaemonThreadFactory.java => sql/statement/component/WhereCondition.java} (61%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/statement/component/WithoutPolicy.java
 copy server/src/main/java/org/apache/iotdb/db/{qp/logical/crud/BasicOperatorType.java => sql/statement/filter/BasicFilterType.java} (90%)
 copy server/src/main/java/org/apache/iotdb/db/{qp/logical/crud/BasicFunctionOperator.java => sql/statement/filter/BasicFunctionFilter.java} (81%)
 rename server/src/main/java/org/apache/iotdb/db/{qp/logical/sys/CreateSnapshotOperator.java => sql/statement/filter/FunctionFilter.java} (53%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/statement/filter/InFilter.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/statement/filter/LikeFilter.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/statement/filter/QueryFilter.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sql/statement/filter/RegexpFilter.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/utils/stats/CpuTimer.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/metadata/MTreeTest.java
 rename server/src/test/java/org/apache/iotdb/db/metadata/{MManagerAdvancedTest.java => SchemaEngineAdvancedTest.java} (77%)
 rename server/src/test/java/org/apache/iotdb/db/metadata/{MManagerBasicTest.java => SchemaEngineBasicTest.java} (66%)
 rename server/src/test/java/org/apache/iotdb/db/metadata/{MManagerImproveTest.java => SchemaEngineImproveTest.java} (83%)
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/metadata/mlog/MLogUpgraderTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeAboveSGTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/metadata/upgrade/MetadataUpgradeTest.java
 copy server/src/{main/java/org/apache/iotdb/db/exception/MergeException.java => test/java/org/apache/iotdb/db/qp/sql/ASTVisitorTest.java} (66%)
 create mode 100644 server/src/test/java/org/apache/iotdb/db/sql/StatementGeneratorTest.java
 rename server/src/main/java/org/apache/iotdb/db/protocol/influxdb/constant/InfluxDBConstant.java => thrift/src/main/thrift/common.thrift (82%)
 delete mode 100644 thrift/src/main/thrift/mpp.thrift
 create mode 100644 tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/decoder/FreqDecoder.java
 create mode 100644 tsfile/src/main/java/org/apache/iotdb/tsfile/encoding/encoder/FreqEncoder.java
 create mode 100644 tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitConstructor.java
 create mode 100644 tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitReader.java
 create mode 100644 tsfile/src/test/java/org/apache/iotdb/tsfile/encoding/decoder/FreqDecoderTest.java

[iotdb] 02/07: [To_new_mpp] Basic query memory control (#5216)

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 199930b2b121286712e82753198847bac58ae5fc
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Tue Mar 15 09:22:55 2022 +0800

    [To_new_mpp] Basic query memory control (#5216)
    
    * [To_new_mpp] Basic query memory control
    
    * Add license
---
 .../iotdb/mpp/memory/LocalMemoryManager.java       |  46 +++++++
 .../org/apache/iotdb/mpp/memory/MemoryPool.java    |  90 +++++++++++++
 .../apache/iotdb/mpp/memory/MemoryPoolTest.java    | 150 +++++++++++++++++++++
 3 files changed, 286 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
new file mode 100644
index 0000000..cc5305e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+/**
+ * Manages memory of a data node. The memory is divided into two memory pools so that the memory for
+ * read and for write can be isolated.
+ */
+public class LocalMemoryManager {
+
+  private final long maxBytes;
+  private final MemoryPool queryPool;
+
+  public LocalMemoryManager() {
+    long maxMemory = Runtime.getRuntime().maxMemory();
+    // Save 20% memory for untracked allocations.
+    maxBytes = (long) (maxMemory * 0.8);
+    // Allocate 50% memory for query execution.
+    queryPool = new MemoryPool("query", (long) (maxBytes * 0.5));
+  }
+
+  public long getMaxBytes() {
+    return maxBytes;
+  }
+
+  public MemoryPool getQueryPool() {
+    return queryPool;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
new file mode 100644
index 0000000..29b7228
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+import org.apache.commons.lang3.Validate;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Manages certain amount of memory. */
+public class MemoryPool {
+
+  private final String id;
+  private final long maxBytes;
+
+  private long reservedBytes = 0L;
+  private final Map<String, Long> queryMemoryReservations = new HashMap<>();
+
+  public MemoryPool(String id, long maxBytes) {
+    this.id = Validate.notNull(id);
+    Validate.isTrue(maxBytes > 0L);
+    this.maxBytes = maxBytes;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public long getMaxBytes() {
+    return maxBytes;
+  }
+
+  public boolean tryReserve(String queryId, long bytes) {
+    Validate.notNull(queryId);
+    Validate.isTrue(bytes > 0L);
+
+    synchronized (this) {
+      if (maxBytes - reservedBytes < bytes) {
+        return false;
+      }
+      reservedBytes += bytes;
+      queryMemoryReservations.merge(queryId, bytes, Long::sum);
+    }
+
+    return true;
+  }
+
+  public synchronized void free(String queryId, long bytes) {
+    Validate.notNull(queryId);
+    Validate.isTrue(bytes > 0L);
+
+    Long queryReservedBytes = queryMemoryReservations.get(queryId);
+    Validate.notNull(queryReservedBytes);
+    Validate.isTrue(bytes <= queryReservedBytes);
+
+    queryReservedBytes -= bytes;
+    if (queryReservedBytes == 0) {
+      queryMemoryReservations.remove(queryId);
+    } else {
+      queryMemoryReservations.put(queryId, queryReservedBytes);
+    }
+
+    reservedBytes -= bytes;
+  }
+
+  public synchronized long getQueryMemoryReservedBytes(String queryId) {
+    return queryMemoryReservations.getOrDefault(queryId, 0L);
+  }
+
+  public long getReservedBytes() {
+    return reservedBytes;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java b/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
new file mode 100644
index 0000000..cb76b93
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MemoryPoolTest {
+
+  MemoryPool pool;
+
+  @Before
+  public void before() {
+    pool = new MemoryPool("test", 1024L);
+  }
+
+  @Test
+  public void testReserve() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testReserveZero() {
+    String queryId = "q0";
+    try {
+      pool.tryReserve(queryId, 0L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+
+  @Test
+  public void testReserveNegative() {
+    String queryId = "q0";
+    try {
+      pool.tryReserve(queryId, -1L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+
+  @Test
+  public void testReserveAll() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 1024L));
+    Assert.assertEquals(1024L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(1024L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testOverReserve() {
+    String queryId = "q0";
+    Assert.assertFalse(pool.tryReserve(queryId, 1025L));
+    Assert.assertEquals(0L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(0L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testFree() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    pool.free(queryId, 256L);
+    Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(256L, pool.getReservedBytes());
+  }
+
+  @Test
+  public void testFreeAll() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    pool.free(queryId, 512L);
+    Assert.assertEquals(0L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(0L, pool.getReservedBytes());
+  }
+
+  @Test
+  public void testFreeZero() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    pool.free(queryId, 256L);
+    Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(256L, pool.getReservedBytes());
+  }
+
+  @Test
+  public void testFreeNegative() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    try {
+      pool.free(queryId, -1L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+
+  @Test
+  public void testOverFree() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    try {
+      pool.free(queryId, 513L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+}

[iotdb] 06/07: Sync with master

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4da5757bd1d5a03311a5111ca9b36874f024d10e
Author: ericpai <er...@hotmail.com>
AuthorDate: Thu Mar 17 10:41:32 2022 +0800

    Sync with master
---
 .../mpp/buffer}/IDataBlockManager.java             |  6 +--
 .../{ => db}/mpp/memory/LocalMemoryManager.java    |  2 +-
 .../iotdb/{ => db}/mpp/memory/MemoryPool.java      |  2 +-
 .../mpp/schedule}/ExecutionContext.java            |  4 +-
 .../mpp/schedule}/FragmentInstanceManager.java     | 14 +++----
 .../schedule}/FragmentInstanceTaskCallback.java    |  4 +-
 .../schedule}/FragmentInstanceTaskExecutor.java    |  6 +--
 .../schedule}/FragmentInstanceTimeoutSentinel.java |  8 ++--
 .../mpp/schedule}/IFragmentInstanceManager.java    | 11 +++---
 .../execution => db/mpp/schedule}/queue/ID.java    |  2 +-
 .../mpp/schedule}/queue/IDIndexedAccessible.java   |  2 +-
 .../mpp/schedule}/queue/IndexedBlockingQueue.java  |  2 +-
 .../mpp/schedule}/queue/L1PriorityQueue.java       |  2 +-
 .../mpp/schedule}/queue/L2PriorityQueue.java       |  2 +-
 .../mpp/schedule}/task/FragmentInstanceID.java     |  4 +-
 .../mpp/schedule}/task/FragmentInstanceTask.java   | 10 ++---
 .../schedule}/task/FragmentInstanceTaskStatus.java |  2 +-
 .../java/org/apache/iotdb/mpp/common/ITSBlock.java | 45 ----------------------
 .../apache/iotdb/mpp/common/TsBlockMetadata.java   | 22 -----------
 .../iotdb/{ => db}/mpp/memory/MemoryPoolTest.java  |  2 +-
 .../mpp/schedule}/queue/L1PriorityQueueTest.java   |  2 +-
 .../mpp/schedule}/queue/L2PriorityQueueTest.java   |  2 +-
 .../mpp/schedule}/queue/QueueElement.java          |  2 +-
 23 files changed, 45 insertions(+), 113 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index a05d04a..391db95 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -17,11 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.mpp.shuffle;
+package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceID;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 import org.apache.iotdb.mpp.common.ITSBlock;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
 
 public interface IDataBlockManager {
 
diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/memory/LocalMemoryManager.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/memory/LocalMemoryManager.java
index cc5305e..797075f 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/memory/LocalMemoryManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.mpp.memory;
+package org.apache.iotdb.db.mpp.memory;
 
 /**
  * Manages memory of a data node. The memory is divided into two memory pools so that the memory for
diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
index 29b7228..41a687d 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.mpp.memory;
+package org.apache.iotdb.db.mpp.memory;
 
 import org.apache.commons.lang3.Validate;
 
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
index 7428b73..adf7874 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 
 /** The execution context of a {@link FragmentInstanceTask} */
 public class ExecutionContext {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
index 8917dd4..bf38396 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
@@ -16,17 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
+import org.apache.iotdb.db.mpp.schedule.queue.L1PriorityQueue;
+import org.apache.iotdb.db.mpp.schedule.queue.L2PriorityQueue;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceID;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
-import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
-import org.apache.iotdb.mpp.execution.queue.L1PriorityQueue;
-import org.apache.iotdb.mpp.execution.queue.L2PriorityQueue;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
 
 import java.util.List;
 import java.util.Map;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskCallback.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskCallback.java
index 6eee1ad..c3ba2e7 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskCallback.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 
 /** A common interface for {@link FragmentInstanceTask} business logic callback */
 @FunctionalInterface
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
index b35a6f7..2d3e606 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
index 4171f26..75f9eba 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
similarity index 80%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
index a27db32..98dc0c4 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+import org.apache.iotdb.db.mpp.buffer.IDataBlockManager;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceID;
 
 /** the interface of fragment instance scheduling */
 public interface IFragmentInstanceManager {
@@ -26,8 +27,7 @@ public interface IFragmentInstanceManager {
   void submitFragmentInstance();
 
   /**
-   * the notifying interface for {@link org.apache.iotdb.mpp.shuffle.IDataBlockManager} when
-   * upstream data comes.
+   * the notifying interface for {@link IDataBlockManager} when upstream data comes.
    *
    * @param instanceID the fragment instance to be notified.
    * @param upstreamInstanceId the upstream instance id.
@@ -35,8 +35,7 @@ public interface IFragmentInstanceManager {
   void inputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId);
 
   /**
-   * the notifying interface for {@link org.apache.iotdb.mpp.shuffle.IDataBlockManager} when
-   * downstream data has been consumed.
+   * the notifying interface for {@link IDataBlockManager} when downstream data has been consumed.
    *
    * @param instanceID the fragment instance to be notified.
    */
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/ID.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/ID.java
index cc7d58f..940370f 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/ID.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 /** A simple interface to indicate the id type */
 public interface ID {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IDIndexedAccessible.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IDIndexedAccessible.java
index 5ae4c96..86b0c8b 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IDIndexedAccessible.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 /** A simple interface for id getter and setter */
 public interface IDIndexedAccessible {
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IndexedBlockingQueue.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IndexedBlockingQueue.java
index 6ddc610..bb6751f 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IndexedBlockingQueue.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 /**
  * The base class of a special kind of blocking queue, which has these characters:
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
index a9cad83..6aedb89 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 import java.util.Comparator;
 import java.util.SortedMap;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
index c23e74a..cd3caf3 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 import java.util.Comparator;
 import java.util.SortedMap;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceID.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceID.java
index f52b5c3..dab6e0a 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceID.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.task;
+package org.apache.iotdb.db.mpp.schedule.task;
 
-import org.apache.iotdb.mpp.execution.queue.ID;
+import org.apache.iotdb.db.mpp.schedule.queue.ID;
 
 import org.jetbrains.annotations.NotNull;
 
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index 1836c74..3c26f33 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.task;
+package org.apache.iotdb.db.mpp.schedule.task;
 
-import org.apache.iotdb.mpp.execution.ExecutionContext;
-import org.apache.iotdb.mpp.execution.FragmentInstanceTaskExecutor;
-import org.apache.iotdb.mpp.execution.queue.ID;
-import org.apache.iotdb.mpp.execution.queue.IDIndexedAccessible;
+import org.apache.iotdb.db.mpp.schedule.ExecutionContext;
+import org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor;
+import org.apache.iotdb.db.mpp.schedule.queue.ID;
+import org.apache.iotdb.db.mpp.schedule.queue.IDIndexedAccessible;
 
 import java.util.Comparator;
 import java.util.concurrent.locks.Lock;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java
index 5ee9fb3..f50dc4d 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.task;
+package org.apache.iotdb.db.mpp.schedule.task;
 
 /** the status enum of {@link FragmentInstanceTask} */
 public enum FragmentInstanceTaskStatus {
diff --git a/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java b/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java
deleted file mode 100644
index 449e0fc..0000000
--- a/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.mpp.common;
-
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-
-/**
- * Intermediate result for most of ExecOperators. The TsBlock contains data from one or more columns
- * and constructs them as a row based view The columns can be series, aggregation result for one
- * series or scalar value (such as deviceName). The TsBlock also contains the metadata to describe
- * the columns.
- */
-public class ITSBlock {
-
-  private TsBlockMetadata metadata;
-
-  public boolean hasNext() {
-    return false;
-  }
-
-  public RowRecord getNext() {
-    return null;
-  }
-
-  public TsBlockMetadata getMetadata() {
-    return metadata;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java b/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java
deleted file mode 100644
index ed7680f..0000000
--- a/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.mpp.common;
-
-public class TsBlockMetadata {}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
index cb76b93..8f21c7d 100644
--- a/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.mpp.memory;
+package org.apache.iotdb.db.mpp.memory;
 
 import org.junit.Assert;
 import org.junit.Before;
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
similarity index 98%
rename from server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
index 3809d4a..fba87fd 100644
--- a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
similarity index 98%
rename from server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
index 5d34bb7..9f161e1 100644
--- a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/QueueElement.java
similarity index 97%
rename from server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/QueueElement.java
index 4aed3a5..844c91d 100644
--- a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/QueueElement.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 public class QueueElement implements IDIndexedAccessible {
   private QueueElementID id;

[iotdb] 04/07: [To_new_mpp] add IDataBlockManager (#5240)

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 93722e87856c9972ccdf7eacc47166ebbc9f8654
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Tue Mar 15 16:21:37 2022 +0800

    [To_new_mpp] add IDataBlockManager (#5240)
---
 .../mpp/execution/IFragmentInstanceManager.java    |  6 +-
 .../iotdb/mpp/shuffle/IDataBlockManager.java       | 80 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
index e0eecfa..3bda549 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
@@ -26,7 +26,8 @@ public interface IFragmentInstanceManager {
   void submitFragmentInstance();
 
   /**
-   * the notifying interface for {@link DataBlockManager} when upstream data comes.
+   * the notifying interface for {@link org.apache.iotdb.mpp.shuffle.IDataBlockManager} when
+   * upstream data comes.
    *
    * @param instanceID the fragment instance to be notified.
    * @param upstreamInstanceId the upstream instance id.
@@ -34,7 +35,8 @@ public interface IFragmentInstanceManager {
   void inputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId);
 
   /**
-   * the notifying interface for {@link DataBlockManager} when downstream data has been consumed.
+   * the notifying interface for {@link org.apache.iotdb.mpp.shuffle.IDataBlockManager} when
+   * downstream data has been consumed.
    *
    * @param instanceID the fragment instance to be notified.
    * @param downstreamInstanceId the downstream instance id.
diff --git a/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java
new file mode 100644
index 0000000..a05d04a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.shuffle;
+
+import org.apache.iotdb.mpp.common.ITSBlock;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+public interface IDataBlockManager {
+
+  /**
+   * Register a new fragment instance. The block manager will start looking for upstream data blocks
+   * and flushing data blocks generated to downstream fragment instances.
+   */
+  void registerFragmentInstance(FragmentInstanceTask task);
+
+  /**
+   * Deregister a fragment instance. The block manager will stop looking for upstream data blocks
+   * and release the input data blocks, but will keep flushing data blocks to downstream fragment
+   * instances until all the data blocks are sent. Once all the data blocks are sent, the output
+   * data blocks will be release.
+   *
+   * <p>This method should be called when a fragment instance finished in a normal state.
+   */
+  void deregisterFragmentInstance(FragmentInstanceTask task);
+
+  /**
+   * Deregister a fragment instance. The block manager will release all the related resources.
+   * Including data blocks that are not yet sent to downstream fragment instances.
+   *
+   * <p>This method should be called when a fragment instance finished in an abnormal state.
+   */
+  void forceDeregisterFragmentInstance(FragmentInstanceTask task);
+
+  /**
+   * Put a data block to the output buffer for downstream fragment instances. Will throw an {@link
+   * IllegalStateException} if the output buffer is full.
+   *
+   * <p>Once the block be put into the output buffer, the data block manager will notify downstream
+   * fragment instances that a new data block is available.
+   *
+   * @param instanceID ID of fragment instance that generates the block.
+   * @return If there are enough memory for the next block.
+   */
+  boolean putDataBlock(FragmentInstanceID instanceID, ITSBlock block);
+
+  /**
+   * Check if there are data blocks from the specified upstream fragment instance.
+   *
+   * @param instanceID ID of the upstream fragment instance.
+   * @return If there are available data blocks.
+   */
+  boolean hasDataBlock(FragmentInstanceID instanceID);
+
+  /**
+   * Get a data block from the input buffer of specified upstream fragment instance. Will throw an
+   * {@link IllegalStateException} if the input buffer is empty.
+   *
+   * @param instanceID ID of the upstream fragment instance.
+   * @return A data block.
+   */
+  ITSBlock getDataBlock(FragmentInstanceID instanceID);
+}

[iotdb] 05/07: Fragment schedule develop

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8fb27a2194a7f2078539fd21ca34795ade5d6a34
Author: ericpai <er...@hotmail.com>
AuthorDate: Tue Mar 15 17:43:12 2022 +0800

    Fragment schedule develop
---
 .../mpp/execution/FragmentInstanceManager.java     |  89 ++++++++++++++-
 ...utor.java => FragmentInstanceTaskCallback.java} |  32 +-----
 .../execution/FragmentInstanceTaskExecutor.java    |   9 +-
 .../execution/FragmentInstanceTimeoutSentinel.java |  43 ++++++-
 .../mpp/execution/IFragmentInstanceManager.java    |   3 +-
 .../mpp/execution/queue/IndexedBlockingQueue.java  |  45 +++++++-
 .../iotdb/mpp/execution/queue/L1PriorityQueue.java |  12 +-
 .../iotdb/mpp/execution/queue/L2PriorityQueue.java |  17 ++-
 .../mpp/execution/task/FragmentInstanceTask.java   |  13 +++
 .../mpp/execution/queue/L1PriorityQueueTest.java   | 115 +++++++++++++++++++
 .../mpp/execution/queue/L2PriorityQueueTest.java   | 124 +++++++++++++++++++++
 .../iotdb/mpp/execution/queue/QueueElement.java    |  80 +++++++++++++
 12 files changed, 530 insertions(+), 52 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
index 4c7c157..8917dd4 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.mpp.execution.queue.L1PriorityQueue;
 import org.apache.iotdb.mpp.execution.queue.L2PriorityQueue;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
 
 import java.util.List;
 import java.util.Map;
@@ -40,7 +41,7 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
 
   private final IndexedBlockingQueue<FragmentInstanceTask> readyQueue;
   private final IndexedBlockingQueue<FragmentInstanceTask> timeoutQueue;
-  private final Map<String, List<FragmentInstanceID>> queryMap;
+  private final Map<String, List<FragmentInstanceTask>> queryMap;
 
   private static final int MAX_CAPACITY = 1000; // TODO: load from config files
   private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
@@ -65,7 +66,9 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
     for (int i = 0; i < WORKER_THREAD_NUM; i++) {
       new FragmentInstanceTaskExecutor("Worker-Thread-" + i, workerGroups, readyQueue).start();
     }
-    new FragmentInstanceTimeoutSentinel("Sentinel-Thread", workerGroups, timeoutQueue).start();
+    new FragmentInstanceTimeoutSentinel(
+            "Sentinel-Thread", workerGroups, timeoutQueue, this::abortFragmentInstanceTask)
+        .start();
   }
 
   @Override
@@ -79,15 +82,89 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
   }
 
   @Override
-  public void submitFragmentInstance() {}
+  public void submitFragmentInstance() {
+    // TODO: pass a real task
+    FragmentInstanceTask task = new FragmentInstanceTask();
+
+    task.lock();
+    try {
+      timeoutQueue.push(task);
+      // TODO: if no upstream deps, set to ready
+      task.setStatus(FragmentInstanceTaskStatus.READY);
+      readyQueue.push(task);
+    } finally {
+      task.unlock();
+    }
+  }
 
   @Override
   public void inputBlockAvailable(
-      FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {}
+      FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {
+    FragmentInstanceTask task = timeoutQueue.get(instanceID);
+    if (task == null) {
+      return;
+    }
+    task.lock();
+    try {
+      if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
+        return;
+      }
+      task.inputReady(instanceID);
+      if (task.getStatus() == FragmentInstanceTaskStatus.READY) {
+        readyQueue.push(task);
+      }
+    } finally {
+      task.unlock();
+    }
+  }
 
   @Override
-  public void outputBlockAvailable(
-      FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId) {}
+  public void outputBlockAvailable(FragmentInstanceID instanceID) {
+    FragmentInstanceTask task = timeoutQueue.get(instanceID);
+    if (task == null) {
+      return;
+    }
+    task.lock();
+    try {
+      if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
+        return;
+      }
+      task.outputReady();
+      if (task.getStatus() == FragmentInstanceTaskStatus.READY) {
+        readyQueue.push(task);
+      }
+    } finally {
+      task.unlock();
+    }
+  }
+
+  /** abort a {@link FragmentInstanceTask} */
+  void abortFragmentInstanceTask(FragmentInstanceTask task) {
+    List<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(task.getId().getQueryId());
+    clearFragmentInstanceTask(task);
+    if (queryRelatedTasks != null) {
+      // if queryRelatedTask is not null, it means that the clean request comes from this node, not
+      // coordinator.
+      // TODO: tell coordinator
+      for (FragmentInstanceTask otherTask : queryRelatedTasks) {
+        clearFragmentInstanceTask(otherTask);
+      }
+    }
+    // TODO: call LocalMemoryManager to release resources
+  }
+
+  private void clearFragmentInstanceTask(FragmentInstanceTask task) {
+    task.lock();
+    try {
+      if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) {
+        task.setStatus(FragmentInstanceTaskStatus.ABORTED);
+      }
+      readyQueue.remove(task.getId());
+      timeoutQueue.remove(task.getId());
+    } finally {
+      task.unlock();
+    }
+  }
 
   @Override
   public void abortQuery(String queryId) {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
copy to server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
index 5c704db..6eee1ad 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
@@ -18,34 +18,10 @@
  */
 package org.apache.iotdb.mpp.execution;
 
-import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** the worker thread of {@link FragmentInstanceTask} */
-public class FragmentInstanceTaskExecutor extends Thread {
-
-  private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceTaskExecutor.class);
-
-  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
-
-  public FragmentInstanceTaskExecutor(
-      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
-    super(tg, workerId);
-    this.queue = queue;
-  }
-
-  @Override
-  public void run() {
-    try {
-      while (true) {
-        FragmentInstanceTask next = queue.poll();
-        // do logic here
-      }
-    } catch (InterruptedException e) {
-      logger.info("{} is interrupted.", this.getName());
-    }
-  }
+/** A common interface for {@link FragmentInstanceTask} business logic callback */
+@FunctionalInterface
+public interface FragmentInstanceTaskCallback {
+  void call(FragmentInstanceTask task) throws Exception;
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
index 5c704db..b35a6f7 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
@@ -39,13 +39,14 @@ public class FragmentInstanceTaskExecutor extends Thread {
 
   @Override
   public void run() {
-    try {
-      while (true) {
+    while (true) {
+      try {
         FragmentInstanceTask next = queue.poll();
         // do logic here
+      } catch (InterruptedException e) {
+        logger.info("{} is interrupted.", this.getName());
+        break;
       }
-    } catch (InterruptedException e) {
-      logger.info("{} is interrupted.", this.getName());
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
index 7b352ba..4171f26 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.mpp.execution;
 
 import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,22 +32,52 @@ public class FragmentInstanceTimeoutSentinel extends Thread {
       LoggerFactory.getLogger(FragmentInstanceTimeoutSentinel.class);
 
   private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+  private final FragmentInstanceTaskCallback timeoutCallback;
+  // the check interval in milliseconds if the queue head remains the same.
+  private static final int CHECK_INTERVAL = 100;
 
   public FragmentInstanceTimeoutSentinel(
-      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+      String workerId,
+      ThreadGroup tg,
+      IndexedBlockingQueue<FragmentInstanceTask> queue,
+      FragmentInstanceTaskCallback timeoutCallback) {
     super(tg, workerId);
     this.queue = queue;
+    this.timeoutCallback = timeoutCallback;
   }
 
   @Override
   public void run() {
-    try {
-      while (true) {
+    while (true) {
+      try {
         FragmentInstanceTask next = queue.poll();
-        // do logic here
+        next.lock();
+        try {
+          // if this task is already in an end state, it means that the resource releasing will be
+          // handled by other threads, we don't care anymore.
+          if (next.isEndState()) {
+            continue;
+          }
+          // if this task is not in end state and not timeout, we should push it back to the queue.
+          if (next.getDDL() > System.currentTimeMillis()) {
+            queue.push(next);
+            Thread.sleep(CHECK_INTERVAL);
+            continue;
+          }
+          next.setStatus(FragmentInstanceTaskStatus.ABORTED);
+        } finally {
+          next.unlock();
+        }
+        try {
+          // Or we should do something to abort
+          timeoutCallback.call(next);
+        } catch (Exception e) {
+          logger.error("Abort instance " + next.getId() + " failed", e);
+        }
+      } catch (InterruptedException e) {
+        logger.info("{} is interrupted.", this.getName());
+        break;
       }
-    } catch (InterruptedException e) {
-      logger.info("{} is interrupted.", this.getName());
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
index 3bda549..a27db32 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
@@ -39,9 +39,8 @@ public interface IFragmentInstanceManager {
    * downstream data has been consumed.
    *
    * @param instanceID the fragment instance to be notified.
-   * @param downstreamInstanceId the downstream instance id.
    */
-  void outputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId);
+  void outputBlockAvailable(FragmentInstanceID instanceID);
 
   /**
    * abort all the instances in this query
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
index bcabb5c..6ddc610 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
@@ -29,7 +29,9 @@ package org.apache.iotdb.mpp.execution.queue;
  * <p>3. Can push a non-null element to queue. When the queue is beyond the max size, an exception
  * will be thrown.
  *
- * <p>4. Can remove an element by a long type id.
+ * <p>4. Can remove an element by a type of {@link ID}.
+ *
+ * <p>5. Each element has the different ID.
  */
 public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
 
@@ -79,15 +81,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
     if (element == null) {
       throw new NullPointerException("pushed element is null");
     }
-    if (size + 1 > MAX_CAPACITY) {
+    int sizeDelta = contains(element) ? 0 : 1;
+    if (size + sizeDelta > MAX_CAPACITY) {
       throw new IllegalStateException("the queue is full");
     }
     pushToQueue(element);
+    size += sizeDelta;
     this.notifyAll();
   }
 
   /**
-   * Remove and return the element by a long id. It returns null if it doesn't exist.
+   * Remove and return the element by id. It returns null if it doesn't exist.
    *
    * @param id the id of the element to be removed.
    * @return the removed element.
@@ -103,6 +107,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   }
 
   /**
+   * Get the element by id. It returns null if it doesn't exist.
+   *
+   * @param id the id of the element.
+   * @return the removed element.
+   */
+  public synchronized E get(ID id) {
+    queryHolder.setId(id);
+    return get(queryHolder);
+  }
+
+  /**
    * Get the current queue size.
    *
    * @return the current queue size.
@@ -139,10 +154,32 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   protected abstract void pushToQueue(E element);
 
   /**
-   * Remove and return the element by a long id. It returns null if it doesn't exist.
+   * Remove and return the element by its ID. It returns null if it doesn't exist.
+   *
+   * <p>This implementation needn't be thread-safe.
    *
    * @param element the element to be removed.
    * @return the removed element.
    */
   protected abstract E remove(E element);
+
+  /**
+   * Check whether an element with the same ID exists.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @param element the element to be checked.
+   * @return true if an element with the same ID exists, otherwise false.
+   */
+  protected abstract boolean contains(E element);
+
+  /**
+   * Return the element with the same id of the input, null if it doesn't exist.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @param element the element to be queried.
+   * @return the element with the same id in the queue. Null if it doesn't exist.
+   */
+  protected abstract E get(E element);
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
index a5ccf14..a9cad83 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
@@ -28,10 +28,10 @@ import java.util.TreeMap;
  * <p>The time complexity of operations are:
  *
  * <ul>
- *   <li><b>{@link #size()}: </b> O(1).
  *   <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
  *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
  *   <li><b>{@link #poll()}: </b> O(logN).
+ *   <li><b>{@link #get(ID)}}: </b> O(1).
  * </ul>
  */
 public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
@@ -73,4 +73,14 @@ public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
   protected E remove(E element) {
     return elements.remove(element);
   }
+
+  @Override
+  protected boolean contains(E element) {
+    return elements.containsKey(element);
+  }
+
+  @Override
+  protected E get(E element) {
+    return elements.get(element);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
index 3b8a9e6..c23e74a 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
@@ -30,10 +30,10 @@ import java.util.TreeMap;
  * <p>The time complexity of operations are:
  *
  * <ul>
- *   <li><b>{@link #size()}: </b> O(1).
  *   <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
  *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
  *   <li><b>{@link #poll()}: </b> O(logN).
+ *   <li><b>{@link #get(ID)}}: </b> O(1).
  * </ul>
  */
 public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
@@ -75,6 +75,7 @@ public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
 
   @Override
   protected void pushToQueue(E element) {
+    workingElements.remove(element);
     idleElements.put(element, element);
   }
 
@@ -86,4 +87,18 @@ public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
     }
     return e;
   }
+
+  @Override
+  protected boolean contains(E element) {
+    return workingElements.containsKey(element) || idleElements.containsKey(element);
+  }
+
+  @Override
+  protected E get(E element) {
+    E e = workingElements.get(element);
+    if (e != null) {
+      return e;
+    }
+    return idleElements.get(element);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
index 7bd7205..1836c74 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
@@ -70,6 +70,19 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
     return status;
   }
 
+  public boolean isEndState() {
+    return status == FragmentInstanceTaskStatus.ABORTED
+        || status == FragmentInstanceTaskStatus.FINISHED;
+  }
+
+  public void inputReady(FragmentInstanceID inputId) {
+    throw new UnsupportedOperationException("unsupported");
+  }
+
+  public void outputReady() {
+    throw new UnsupportedOperationException("unsupported");
+  }
+
   public void setStatus(FragmentInstanceTaskStatus status) {
     this.status = status;
   }
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
new file mode 100644
index 0000000..3809d4a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class L1PriorityQueueTest {
+
+  @Test
+  public void testPollBlocked() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L1PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    List<QueueElement> res = new ArrayList<>();
+    Thread t1 =
+        new Thread(
+            () -> {
+              try {
+                QueueElement e = queue.poll();
+                res.add(e);
+              } catch (InterruptedException e) {
+                e.printStackTrace();
+                Assert.fail();
+              }
+            });
+    t1.start();
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.WAITING, t1.getState());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.TERMINATED, t1.getState());
+    Assert.assertEquals(1, res.size());
+    Assert.assertEquals(e2.getId().toString(), res.get(0).getId().toString());
+  }
+
+  @Test
+  public void testPushExceedCapacity() {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L1PriorityQueue<>(
+            1,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    QueueElement e2e = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e2e);
+    QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(2), 2);
+    try {
+      queue.push(e3);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      // ignore;
+    }
+  }
+
+  @Test
+  public void testPushAndPoll() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L1PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e1);
+    QueueElement e1e = new QueueElement(new QueueElement.QueueElementID(1), 20);
+    queue.push(e1e);
+    // only 1 element with the same id can be put into
+    Assert.assertEquals(1, queue.size());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 5);
+    queue.push(e2);
+    Assert.assertEquals(2, queue.size());
+    Assert.assertEquals(e2.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(1, queue.size());
+    Assert.assertEquals(e1e.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(0, queue.size());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
new file mode 100644
index 0000000..5d34bb7
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class L2PriorityQueueTest {
+  @Test
+  public void testPollBlocked() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L2PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    List<QueueElement> res = new ArrayList<>();
+    Thread t1 =
+        new Thread(
+            () -> {
+              try {
+                QueueElement e = queue.poll();
+                res.add(e);
+              } catch (InterruptedException e) {
+                e.printStackTrace();
+                Assert.fail();
+              }
+            });
+    t1.start();
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.WAITING, t1.getState());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.TERMINATED, t1.getState());
+    Assert.assertEquals(1, res.size());
+    Assert.assertEquals(e2.getId().toString(), res.get(0).getId().toString());
+  }
+
+  @Test
+  public void testPushExceedCapacity() {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L2PriorityQueue<>(
+            1,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    QueueElement e2e = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e2e);
+    QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(2), 2);
+    try {
+      queue.push(e3);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      // ignore;
+    }
+  }
+
+  @Test
+  public void testPushAndPoll() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L2PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              int res = Integer.compare(o1.getValue(), o2.getValue());
+              if (res != 0) {
+                return res;
+              }
+              return String.CASE_INSENSITIVE_ORDER.compare(
+                  o1.getId().toString(), o2.getId().toString());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e1);
+    QueueElement e1e = new QueueElement(new QueueElement.QueueElementID(1), 20);
+    queue.push(e1e);
+    // only 1 element with the same id can be put into
+    Assert.assertEquals(1, queue.size());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 5);
+    queue.push(e2);
+    Assert.assertEquals(2, queue.size());
+    Assert.assertEquals(e2.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(1, queue.size());
+    // L1: 5 -> 20 L2: 10
+    QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(3), 10);
+    queue.push(e3);
+    Assert.assertEquals(e1e.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(1, queue.size());
+    Assert.assertEquals(e3.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(0, queue.size());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
new file mode 100644
index 0000000..4aed3a5
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+public class QueueElement implements IDIndexedAccessible {
+  private QueueElementID id;
+  private final int value;
+
+  public QueueElement(QueueElementID id, int value) {
+    this.id = id;
+    this.value = value;
+  }
+
+  public int getValue() {
+    return this.value;
+  }
+
+  @Override
+  public ID getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(ID id) {
+    this.id = (QueueElementID) id;
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof QueueElement && ((QueueElement) o).getId().equals(this.id);
+  }
+
+  public static class QueueElementID implements ID {
+    private final int id;
+
+    public QueueElementID(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return this.id;
+    }
+
+    @Override
+    public int hashCode() {
+      return Integer.hashCode(id);
+    }
+
+    @Override
+    public String toString() {
+      return String.valueOf(id);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return o instanceof QueueElementID && ((QueueElementID) o).getId() == this.id;
+    }
+  }
+}

[iotdb] 03/07: [To_new_mpp] add common classes & thrift files (#5239)

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 333c9f04403e907b2acae7352d4ce113975b4255
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Tue Mar 15 16:14:08 2022 +0800

    [To_new_mpp] add common classes & thrift files (#5239)
---
 .../java/org/apache/iotdb/mpp/common/ITSBlock.java | 45 ++++++++++++++++++++++
 .../apache/iotdb/mpp/common/TsBlockMetadata.java   | 22 +++++++++++
 thrift/src/main/thrift/common.thrift               | 27 +++++++++++++
 3 files changed, 94 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java b/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java
new file mode 100644
index 0000000..449e0fc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.common;
+
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+/**
+ * Intermediate result for most of ExecOperators. The TsBlock contains data from one or more columns
+ * and constructs them as a row based view The columns can be series, aggregation result for one
+ * series or scalar value (such as deviceName). The TsBlock also contains the metadata to describe
+ * the columns.
+ */
+public class ITSBlock {
+
+  private TsBlockMetadata metadata;
+
+  public boolean hasNext() {
+    return false;
+  }
+
+  public RowRecord getNext() {
+    return null;
+  }
+
+  public TsBlockMetadata getMetadata() {
+    return metadata;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java b/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java
new file mode 100644
index 0000000..ed7680f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.common;
+
+public class TsBlockMetadata {}
diff --git a/thrift/src/main/thrift/common.thrift b/thrift/src/main/thrift/common.thrift
new file mode 100644
index 0000000..967deed
--- /dev/null
+++ b/thrift/src/main/thrift/common.thrift
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace java org.apache.iotdb.mpp.common.rpc.thrift
+
+
+struct FragmentInstanceID {
+  1: required string queryID
+  2: required string fragmentID
+  3: required string instanceID
+}

[iotdb] 07/07: Implement phase 1

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0b5356fd6fdd67454e6a2bf57bdef684ec14d9b7
Author: ericpai <er...@hotmail.com>
AuthorDate: Mon Mar 21 10:09:37 2022 +0800

    Implement phase 1
---
 .../apache/iotdb/commons/service/ServiceType.java  |   2 +-
 .../iotdb/db/mpp/buffer/IDataBlockManager.java     |   6 +-
 ...anceTaskExecutor.java => AbstractExecutor.java} |  40 ++-
 .../iotdb/db/mpp/schedule/ExecutionContext.java    |  24 +-
 .../db/mpp/schedule/FragmentInstanceManager.java   | 270 ++++++++++++++-------
 .../mpp/schedule/FragmentInstanceTaskCallback.java |  27 ---
 .../mpp/schedule/FragmentInstanceTaskExecutor.java |  66 +++--
 .../schedule/FragmentInstanceTimeoutSentinel.java  |  66 ++---
 .../db/mpp/schedule/IFragmentInstanceManager.java  |  24 +-
 .../iotdb/db/mpp/schedule/ITaskScheduler.java      |  77 ++++++
 .../db/mpp/schedule/task/FragmentInstanceTask.java |  83 +++++--
 .../org/apache/iotdb/db/utils/stats/CpuTimer.java  | 156 ++++++++++++
 12 files changed, 622 insertions(+), 219 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index a5ad95d..e583447 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -65,7 +65,7 @@ public enum ServiceType {
   CLUSTER_META_ENGINE("Cluster Meta Engine", "ClusterMetaEngine"),
   CLUSTER_DATA_ENGINE("Cluster Data Engine", "ClusterDataEngine"),
   REST_SERVICE("REST Service", "REST Service"),
-  CONFIG_NODE_SERVICE("Config Node service", "ConfigNodeRPCServer");
+  CONFIG_NODE_SERVICE("Config Node service", "ConfigNodeRPCServer"),
   FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager");
 
   private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index 391db95..5f45ee8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.db.mpp.common.TsBlock;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceID;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.mpp.common.ITSBlock;
 
 public interface IDataBlockManager {
 
@@ -59,7 +59,7 @@ public interface IDataBlockManager {
    * @param instanceID ID of fragment instance that generates the block.
    * @return If there are enough memory for the next block.
    */
-  boolean putDataBlock(FragmentInstanceID instanceID, ITSBlock block);
+  boolean putDataBlock(FragmentInstanceID instanceID, TsBlock block);
 
   /**
    * Check if there are data blocks from the specified upstream fragment instance.
@@ -76,5 +76,5 @@ public interface IDataBlockManager {
    * @param instanceID ID of the upstream fragment instance.
    * @return A data block.
    */
-  ITSBlock getDataBlock(FragmentInstanceID instanceID);
+  TsBlock getDataBlock(FragmentInstanceID instanceID);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractExecutor.java
similarity index 54%
copy from server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractExecutor.java
index 2d3e606..f15b192 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/AbstractExecutor.java
@@ -24,29 +24,53 @@ import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** the worker thread of {@link FragmentInstanceTask} */
-public class FragmentInstanceTaskExecutor extends Thread {
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
 
-  private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceTaskExecutor.class);
+/** an abstract executor for {@link FragmentInstanceTask} */
+public abstract class AbstractExecutor extends Thread implements Closeable {
 
+  private static final Logger logger = LoggerFactory.getLogger(AbstractExecutor.class);
   private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+  private final ITaskScheduler scheduler;
+  private volatile boolean closed;
 
-  public FragmentInstanceTaskExecutor(
-      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+  public AbstractExecutor(
+      String workerId,
+      ThreadGroup tg,
+      IndexedBlockingQueue<FragmentInstanceTask> queue,
+      ITaskScheduler scheduler) {
     super(tg, workerId);
     this.queue = queue;
+    this.scheduler = scheduler;
+    this.closed = false;
   }
 
   @Override
   public void run() {
-    while (true) {
+    while (!closed && !Thread.currentThread().isInterrupted()) {
       try {
         FragmentInstanceTask next = queue.poll();
-        // do logic here
+        execute(next);
       } catch (InterruptedException e) {
-        logger.info("{} is interrupted.", this.getName());
         break;
+      } catch (Exception e) {
+        logger.error("Executor " + this.getName() + " processes failed", e);
       }
     }
   }
+
+  protected ITaskScheduler getScheduler() {
+    return scheduler;
+  }
+
+  /** Processing a task. */
+  protected abstract void execute(FragmentInstanceTask task)
+      throws InterruptedException, ExecutionException;
+
+  @Override
+  public void close() throws IOException {
+    closed = true;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
index adf7874..e8cd091 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
@@ -19,6 +19,28 @@
 package org.apache.iotdb.db.mpp.schedule;
 
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.utils.stats.CpuTimer;
+
+import io.airlift.units.Duration;
 
 /** The execution context of a {@link FragmentInstanceTask} */
-public class ExecutionContext {}
+public class ExecutionContext {
+  private CpuTimer.CpuDuration cpuDuration;
+  private Duration timeSlice;
+
+  public CpuTimer.CpuDuration getCpuDuration() {
+    return cpuDuration;
+  }
+
+  public void setCpuDuration(CpuTimer.CpuDuration cpuDuration) {
+    this.cpuDuration = cpuDuration;
+  }
+
+  public Duration getTimeSlice() {
+    return timeSlice;
+  }
+
+  public void setTimeSlice(Duration timeSlice) {
+    this.timeSlice = timeSlice;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
index bf38396..1e22368 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
@@ -18,34 +18,49 @@
  */
 package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.mpp.execution.ExecFragmentInstance;
 import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.schedule.queue.L1PriorityQueue;
 import org.apache.iotdb.db.mpp.schedule.queue.L2PriorityQueue;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceID;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
-import org.apache.iotdb.db.service.IService;
-import org.apache.iotdb.db.service.ServiceType;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /** the manager of fragment instances scheduling */
 public class FragmentInstanceManager implements IFragmentInstanceManager, IService {
 
+  private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceManager.class);
+
   public static IFragmentInstanceManager getInstance() {
     return InstanceHolder.instance;
   }
 
   private final IndexedBlockingQueue<FragmentInstanceTask> readyQueue;
   private final IndexedBlockingQueue<FragmentInstanceTask> timeoutQueue;
-  private final Map<String, List<FragmentInstanceTask>> queryMap;
+  private final Set<FragmentInstanceTask> blockedTasks;
+  private final Map<String, Set<FragmentInstanceTask>> queryMap;
+  private final ITaskScheduler scheduler;
 
   private static final int MAX_CAPACITY = 1000; // TODO: load from config files
   private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
-  private final ThreadGroup workerGroups = new ThreadGroup("ScheduleThreads");
+  private static final int QUERY_TIMEOUT_MS = 10000; // TODO: load from config files or requests
+  private final ThreadGroup workerGroups;
+  private final List<AbstractExecutor> threads;
 
   public FragmentInstanceManager() {
     this.readyQueue =
@@ -59,21 +74,38 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
             new FragmentInstanceTask.SchedulePriorityComparator(),
             new FragmentInstanceTask());
     this.queryMap = new ConcurrentHashMap<>();
+    this.blockedTasks = Collections.synchronizedSet(new HashSet<>());
+    this.scheduler = new Scheduler();
+    this.workerGroups = new ThreadGroup("ScheduleThreads");
+    this.threads = new ArrayList<>();
   }
 
   @Override
   public void start() throws StartupException {
     for (int i = 0; i < WORKER_THREAD_NUM; i++) {
-      new FragmentInstanceTaskExecutor("Worker-Thread-" + i, workerGroups, readyQueue).start();
+      AbstractExecutor t =
+          new FragmentInstanceTaskExecutor(
+              "Worker-Thread-" + i, workerGroups, readyQueue, scheduler);
+      threads.add(t);
+      t.start();
     }
-    new FragmentInstanceTimeoutSentinel(
-            "Sentinel-Thread", workerGroups, timeoutQueue, this::abortFragmentInstanceTask)
-        .start();
+    AbstractExecutor t =
+        new FragmentInstanceTimeoutSentinel(
+            "Sentinel-Thread", workerGroups, timeoutQueue, scheduler);
+    threads.add(t);
+    t.start();
   }
 
   @Override
   public void stop() {
-    workerGroups.interrupt();
+    this.threads.forEach(
+        t -> {
+          try {
+            t.close();
+          } catch (IOException e) {
+            // Only a field is set, there's no chance to throw an IOException
+          }
+        });
   }
 
   @Override
@@ -82,97 +114,173 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
   }
 
   @Override
-  public void submitFragmentInstance() {
-    // TODO: pass a real task
-    FragmentInstanceTask task = new FragmentInstanceTask();
-
-    task.lock();
-    try {
-      timeoutQueue.push(task);
-      // TODO: if no upstream deps, set to ready
-      task.setStatus(FragmentInstanceTaskStatus.READY);
-      readyQueue.push(task);
-    } finally {
-      task.unlock();
+  public void submitFragmentInstances(String queryId, List<ExecFragmentInstance> instances) {
+    Set<FragmentInstanceTask> tasks =
+        instances.stream()
+            .map(
+                v ->
+                    new FragmentInstanceTask(v, QUERY_TIMEOUT_MS, FragmentInstanceTaskStatus.READY))
+            .collect(Collectors.toSet());
+    queryMap.put(queryId, Collections.synchronizedSet(tasks));
+    for (FragmentInstanceTask task : tasks) {
+      task.lock();
+      try {
+        timeoutQueue.push(task);
+        readyQueue.push(task);
+      } finally {
+        task.unlock();
+      }
     }
   }
 
   @Override
-  public void inputBlockAvailable(
-      FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {
-    FragmentInstanceTask task = timeoutQueue.get(instanceID);
-    if (task == null) {
-      return;
-    }
-    task.lock();
-    try {
-      if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
-        return;
-      }
-      task.inputReady(instanceID);
-      if (task.getStatus() == FragmentInstanceTaskStatus.READY) {
-        readyQueue.push(task);
+  public void abortQuery(String queryId) {
+    Set<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(queryId);
+    if (queryRelatedTasks != null) {
+      for (FragmentInstanceTask task : queryRelatedTasks) {
+        task.lock();
+        try {
+          clearFragmentInstanceTask(task);
+        } finally {
+          task.unlock();
+        }
       }
-    } finally {
-      task.unlock();
     }
   }
 
   @Override
-  public void outputBlockAvailable(FragmentInstanceID instanceID) {
-    FragmentInstanceTask task = timeoutQueue.get(instanceID);
-    if (task == null) {
-      return;
+  public void fetchFragmentInstance(ExecFragmentInstance instance) {}
+
+  private void clearFragmentInstanceTask(FragmentInstanceTask task) {
+    if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) {
+      task.setStatus(FragmentInstanceTaskStatus.ABORTED);
     }
-    task.lock();
-    try {
-      if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
-        return;
-      }
-      task.outputReady();
-      if (task.getStatus() == FragmentInstanceTaskStatus.READY) {
-        readyQueue.push(task);
-      }
-    } finally {
-      task.unlock();
+    if (task.getStatus() == FragmentInstanceTaskStatus.ABORTED) {
+      // TODO: remember to call the implementation
+      // IDataBlockManager.forceDeregisterFragmentInstance(task);
+    }
+    readyQueue.remove(task.getId());
+    timeoutQueue.remove(task.getId());
+    blockedTasks.remove(task);
+    Set<FragmentInstanceTask> tasks = queryMap.get(task.getId().getQueryId());
+    tasks.remove(task);
+    if (tasks.isEmpty()) {
+      queryMap.remove(task.getId().getQueryId());
     }
   }
 
-  /** abort a {@link FragmentInstanceTask} */
-  void abortFragmentInstanceTask(FragmentInstanceTask task) {
-    List<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(task.getId().getQueryId());
-    clearFragmentInstanceTask(task);
-    if (queryRelatedTasks != null) {
-      // if queryRelatedTask is not null, it means that the clean request comes from this node, not
-      // coordinator.
-      // TODO: tell coordinator
-      for (FragmentInstanceTask otherTask : queryRelatedTasks) {
-        clearFragmentInstanceTask(otherTask);
+  private static class InstanceHolder {
+
+    private InstanceHolder() {}
+
+    private static final IFragmentInstanceManager instance = new FragmentInstanceManager();
+  }
+  /** the default scheduler implementation */
+  private class Scheduler implements ITaskScheduler {
+    @Override
+    public void blockedToReady(FragmentInstanceTask task) {
+      task.lock();
+      try {
+        if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
+          return;
+        }
+        task.setStatus(FragmentInstanceTaskStatus.READY);
+        readyQueue.push(task);
+        blockedTasks.remove(task);
+      } finally {
+        task.unlock();
       }
     }
-    // TODO: call LocalMemoryManager to release resources
-  }
 
-  private void clearFragmentInstanceTask(FragmentInstanceTask task) {
-    task.lock();
-    try {
-      if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) {
-        task.setStatus(FragmentInstanceTaskStatus.ABORTED);
+    @Override
+    public boolean readyToRunning(FragmentInstanceTask task) {
+      task.lock();
+      try {
+        if (task.getStatus() != FragmentInstanceTaskStatus.READY) {
+          return false;
+        }
+        task.setStatus(FragmentInstanceTaskStatus.RUNNING);
+      } finally {
+        task.unlock();
       }
-      readyQueue.remove(task.getId());
-      timeoutQueue.remove(task.getId());
-    } finally {
-      task.unlock();
+      return true;
     }
-  }
 
-  @Override
-  public void abortQuery(String queryId) {}
+    @Override
+    public void runningToReady(FragmentInstanceTask task, ExecutionContext context) {
+      task.lock();
+      try {
+        if (task.getStatus() != FragmentInstanceTaskStatus.RUNNING) {
+          return;
+        }
+        task.updateSchedulePriority(context);
+        task.setStatus(FragmentInstanceTaskStatus.READY);
+        readyQueue.push(task);
+      } finally {
+        task.unlock();
+      }
+    }
 
-  private static class InstanceHolder {
+    @Override
+    public void runningToBlocked(FragmentInstanceTask task, ExecutionContext context) {
+      task.lock();
+      try {
+        if (task.getStatus() != FragmentInstanceTaskStatus.RUNNING) {
+          return;
+        }
+        task.updateSchedulePriority(context);
+        task.setStatus(FragmentInstanceTaskStatus.BLOCKED);
+        blockedTasks.add(task);
+      } finally {
+        task.unlock();
+      }
+    }
 
-    private InstanceHolder() {}
+    @Override
+    public void runningToFinished(FragmentInstanceTask task, ExecutionContext context) {
+      task.lock();
+      try {
+        if (task.getStatus() != FragmentInstanceTaskStatus.RUNNING) {
+          return;
+        }
+        task.updateSchedulePriority(context);
+        task.setStatus(FragmentInstanceTaskStatus.FINISHED);
+        clearFragmentInstanceTask(task);
+      } finally {
+        task.unlock();
+      }
+    }
 
-    private static final IFragmentInstanceManager instance = new FragmentInstanceManager();
+    @Override
+    public void toAborted(FragmentInstanceTask task) {
+      task.lock();
+      try {
+        // If a task is already in an end state, it indicates that the task is finalized in other
+        // threads.
+        if (task.isEndState()) {
+          return;
+        }
+        logger.warn(
+            "The task {} is aborted. All other tasks in the same query will be cancelled",
+            task.getId().toString());
+        clearFragmentInstanceTask(task);
+      } finally {
+        task.unlock();
+      }
+      Set<FragmentInstanceTask> queryRelatedTasks = queryMap.get(task.getId().getQueryId());
+      if (queryRelatedTasks != null) {
+        for (FragmentInstanceTask otherTask : queryRelatedTasks) {
+          if (task.equals(otherTask)) {
+            continue;
+          }
+          otherTask.lock();
+          try {
+            clearFragmentInstanceTask(otherTask);
+          } finally {
+            otherTask.unlock();
+          }
+        }
+      }
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskCallback.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskCallback.java
deleted file mode 100644
index c3ba2e7..0000000
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.mpp.schedule;
-
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-
-/** A common interface for {@link FragmentInstanceTask} business logic callback */
-@FunctionalInterface
-public interface FragmentInstanceTaskCallback {
-  void call(FragmentInstanceTask task) throws Exception;
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
index 2d3e606..d1d1b8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
@@ -18,35 +18,67 @@
  */
 package org.apache.iotdb.db.mpp.schedule;
 
+import org.apache.iotdb.db.mpp.execution.ExecFragmentInstance;
 import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.utils.stats.CpuTimer;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.airlift.units.Duration;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 /** the worker thread of {@link FragmentInstanceTask} */
-public class FragmentInstanceTaskExecutor extends Thread {
+public class FragmentInstanceTaskExecutor extends AbstractExecutor {
 
-  private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceTaskExecutor.class);
+  private static final Duration EXECUTION_TIME_SLICE = new Duration(100, TimeUnit.MILLISECONDS);
 
-  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+  // As the callback is lightweight enough, there's no need to use another one thread to execute.
+  private static final Executor listeningExecutor = MoreExecutors.directExecutor();
 
   public FragmentInstanceTaskExecutor(
-      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
-    super(tg, workerId);
-    this.queue = queue;
+      String workerId,
+      ThreadGroup tg,
+      IndexedBlockingQueue<FragmentInstanceTask> queue,
+      ITaskScheduler scheduler) {
+    super(workerId, tg, queue, scheduler);
   }
 
   @Override
-  public void run() {
-    while (true) {
-      try {
-        FragmentInstanceTask next = queue.poll();
-        // do logic here
-      } catch (InterruptedException e) {
-        logger.info("{} is interrupted.", this.getName());
-        break;
-      }
+  public void execute(FragmentInstanceTask task) throws InterruptedException {
+    // try to switch it to RUNNING
+    if (!getScheduler().readyToRunning(task)) {
+      return;
+    }
+    ExecFragmentInstance instance = task.getFragmentInstance();
+    CpuTimer timer = new CpuTimer();
+    ListenableFuture<Void> future = instance.processFor(EXECUTION_TIME_SLICE);
+    CpuTimer.CpuDuration duration = timer.elapsedTime();
+    // long cost = System.nanoTime() - startTime;
+    // If the future is cancelled, the task is in an error and should be thrown.
+    if (future.isCancelled()) {
+      getScheduler().toAborted(task);
+      return;
+    }
+    ExecutionContext context = new ExecutionContext();
+    context.setCpuDuration(duration);
+    context.setTimeSlice(EXECUTION_TIME_SLICE);
+    if (instance.isFinished()) {
+      getScheduler().runningToFinished(task, context);
+      return;
+    }
+
+    if (future.isDone()) {
+      getScheduler().runningToReady(task, context);
+    } else {
+      getScheduler().runningToBlocked(task, context);
+      future.addListener(
+          () -> {
+            getScheduler().blockedToReady(task);
+          },
+          listeningExecutor);
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
index 75f9eba..5093ecc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
@@ -20,64 +20,36 @@ package org.apache.iotdb.db.mpp.schedule;
 
 import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
 import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /** the thread for watching the timeout of {@link FragmentInstanceTask} */
-public class FragmentInstanceTimeoutSentinel extends Thread {
-
-  private static final Logger logger =
-      LoggerFactory.getLogger(FragmentInstanceTimeoutSentinel.class);
-
-  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
-  private final FragmentInstanceTaskCallback timeoutCallback;
-  // the check interval in milliseconds if the queue head remains the same.
-  private static final int CHECK_INTERVAL = 100;
+public class FragmentInstanceTimeoutSentinel extends AbstractExecutor {
 
   public FragmentInstanceTimeoutSentinel(
       String workerId,
       ThreadGroup tg,
       IndexedBlockingQueue<FragmentInstanceTask> queue,
-      FragmentInstanceTaskCallback timeoutCallback) {
-    super(tg, workerId);
-    this.queue = queue;
-    this.timeoutCallback = timeoutCallback;
+      ITaskScheduler scheduler) {
+    super(workerId, tg, queue, scheduler);
   }
 
   @Override
-  public void run() {
-    while (true) {
-      try {
-        FragmentInstanceTask next = queue.poll();
-        next.lock();
-        try {
-          // if this task is already in an end state, it means that the resource releasing will be
-          // handled by other threads, we don't care anymore.
-          if (next.isEndState()) {
-            continue;
-          }
-          // if this task is not in end state and not timeout, we should push it back to the queue.
-          if (next.getDDL() > System.currentTimeMillis()) {
-            queue.push(next);
-            Thread.sleep(CHECK_INTERVAL);
-            continue;
-          }
-          next.setStatus(FragmentInstanceTaskStatus.ABORTED);
-        } finally {
-          next.unlock();
-        }
-        try {
-          // Or we should do something to abort
-          timeoutCallback.call(next);
-        } catch (Exception e) {
-          logger.error("Abort instance " + next.getId() + " failed", e);
-        }
-      } catch (InterruptedException e) {
-        logger.info("{} is interrupted.", this.getName());
-        break;
+  public void execute(FragmentInstanceTask task) throws InterruptedException {
+    task.lock();
+    try {
+      // if this task is already in an end state, it means that the resource releasing will be
+      // handled by other threads, we don't care anymore.
+      if (task.isEndState()) {
+        return;
       }
+    } finally {
+      task.unlock();
+    }
+    // if this task is not timeout, we can wait it to timeout.
+    long waitTime = task.getDDL() - System.currentTimeMillis();
+    if (waitTime > 0L) {
+      // After this time, the task must be timeout.
+      Thread.sleep(waitTime);
     }
+    getScheduler().toAborted(task);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
index 98dc0c4..6d1e34f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
@@ -18,28 +18,19 @@
  */
 package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.db.mpp.buffer.IDataBlockManager;
-import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceID;
+import org.apache.iotdb.db.mpp.execution.ExecFragmentInstance;
+
+import java.util.List;
 
 /** the interface of fragment instance scheduling */
 public interface IFragmentInstanceManager {
 
-  void submitFragmentInstance();
-
   /**
-   * the notifying interface for {@link IDataBlockManager} when upstream data comes.
+   * Submit one or more {@link ExecFragmentInstance} in one query for later scheduling.
    *
-   * @param instanceID the fragment instance to be notified.
-   * @param upstreamInstanceId the upstream instance id.
+   * @param instances the submitted instances.
    */
-  void inputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId);
-
-  /**
-   * the notifying interface for {@link IDataBlockManager} when downstream data has been consumed.
-   *
-   * @param instanceID the fragment instance to be notified.
-   */
-  void outputBlockAvailable(FragmentInstanceID instanceID);
+  void submitFragmentInstances(String queryId, List<ExecFragmentInstance> instances);
 
   /**
    * abort all the instances in this query
@@ -47,4 +38,7 @@ public interface IFragmentInstanceManager {
    * @param queryId the id of the query to be aborted.
    */
   void abortQuery(String queryId);
+
+  /** Fetch an {@link ExecFragmentInstance}. */
+  void fetchFragmentInstance(ExecFragmentInstance instance);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java
new file mode 100644
index 0000000..328acb6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ITaskScheduler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.schedule;
+
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
+
+/** the scheduler interface of {@link FragmentInstanceTask} */
+interface ITaskScheduler {
+
+  /**
+   * Switch a task from {@link FragmentInstanceTaskStatus#BLOCKED} to {@link
+   * FragmentInstanceTaskStatus#READY}.
+   *
+   * @param task the task to be switched.
+   */
+  void blockedToReady(FragmentInstanceTask task);
+
+  /**
+   * Switch a task from {@link FragmentInstanceTaskStatus#READY} to {@link
+   * FragmentInstanceTaskStatus#RUNNING}.
+   *
+   * @param task the task to be switched.
+   * @return true if it's switched to the target status successfully, otherwise false.
+   */
+  boolean readyToRunning(FragmentInstanceTask task);
+
+  /**
+   * Switch a task from {@link FragmentInstanceTaskStatus#RUNNING} to {@link
+   * FragmentInstanceTaskStatus#READY}.
+   *
+   * @param task the task to be switched.
+   * @param context the execution context of last running.
+   */
+  void runningToReady(FragmentInstanceTask task, ExecutionContext context);
+
+  /**
+   * Switch a task from {@link FragmentInstanceTaskStatus#RUNNING} to {@link
+   * FragmentInstanceTaskStatus#BLOCKED}.
+   *
+   * @param task the task to be switched.
+   * @param context the execution context of last running.
+   */
+  void runningToBlocked(FragmentInstanceTask task, ExecutionContext context);
+
+  /**
+   * Switch a task from {@link FragmentInstanceTaskStatus#RUNNING} to {@link
+   * FragmentInstanceTaskStatus#FINISHED}.
+   *
+   * @param task the task to be switched.
+   * @param context the execution context of last running.
+   */
+  void runningToFinished(FragmentInstanceTask task, ExecutionContext context);
+
+  /**
+   * Switch a task to {@link FragmentInstanceTaskStatus#ABORTED}.
+   *
+   * @param task the task to be switched.
+   */
+  void toAborted(FragmentInstanceTask task);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index 3c26f33..65e2e4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@ -18,12 +18,17 @@
  */
 package org.apache.iotdb.db.mpp.schedule.task;
 
+import org.apache.iotdb.db.mpp.execution.ExecFragmentInstance;
 import org.apache.iotdb.db.mpp.schedule.ExecutionContext;
 import org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor;
 import org.apache.iotdb.db.mpp.schedule.queue.ID;
 import org.apache.iotdb.db.mpp.schedule.queue.IDIndexedAccessible;
 
+import com.google.common.util.concurrent.ListenableFuture;
+import io.airlift.units.Duration;
+
 import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -35,23 +40,26 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
 
   private FragmentInstanceID id;
   private FragmentInstanceTaskStatus status;
-  private final ExecutionContext executionContext;
+  private final ExecFragmentInstance fragmentInstance;
 
   // the higher this field is, the higher probability it will be scheduled.
-  private long schedulePriority;
+  private double schedulePriority;
   private final long ddl;
   private final Lock lock;
 
+  // Running stats
+  private long cpuWallNano;
+
   /** Initialize a dummy instance for queryHolder */
   public FragmentInstanceTask() {
-    this(null, 0L, null);
+    this(new StubFragmentInstance(), 0L, null);
   }
 
   public FragmentInstanceTask(
-      FragmentInstanceID id, long timeoutMs, FragmentInstanceTaskStatus status) {
-    this.id = id;
+      ExecFragmentInstance instance, long timeoutMs, FragmentInstanceTaskStatus status) {
+    this.fragmentInstance = instance;
+    this.id = new FragmentInstanceID(instance.getInfo(), instance.getInfo(), instance.getInfo());
     this.setStatus(status);
-    this.executionContext = new ExecutionContext();
     this.schedulePriority = 0L;
     this.ddl = System.currentTimeMillis() + timeoutMs;
     this.lock = new ReentrantLock();
@@ -75,26 +83,32 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
         || status == FragmentInstanceTaskStatus.FINISHED;
   }
 
-  public void inputReady(FragmentInstanceID inputId) {
-    throw new UnsupportedOperationException("unsupported");
-  }
-
-  public void outputReady() {
-    throw new UnsupportedOperationException("unsupported");
+  public ExecFragmentInstance getFragmentInstance() {
+    return fragmentInstance;
   }
 
   public void setStatus(FragmentInstanceTaskStatus status) {
     this.status = status;
   }
 
-  public ExecutionContext getExecutionContext() {
-    return executionContext;
-  }
+  /**
+   * Update the schedule priority according to the execution context.
+   *
+   * @param context the last execution context.
+   */
+  public void updateSchedulePriority(ExecutionContext context) {
+    // TODO: need to implement more complex here
+
+    // 1. The penalty factor means that if a task executes less time in one schedule, it will have a
+    // high schedule priority
+    double penaltyFactor =
+        context.getCpuDuration().getWall().getValue(TimeUnit.NANOSECONDS)
+            / context.getTimeSlice().getValue(TimeUnit.NANOSECONDS);
+    // 2. If a task is nearly timeout, it should be scheduled as soon as possible.
+    long base = System.currentTimeMillis() - ddl;
 
-  /** Update the schedule priority according to the execution context. */
-  public void updateSchedulePriority() {
-    // TODO: need to implement here
-    this.schedulePriority = System.currentTimeMillis() - ddl;
+    // 3. Now the final schedulePriority is out, this may not be so reasonable.
+    this.schedulePriority = base * penaltyFactor;
   }
 
   public void lock() {
@@ -113,6 +127,16 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
     return ddl;
   }
 
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof FragmentInstanceTask && ((FragmentInstanceTask) o).getId().equals(id);
+  }
+
   /** a comparator of ddl, the less the ddl is, the low order it has. */
   public static class TimeoutComparator implements Comparator<FragmentInstanceTask> {
 
@@ -148,4 +172,25 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
       return o1.getId().compareTo(o2);
     }
   }
+
+  private static class StubFragmentInstance implements ExecFragmentInstance {
+
+    @Override
+    public boolean isFinished() {
+      return false;
+    }
+
+    @Override
+    public ListenableFuture<Void> processFor(Duration duration) {
+      return null;
+    }
+
+    @Override
+    public String getInfo() {
+      return "stub";
+    }
+
+    @Override
+    public void close() {}
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/stats/CpuTimer.java b/server/src/main/java/org/apache/iotdb/db/utils/stats/CpuTimer.java
new file mode 100644
index 0000000..2e4947e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/stats/CpuTimer.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.stats;
+
+import io.airlift.units.Duration;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * this class is copied from
+ * https://github.com/airlift/airlift/blob/214/stats/src/main/java/io/airlift/stats/CpuTimer.java as
+ * it doesn't support Java8.
+ */
+public class CpuTimer {
+  private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
+
+  private final long wallStartTime;
+  private final long cpuStartTime;
+  private final long userStartTime;
+
+  private long intervalWallStart;
+  private long intervalCpuStart;
+  private long intervalUserStart;
+
+  public CpuTimer() {
+    wallStartTime = System.nanoTime();
+    cpuStartTime = THREAD_MX_BEAN.getCurrentThreadCpuTime();
+    userStartTime = THREAD_MX_BEAN.getCurrentThreadUserTime();
+
+    intervalWallStart = wallStartTime;
+    intervalCpuStart = cpuStartTime;
+    intervalUserStart = userStartTime;
+  }
+
+  public CpuDuration startNewInterval() {
+    long currentWallTime = System.nanoTime();
+    long currentCpuTime = THREAD_MX_BEAN.getCurrentThreadCpuTime();
+    long currentUserTime = THREAD_MX_BEAN.getCurrentThreadUserTime();
+
+    CpuDuration cpuDuration =
+        new CpuDuration(
+            nanosBetween(intervalWallStart, currentWallTime),
+            nanosBetween(intervalCpuStart, currentCpuTime),
+            nanosBetween(intervalUserStart, currentUserTime));
+
+    intervalWallStart = currentWallTime;
+    intervalCpuStart = currentCpuTime;
+    intervalUserStart = currentUserTime;
+
+    return cpuDuration;
+  }
+
+  public CpuDuration elapsedIntervalTime() {
+    long currentWallTime = System.nanoTime();
+    long currentCpuTime = THREAD_MX_BEAN.getCurrentThreadCpuTime();
+    long currentUserTime = THREAD_MX_BEAN.getCurrentThreadUserTime();
+
+    return new CpuDuration(
+        nanosBetween(intervalWallStart, currentWallTime),
+        nanosBetween(intervalCpuStart, currentCpuTime),
+        nanosBetween(intervalUserStart, currentUserTime));
+  }
+
+  public CpuDuration elapsedTime() {
+    long currentWallTime = System.nanoTime();
+    long currentCpuTime = THREAD_MX_BEAN.getCurrentThreadCpuTime();
+    long currentUserTime = THREAD_MX_BEAN.getCurrentThreadUserTime();
+
+    return new CpuDuration(
+        nanosBetween(wallStartTime, currentWallTime),
+        nanosBetween(cpuStartTime, currentCpuTime),
+        nanosBetween(userStartTime, currentUserTime));
+  }
+
+  private static Duration nanosBetween(long start, long end) {
+    return new Duration(Math.abs(end - start), NANOSECONDS);
+  }
+
+  public static class CpuDuration {
+    private final Duration wall;
+    private final Duration cpu;
+    private final Duration user;
+
+    public CpuDuration() {
+      this.wall = new Duration(0, NANOSECONDS);
+      this.cpu = new Duration(0, NANOSECONDS);
+      this.user = new Duration(0, NANOSECONDS);
+    }
+
+    public CpuDuration(Duration wall, Duration cpu, Duration user) {
+      this.wall = wall;
+      this.cpu = cpu;
+      this.user = user;
+    }
+
+    public Duration getWall() {
+      return wall;
+    }
+
+    public Duration getCpu() {
+      return cpu;
+    }
+
+    public Duration getUser() {
+      return user;
+    }
+
+    public CpuDuration add(CpuDuration cpuDuration) {
+      return new CpuDuration(
+          addDurations(wall, cpuDuration.wall),
+          addDurations(cpu, cpuDuration.cpu),
+          addDurations(user, cpuDuration.user));
+    }
+
+    public CpuDuration subtract(CpuDuration cpuDuration) {
+      return new CpuDuration(
+          subtractDurations(wall, cpuDuration.wall),
+          subtractDurations(cpu, cpuDuration.cpu),
+          subtractDurations(user, cpuDuration.user));
+    }
+
+    private static Duration addDurations(Duration a, Duration b) {
+      return new Duration(a.getValue(NANOSECONDS) + b.getValue(NANOSECONDS), NANOSECONDS);
+    }
+
+    private static Duration subtractDurations(Duration a, Duration b) {
+      return new Duration(
+          Math.max(0, a.getValue(NANOSECONDS) - b.getValue(NANOSECONDS)), NANOSECONDS);
+    }
+
+    @Override
+    public String toString() {
+      return toStringHelper(this).add("wall", wall).add("cpu", cpu).add("user", user).toString();
+    }
+  }
+}

[iotdb] 01/07: Basic implementation of FragmentInstanceManager

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 736bd02ea6b87caf3a6bbb6514e81da1c5640a10
Author: ericpai <er...@hotmail.com>
AuthorDate: Mon Mar 14 18:08:07 2022 +0800

    Basic implementation of FragmentInstanceManager
---
 .../apache/iotdb/commons/service/ServiceType.java  |   1 +
 .../iotdb/mpp/execution/ExecutionContext.java      |  24 ++++
 .../mpp/execution/FragmentInstanceManager.java     | 101 ++++++++++++++
 .../execution/FragmentInstanceTaskExecutor.java    |  51 +++++++
 .../execution/FragmentInstanceTimeoutSentinel.java |  52 ++++++++
 .../mpp/execution/IFragmentInstanceManager.java    |  50 +++++++
 .../org/apache/iotdb/mpp/execution/queue/ID.java   |  22 +++
 .../mpp/execution/queue/IDIndexedAccessible.java   |  27 ++++
 .../mpp/execution/queue/IndexedBlockingQueue.java  | 148 +++++++++++++++++++++
 .../iotdb/mpp/execution/queue/L1PriorityQueue.java |  76 +++++++++++
 .../iotdb/mpp/execution/queue/L2PriorityQueue.java |  89 +++++++++++++
 .../mpp/execution/task/FragmentInstanceID.java     |  67 ++++++++++
 .../mpp/execution/task/FragmentInstanceTask.java   | 138 +++++++++++++++++++
 .../execution/task/FragmentInstanceTaskStatus.java |  37 ++++++
 14 files changed, 883 insertions(+)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 6d00f88..a5ad95d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -66,6 +66,7 @@ public enum ServiceType {
   CLUSTER_DATA_ENGINE("Cluster Data Engine", "ClusterDataEngine"),
   REST_SERVICE("REST Service", "REST Service"),
   CONFIG_NODE_SERVICE("Config Node service", "ConfigNodeRPCServer");
+  FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager");
 
   private final String name;
   private final String jmxName;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
new file mode 100644
index 0000000..7428b73
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+/** The execution context of a {@link FragmentInstanceTask} */
+public class ExecutionContext {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
new file mode 100644
index 0000000..4c7c157
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.queue.L1PriorityQueue;
+import org.apache.iotdb.mpp.execution.queue.L2PriorityQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** the manager of fragment instances scheduling */
+public class FragmentInstanceManager implements IFragmentInstanceManager, IService {
+
+  public static IFragmentInstanceManager getInstance() {
+    return InstanceHolder.instance;
+  }
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> readyQueue;
+  private final IndexedBlockingQueue<FragmentInstanceTask> timeoutQueue;
+  private final Map<String, List<FragmentInstanceID>> queryMap;
+
+  private static final int MAX_CAPACITY = 1000; // TODO: load from config files
+  private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
+  private final ThreadGroup workerGroups = new ThreadGroup("ScheduleThreads");
+
+  public FragmentInstanceManager() {
+    this.readyQueue =
+        new L2PriorityQueue<>(
+            MAX_CAPACITY,
+            new FragmentInstanceTask.SchedulePriorityComparator(),
+            new FragmentInstanceTask());
+    this.timeoutQueue =
+        new L1PriorityQueue<>(
+            MAX_CAPACITY,
+            new FragmentInstanceTask.SchedulePriorityComparator(),
+            new FragmentInstanceTask());
+    this.queryMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void start() throws StartupException {
+    for (int i = 0; i < WORKER_THREAD_NUM; i++) {
+      new FragmentInstanceTaskExecutor("Worker-Thread-" + i, workerGroups, readyQueue).start();
+    }
+    new FragmentInstanceTimeoutSentinel("Sentinel-Thread", workerGroups, timeoutQueue).start();
+  }
+
+  @Override
+  public void stop() {
+    workerGroups.interrupt();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.FRAGMENT_INSTANCE_MANAGER_SERVICE;
+  }
+
+  @Override
+  public void submitFragmentInstance() {}
+
+  @Override
+  public void inputBlockAvailable(
+      FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {}
+
+  @Override
+  public void outputBlockAvailable(
+      FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId) {}
+
+  @Override
+  public void abortQuery(String queryId) {}
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {}
+
+    private static final IFragmentInstanceManager instance = new FragmentInstanceManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
new file mode 100644
index 0000000..5c704db
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** the worker thread of {@link FragmentInstanceTask} */
+public class FragmentInstanceTaskExecutor extends Thread {
+
+  private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceTaskExecutor.class);
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+
+  public FragmentInstanceTaskExecutor(
+      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+    super(tg, workerId);
+    this.queue = queue;
+  }
+
+  @Override
+  public void run() {
+    try {
+      while (true) {
+        FragmentInstanceTask next = queue.poll();
+        // do logic here
+      }
+    } catch (InterruptedException e) {
+      logger.info("{} is interrupted.", this.getName());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
new file mode 100644
index 0000000..7b352ba
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** the thread for watching the timeout of {@link FragmentInstanceTask} */
+public class FragmentInstanceTimeoutSentinel extends Thread {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(FragmentInstanceTimeoutSentinel.class);
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+
+  public FragmentInstanceTimeoutSentinel(
+      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+    super(tg, workerId);
+    this.queue = queue;
+  }
+
+  @Override
+  public void run() {
+    try {
+      while (true) {
+        FragmentInstanceTask next = queue.poll();
+        // do logic here
+      }
+    } catch (InterruptedException e) {
+      logger.info("{} is interrupted.", this.getName());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
new file mode 100644
index 0000000..e0eecfa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+
+/** the interface of fragment instance scheduling */
+public interface IFragmentInstanceManager {
+
+  void submitFragmentInstance();
+
+  /**
+   * the notifying interface for {@link DataBlockManager} when upstream data comes.
+   *
+   * @param instanceID the fragment instance to be notified.
+   * @param upstreamInstanceId the upstream instance id.
+   */
+  void inputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId);
+
+  /**
+   * the notifying interface for {@link DataBlockManager} when downstream data has been consumed.
+   *
+   * @param instanceID the fragment instance to be notified.
+   * @param downstreamInstanceId the downstream instance id.
+   */
+  void outputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId);
+
+  /**
+   * abort all the instances in this query
+   *
+   * @param queryId the id of the query to be aborted.
+   */
+  void abortQuery(String queryId);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
new file mode 100644
index 0000000..cc7d58f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/** A simple interface to indicate the id type */
+public interface ID {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
new file mode 100644
index 0000000..5ae4c96
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/** A simple interface for id getter and setter */
+public interface IDIndexedAccessible {
+
+  ID getId();
+
+  void setId(ID id);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
new file mode 100644
index 0000000..bcabb5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/**
+ * The base class of a special kind of blocking queue, which has these characters:
+ *
+ * <p>1. Thread-safe.
+ *
+ * <p>2. Can poll from queue head. When the queue is empty, the poll() will be blocked until an
+ * element is inserted.
+ *
+ * <p>3. Can push a non-null element to queue. When the queue is beyond the max size, an exception
+ * will be thrown.
+ *
+ * <p>4. Can remove an element by a long type id.
+ */
+public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
+
+  private final int MAX_CAPACITY;
+  private final E queryHolder;
+  private int size;
+
+  /**
+   * Init the queue with a max capacity. The queryHolder is just a simple reused object in query to
+   * avoid small objects allocation. It should be not used in any other places out of the queue as
+   * the id may be mutated.
+   *
+   * @param maxCapacity the max capacity of the queue.
+   * @param queryHolder the query holder instance.
+   * @throws IllegalArgumentException if maxCapacity <= 0.
+   */
+  public IndexedBlockingQueue(int maxCapacity, E queryHolder) {
+    this.MAX_CAPACITY = maxCapacity;
+    this.queryHolder = queryHolder;
+  }
+
+  /**
+   * Get and remove the first element of the queue. If the queue is empty, this call will be blocked
+   * until an element has been pushed.
+   *
+   * @return the queue head element.
+   */
+  public synchronized E poll() throws InterruptedException {
+    while (isEmpty()) {
+      this.wait();
+    }
+    E output = pollFirst();
+    size--;
+    return output;
+  }
+
+  /**
+   * Push an element to the queue. The new element position is determined by the implementation. If
+   * the queue size has been reached the maxCapacity, an {@link IllegalStateException} will be
+   * thrown. If the element is null, an {@link NullPointerException} will be thrown.
+   *
+   * @param element the element to be pushed.
+   * @throws NullPointerException the pushed element is null.
+   * @throws IllegalStateException the queue size has been reached the maxCapacity.
+   */
+  public synchronized void push(E element) {
+    if (element == null) {
+      throw new NullPointerException("pushed element is null");
+    }
+    if (size + 1 > MAX_CAPACITY) {
+      throw new IllegalStateException("the queue is full");
+    }
+    pushToQueue(element);
+    this.notifyAll();
+  }
+
+  /**
+   * Remove and return the element by a long id. It returns null if it doesn't exist.
+   *
+   * @param id the id of the element to be removed.
+   * @return the removed element.
+   */
+  public synchronized E remove(ID id) {
+    queryHolder.setId(id);
+    E output = remove(queryHolder);
+    if (output == null) {
+      return null;
+    }
+    size--;
+    return output;
+  }
+
+  /**
+   * Get the current queue size.
+   *
+   * @return the current queue size.
+   */
+  public final synchronized int size() {
+    return size;
+  }
+
+  /**
+   * Whether the queue is empty.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @return true if the queue is empty, otherwise false.
+   */
+  protected abstract boolean isEmpty();
+
+  /**
+   * Get and remove the first element.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @return The first element.
+   */
+  protected abstract E pollFirst();
+
+  /**
+   * Push the element into the queue.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @param element the element to be pushed.
+   */
+  protected abstract void pushToQueue(E element);
+
+  /**
+   * Remove and return the element by a long id. It returns null if it doesn't exist.
+   *
+   * @param element the element to be removed.
+   * @return the removed element.
+   */
+  protected abstract E remove(E element);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
new file mode 100644
index 0000000..a5ccf14
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import java.util.Comparator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * An efficient subclass of {@link IndexedBlockingQueue} with 1-level priority groups.
+ *
+ * <p>The time complexity of operations are:
+ *
+ * <ul>
+ *   <li><b>{@link #size()}: </b> O(1).
+ *   <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
+ *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
+ *   <li><b>{@link #poll()}: </b> O(logN).
+ * </ul>
+ */
+public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
+
+  // Here we use a map not a set to act as a queue because we need to get the element reference
+  // after it was removed.
+  private final SortedMap<E, E> elements;
+
+  /**
+   * Init the queue with max capacity and specified comparator.
+   *
+   * @see IndexedBlockingQueue
+   * @param maxCapacity the max capacity of the queue.
+   * @param comparator the comparator for comparing the elements.
+   * @param queryHolder the query holder instance.
+   * @throws IllegalArgumentException if maxCapacity <= 0.
+   */
+  public L1PriorityQueue(int maxCapacity, Comparator<E> comparator, E queryHolder) {
+    super(maxCapacity, queryHolder);
+    this.elements = new TreeMap<>(comparator);
+  }
+
+  @Override
+  protected boolean isEmpty() {
+    return elements.isEmpty();
+  }
+
+  @Override
+  protected E pollFirst() {
+    return elements.remove(elements.firstKey());
+  }
+
+  @Override
+  protected void pushToQueue(E element) {
+    elements.put(element, element);
+  }
+
+  @Override
+  protected E remove(E element) {
+    return elements.remove(element);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
new file mode 100644
index 0000000..3b8a9e6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import java.util.Comparator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * An efficient subclass of {@link IndexedBlockingQueue} with 2-level priority groups. The
+ * advantages compared to {@link L1PriorityQueue} are that each element in this queue will not be
+ * starved to death by its low sequence order.
+ *
+ * <p>The time complexity of operations are:
+ *
+ * <ul>
+ *   <li><b>{@link #size()}: </b> O(1).
+ *   <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
+ *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
+ *   <li><b>{@link #poll()}: </b> O(logN).
+ * </ul>
+ */
+public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
+
+  // Here we use a map not a set to act as a queue because we need to get the element reference
+  // after it was removed.
+  private SortedMap<E, E> workingElements;
+  private SortedMap<E, E> idleElements;
+
+  /**
+   * Init the queue with max capacity and specified comparator.
+   *
+   * @see IndexedBlockingQueue
+   * @param maxCapacity the max capacity of the queue.
+   * @param comparator the comparator for comparing the elements.
+   * @param queryHolder the query holder instance.
+   * @throws IllegalArgumentException if maxCapacity <= 0.
+   */
+  public L2PriorityQueue(int maxCapacity, Comparator<E> comparator, E queryHolder) {
+    super(maxCapacity, queryHolder);
+    this.workingElements = new TreeMap<>(comparator);
+    this.idleElements = new TreeMap<>(comparator);
+  }
+
+  @Override
+  protected boolean isEmpty() {
+    return workingElements.isEmpty() && idleElements.isEmpty();
+  }
+
+  @Override
+  protected E pollFirst() {
+    if (workingElements.isEmpty()) {
+      SortedMap<E, E> tmp = workingElements;
+      workingElements = idleElements;
+      idleElements = tmp;
+    }
+    return workingElements.remove(workingElements.firstKey());
+  }
+
+  @Override
+  protected void pushToQueue(E element) {
+    idleElements.put(element, element);
+  }
+
+  @Override
+  protected E remove(E element) {
+    E e = workingElements.remove(element);
+    if (e == null) {
+      e = idleElements.remove(element);
+    }
+    return e;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
new file mode 100644
index 0000000..f52b5c3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+import org.apache.iotdb.mpp.execution.queue.ID;
+
+import org.jetbrains.annotations.NotNull;
+
+/** the class of id of the fragment instance */
+public class FragmentInstanceID implements ID, Comparable<FragmentInstanceTask> {
+
+  private final String instanceId;
+  private final String fragmentId;
+  private final String queryId;
+
+  public FragmentInstanceID(String queryId, String fragmentId, String instanceId) {
+    this.queryId = queryId;
+    this.fragmentId = fragmentId;
+    this.instanceId = instanceId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof FragmentInstanceID
+        && queryId.equals(((FragmentInstanceID) o).getQueryId())
+        && fragmentId.equals(((FragmentInstanceID) o).getFragmentId())
+        && instanceId.equals(((FragmentInstanceID) o).getInstanceId());
+  }
+
+  public String toString() {
+    return String.format("%s.%s.%s", getInstanceId(), getFragmentId(), getQueryId());
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public String getFragmentId() {
+    return fragmentId;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  // This is the default comparator of FragmentInstanceID
+  @Override
+  public int compareTo(@NotNull FragmentInstanceTask o) {
+    return String.CASE_INSENSITIVE_ORDER.compare(this.toString(), o.toString());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
new file mode 100644
index 0000000..7bd7205
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+import org.apache.iotdb.mpp.execution.ExecutionContext;
+import org.apache.iotdb.mpp.execution.FragmentInstanceTaskExecutor;
+import org.apache.iotdb.mpp.execution.queue.ID;
+import org.apache.iotdb.mpp.execution.queue.IDIndexedAccessible;
+
+import java.util.Comparator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * the scheduling element of {@link FragmentInstanceTaskExecutor}. It wraps a single
+ * FragmentInstance.
+ */
+public class FragmentInstanceTask implements IDIndexedAccessible {
+
+  private FragmentInstanceID id;
+  private FragmentInstanceTaskStatus status;
+  private final ExecutionContext executionContext;
+
+  // the higher this field is, the higher probability it will be scheduled.
+  private long schedulePriority;
+  private final long ddl;
+  private final Lock lock;
+
+  /** Initialize a dummy instance for queryHolder */
+  public FragmentInstanceTask() {
+    this(null, 0L, null);
+  }
+
+  public FragmentInstanceTask(
+      FragmentInstanceID id, long timeoutMs, FragmentInstanceTaskStatus status) {
+    this.id = id;
+    this.setStatus(status);
+    this.executionContext = new ExecutionContext();
+    this.schedulePriority = 0L;
+    this.ddl = System.currentTimeMillis() + timeoutMs;
+    this.lock = new ReentrantLock();
+  }
+
+  public FragmentInstanceID getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(ID id) {
+    this.id = (FragmentInstanceID) id;
+  }
+
+  public FragmentInstanceTaskStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(FragmentInstanceTaskStatus status) {
+    this.status = status;
+  }
+
+  public ExecutionContext getExecutionContext() {
+    return executionContext;
+  }
+
+  /** Update the schedule priority according to the execution context. */
+  public void updateSchedulePriority() {
+    // TODO: need to implement here
+    this.schedulePriority = System.currentTimeMillis() - ddl;
+  }
+
+  public void lock() {
+    lock.lock();
+  }
+
+  public void unlock() {
+    lock.unlock();
+  }
+
+  public double getSchedulePriority() {
+    return schedulePriority;
+  }
+
+  public long getDDL() {
+    return ddl;
+  }
+
+  /** a comparator of ddl, the less the ddl is, the low order it has. */
+  public static class TimeoutComparator implements Comparator<FragmentInstanceTask> {
+
+    @Override
+    public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+      if (o1.getId().equals(o2.getId())) {
+        return 0;
+      }
+      if (o1.getDDL() < o2.getDDL()) {
+        return -1;
+      }
+      if (o1.getDDL() > o2.getDDL()) {
+        return 1;
+      }
+      return o1.getId().compareTo(o2);
+    }
+  }
+
+  /** a comparator of ddl, the higher the schedulePriority is, the low order it has. */
+  public static class SchedulePriorityComparator implements Comparator<FragmentInstanceTask> {
+
+    @Override
+    public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+      if (o1.getId().equals(o2.getId())) {
+        return 0;
+      }
+      if (o1.getSchedulePriority() > o2.getSchedulePriority()) {
+        return -1;
+      }
+      if (o1.getSchedulePriority() < o2.getSchedulePriority()) {
+        return 1;
+      }
+      return o1.getId().compareTo(o2);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
new file mode 100644
index 0000000..5ee9fb3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+/** the status enum of {@link FragmentInstanceTask} */
+public enum FragmentInstanceTaskStatus {
+  /* Ready to be executed */
+  READY,
+
+  /* Being executed */
+  RUNNING,
+
+  /* Waiting upstream input or output consumed by downstream FragmentInstances */
+  BLOCKED,
+
+  /* Interrupted caused by timeout or coordinator's cancellation */
+  ABORTED,
+
+  /* Finished by met the EOF of upstream inputs */
+  FINISHED,
+}