You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by er...@apache.org on 2022/03/17 02:42:48 UTC

[iotdb] branch new_mpp updated (a1e83fa -> 1feea78)

This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a change to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


 discard a1e83fa  Fragment schedule develop
 discard a15776f  [To_new_mpp] add IDataBlockManager (#5240)
    omit b3e564a  [To_new_mpp] add common classes & thrift files (#5239)
    omit 7c04879  [To_new_mpp] Basic query memory control (#5216)
    omit 37f6f70  Basic implementation of FragmentInstanceManager
     add 08e9d34  change master version to 0.14.0-SNAPSHOT (#5209)
     add c3e49a8  [IOTDB-2735] Remove the redundant code in Tablet (#5211)
     add b05a81f  [IOTDB-2732] Reject inserting an invalid infinity float value (#5210)
     add 10b15ea  [IOTDB-2734] Correct result type name in ResultMetadata (#5213)
     add 2a5d371  Update dependabot to check rel/0.13 (#5203)
     add 83ab2f6  Fix .github/dependabot.yml (#5222)
     add 272d5db  Update UserGuide descriptions (#5219)
     add 1460bb7  [IOTDB-2739] Update v0.13 upgrade user guide (#5234)
     add 0d2cad6  Bump org.slf4j.version from 1.7.32 to 1.7.36 (#5225)
     add 7ce67a1  [IOTDB-2742] Hide IDTable configurations (#5238)
     add b3147de  fix write with null value in tsFileWriter (#5237)
     add 30cd1c0  Revert "fix write with null value in tsFileWriter (#5237)" (#5246)
     add a88b6b5  Modify cross priority param to upper case in doc (#5242)
     add c3d34b6  [IOTDB-2590] Group by time query: support time sliding step less than the time interval (#5196)
     add c2a2acd  [IOTDB-2736] DELETE_STORAGE_GROUP can not be granted to user (reporting 401) (#5233)
     add 9f04de9  [IOTDB-2723] Fix sequence inner space compaction lose data
     add 2f78450  [IOTDB-2614]Fix inserting tablet with null value in TsFileWriter (#5244)
     add 919532a  Bump jacoco-maven-plugin from 0.8.5 to 0.8.7 (#5194)
     add 74189ee  Bump maven-bundle-plugin from 5.1.1 to 5.1.4 (#5192)
     add 6347db7  [IOTDB-2725] MPP query engine interface definition (#5250)
     add 1ff6db9  [IOTDB-2524] Aligned Timeseries support tags and attributes (#5191)
     add 5a88650  [IOTDB-2685] Create and start config node (#5247)
     add 5edf9e9  [IOTDB-2750] Enhance check statement before writing mlog  (#5253)
     add 34b480d  [IOTDB-2737] Fix ID Table compaction bug (#5214)
     add 0426e72  [IOTDB-1593] Fix generated files in client-py have unapproved license (#5258)
     add 37dae90  add setNoMoreTsBlocks(), close() and abort() methods in SinkHandle interface (#5256)
     add d29ef70  [IOTDB-2749]Fix isPathExist check on MTree (#5260)
     new 678f712  Basic implementation of FragmentInstanceManager
     new d31733f  [To_new_mpp] Basic query memory control (#5216)
     new 4a85d31  [To_new_mpp] add common classes & thrift files (#5239)
     new 6e4e2cb  [To_new_mpp] add IDataBlockManager (#5240)
     new 0ef2b42  Fragment schedule develop
     new 1feea78  Sync with master

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (a1e83fa)
            \
             N -- N -- N   refs/heads/new_mpp (1feea78)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/dependabot.yml                             |  26 +
 README.md                                          |   4 +-
 README_ZH.md                                       |   4 +-
 antlr/pom.xml                                      |   2 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   7 +-
 cli/pom.xml                                        |   2 +-
 client-cpp/pom.xml                                 |   2 +-
 client-py/pom.xml                                  |   2 +-
 cluster/pom.xml                                    |   2 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |   9 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |   4 +-
 .../groupby/ClusterGroupByNoVFilterDataSet.java    |   4 +-
 .../groupby/ClusterGroupByVFilterDataSet.java      |   2 +-
 .../query/groupby/MergeGroupByExecutor.java        |   2 +-
 .../query/groupby/RemoteGroupByExecutor.java       |   2 +-
 .../cluster/query/manage/ClusterQueryManager.java  |   2 +-
 .../cluster/query/reader/ClusterReaderFactory.java |   2 +-
 .../iotdb/cluster/query/reader/EmptyReader.java    |   2 +-
 .../query/groupby/RemoteGroupByExecutorTest.java   |   2 +-
 code-coverage/pom.xml                              |   4 +-
 compile-tools/pom.xml                              |   2 +-
 compile-tools/thrift/pom.xml                       |   2 +-
 confignode/pom.xml                                 |  19 +-
 .../src/assembly/resources/conf/confignode-env.bat | 146 +++++
 .../src/assembly/resources/conf/confignode-env.sh  | 264 +++++++++
 .../resources/conf/iotdb-confignode.properties     |  10 +-
 .../src/assembly/resources/conf/jmx.access         |   0
 .../src/assembly/resources/conf/jmx.password       |   0
 .../assembly/resources/sbin/start-confignode.bat   | 122 ++++
 .../assembly/resources/sbin/start-confignode.sh    |  77 +++
 .../assembly/resources/sbin/stop-confignode.bat    |  38 ++
 .../resources/sbin/stop-confignode.sh}             |  32 +-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |   5 +-
 .../iotdb/confignode/conf/ConfigNodeConfCheck.java | 136 +++++
 .../iotdb/confignode/conf/ConfigNodeConstant.java  |  28 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |  66 ++-
 .../confignode/exception/ConfigNodeException.java  |  11 +-
 .../conf/RepeatConfigurationException.java}        |  18 +-
 .../startup/StartupException.java}                 |  17 +-
 .../iotdb/confignode/manager/ConfigManager.java    |   2 +-
 .../iotdb/confignode/service/ConfigNode.java       | 125 ++++
 .../iotdb/confignode/service/ConfigNodeMBean.java  |   7 +-
 .../confignode/service/register/IService.java      |  40 +-
 .../confignode/service/register/JMXService.java    | 105 ++++
 .../service/register/RegisterManager.java          |  82 +++
 .../register/ServiceType.java}                     |  30 +-
 .../confignode/service/startup/StartupCheck.java   |  11 +-
 .../confignode/service/startup/StartupChecks.java  |  89 +++
 .../{impl => server}/ConfigNodeRPCServer.java      |   2 +-
 .../confignode/conf/ConfigNodeDescriptorTest.java  |  76 ---
 .../manager/hash/DeviceGroupHashExecutorTest.java  |   3 +-
 .../utils/ConfigNodeEnvironmentUtils.java          | 133 +++++
 consensus/pom.xml                                  |   2 +-
 cross-tests/pom.xml                                |   2 +-
 distribution/pom.xml                               |   2 +-
 docs/Download/README.md                            |  16 +-
 docs/UserGuide/API/Programming-Java-Native-API.md  |   4 +-
 docs/UserGuide/API/Programming-TsFile-API.md       |   4 +-
 .../Administration-Management/Administration.md    |   6 +-
 docs/UserGuide/Cluster/Cluster-Setup-Example.md    |  10 +-
 .../UserGuide/Ecosystem Integration/Hive TsFile.md |   8 +-
 .../Ecosystem Integration/MapReduce TsFile.md      |   2 +-
 .../Ecosystem Integration/Spark TsFile.md          |   2 +-
 docs/UserGuide/Operate-Metadata/Timeseries.md      |  39 +-
 docs/UserGuide/Process-Data/Triggers.md            |   6 +-
 .../Process-Data/UDF-User-Defined-Function.md      |   8 +-
 docs/UserGuide/Query-Data/Aggregate-Query.md       |  28 +-
 docs/UserGuide/Query-Data/Overview.md              |   2 +-
 docs/UserGuide/QuickStart/WayToGetIoTDB.md         |   2 +-
 docs/UserGuide/Reference/Config-Manual.md          |   7 +-
 docs/UserGuide/Reference/SQL-Reference.md          |   2 +-
 docs/zh/Download/README.md                         |  11 +-
 docs/zh/UserGuide/API/Programming-JDBC.md          |   2 +-
 .../UserGuide/API/Programming-Java-Native-API.md   |   4 +-
 docs/zh/UserGuide/API/Programming-TsFile-API.md    |   4 +-
 .../Administration-Management/Administration.md    |   8 +-
 docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md |  10 +-
 .../UserGuide/Ecosystem Integration/Hive TsFile.md |   8 +-
 .../Ecosystem Integration/MapReduce TsFile.md      |   2 +-
 .../Ecosystem Integration/Spark TsFile.md          |   2 +-
 docs/zh/UserGuide/Operate-Metadata/Timeseries.md   |  34 +-
 docs/zh/UserGuide/Process-Data/Triggers.md         |   6 +-
 .../Process-Data/UDF-User-Defined-Function.md      |   8 +-
 docs/zh/UserGuide/Query-Data/Aggregate-Query.md    |  37 +-
 docs/zh/UserGuide/Query-Data/Overview.md           |   2 +-
 docs/zh/UserGuide/QuickStart/WayToGetIoTDB.md      |   2 +-
 docs/zh/UserGuide/Reference/Config-Manual.md       |   5 +-
 docs/zh/UserGuide/Reference/SQL-Reference.md       |   2 +-
 example/client-cpp-example/pom.xml                 |   2 +-
 example/flink/pom.xml                              |   2 +-
 example/hadoop/pom.xml                             |   2 +-
 example/jdbc/pom.xml                               |   2 +-
 example/kafka/pom.xml                              |   4 +-
 example/kafka/readme.md                            |   4 +-
 example/mqtt-customize/pom.xml                     |   2 +-
 example/mqtt/pom.xml                               |   2 +-
 example/pom.xml                                    |   2 +-
 example/pulsar/pom.xml                             |   2 +-
 example/rabbitmq/pom.xml                           |   2 +-
 example/rocketmq/pom.xml                           |   2 +-
 example/rocketmq/readme.md                         |   4 +-
 example/session/pom.xml                            |   2 +-
 .../iotdb/AlignedTimeseriesSessionExample.java     |   9 +-
 .../main/java/org/apache/iotdb/SessionExample.java |   5 +-
 example/trigger/pom.xml                            |   4 +-
 example/tsfile/pom.xml                             |   2 +-
 example/tsfile/readme.md                           |   2 +-
 example/udf/pom.xml                                |   2 +-
 flink-iotdb-connector/pom.xml                      |   2 +-
 flink-tsfile-connector/pom.xml                     |   2 +-
 grafana-connector/pom.xml                          |   2 +-
 grafana-plugin/pom.xml                             |   2 +-
 hadoop/README.md                                   |   2 +-
 hadoop/pom.xml                                     |   2 +-
 hive-connector/pom.xml                             |   2 +-
 influxdb-protocol/pom.xml                          |   2 +-
 integration/pom.xml                                |   2 +-
 .../apache/iotdb/db/integration/IOTDBInsertIT.java |  26 +
 .../iotdb/db/integration/IoTDBAuthorizationIT.java |  49 ++
 .../integration/IoTDBCompactionWithIDTableIT.java  | 352 ++++++++++++
 .../db/integration/IoTDBResultMetadataIT.java      |  82 +++
 .../IoTDBGroupByQueryWithoutValueFilterIT.java     |  14 -
 ...roupBySlidingWindowQueryWithValueFilterIT.java} | 575 +++++++------------
 ...pBySlidingWindowQueryWithoutValueFilterIT.java} | 552 ++++++------------
 .../integration/{ => groupby}/IOTDBGroupByIT.java  |   2 +-
 .../{ => groupby}/IOTDBGroupByInnerIntervalIT.java |   2 +-
 .../{ => groupby}/IoTDBGroupByFillIT.java          |   2 +-
 .../{ => groupby}/IoTDBGroupByFillMixPathsIT.java  |   2 +-
 .../{ => groupby}/IoTDBGroupByFillWithRangeIT.java |   2 +-
 .../{ => groupby}/IoTDBGroupByMonthFillIT.java     |   2 +-
 .../{ => groupby}/IoTDBGroupByMonthIT.java         |  95 ++-
 .../{ => groupby}/IoTDBGroupByUnseqIT.java         |   2 +-
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java |   5 +-
 {example/mqtt => iotdb-commons}/pom.xml            |  15 +-
 .../apache/iotdb/commons}/hash/APHashExecutor.java |   2 +-
 .../iotdb/commons}/hash/BKDRHashExecutor.java      |   2 +-
 .../commons}/hash/DeviceGroupHashExecutor.java     |   2 +-
 .../apache/iotdb/commons}/hash/JSHashExecutor.java |   2 +-
 .../iotdb/commons}/hash/SDBMHashExecutor.java      |   2 +-
 jdbc/README.md                                     |   4 +-
 jdbc/pom.xml                                       |   2 +-
 .../org/apache/iotdb/jdbc/IoTDBResultMetadata.java |  29 +-
 library-udf/pom.xml                                |   2 +-
 metrics/dropwizard-metrics/pom.xml                 |   4 +-
 metrics/interface/pom.xml                          |   4 +-
 metrics/micrometer-metrics/pom.xml                 |   4 +-
 metrics/pom.xml                                    |   2 +-
 openapi/pom.xml                                    |   2 +-
 pom.xml                                            |  16 +-
 server/pom.xml                                     |  11 +-
 .../resources/conf/iotdb-engine.properties         |  21 +-
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |   2 +
 .../apache/iotdb/db/auth/entity/PrivilegeType.java |  38 +-
 .../org/apache/iotdb/db/conf/IoTDBConfigCheck.java |  20 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  26 +-
 .../db/engine/compaction/CompactionUtils.java      |  19 +-
 .../manage/CrossSpaceCompactionResource.java       |   6 -
 .../inner/utils/InnerSpaceCompactionUtils.java     |   8 +-
 .../utils/SingleSeriesCompactionExecutor.java      |   9 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |  45 +-
 .../idtable/AppendOnlyDiskSchemaManager.java       |  39 +-
 .../apache/iotdb/db/metadata/idtable/IDTable.java  |  10 +
 .../db/metadata/idtable/IDTableHashmapImpl.java    |  28 +
 .../iotdb/db/metadata/idtable/IDTableManager.java  |  21 +
 .../org/apache/iotdb/db/metadata/mtree/MTree.java  |   8 +-
 .../mpp/buffer}/IDataBlockManager.java             |   6 +-
 .../org/apache/iotdb/db/mpp/buffer/SinkHandle.java |  56 ++
 .../apache/iotdb/db/mpp/buffer/SourceHandle.java   |  22 +-
 .../mpp/common/Analysis.java}                      |   8 +-
 .../mpp/common/FillPolicy.java}                    |   9 +-
 .../mpp/common/GroupByTimeParameter.java}          |  13 +-
 .../mpp/common/OrderBy.java}                       |  16 +-
 .../mpp/common/QueryContext.java}                  |  18 +-
 .../mpp/common/QueryId.java}                       |  16 +-
 .../mpp/common/QuerySession.java}                  |   7 +-
 .../mpp/common/TreeNode.java}                      |  32 +-
 .../ITSBlock.java => db/mpp/common/TsBlock.java}   |  15 +-
 .../iotdb/db/mpp/common/TsBlockMetadata.java       |  41 ++
 .../mpp/common/WithoutPolicy.java}                 |  10 +-
 .../execution/Coordinator.java}                    |  32 +-
 .../db/mpp/execution/ExecFragmentInstance.java     |  57 ++
 .../iotdb/db/mpp/execution/QueryExecution.java     |  96 ++++
 .../iotdb/db/mpp/execution/QueryScheduler.java     |  51 ++
 .../mpp/execution/QueryStateMachine.java}          |  11 +-
 .../{ => db}/mpp/memory/LocalMemoryManager.java    |   2 +-
 .../iotdb/{ => db}/mpp/memory/MemoryPool.java      |   2 +-
 .../org/apache/iotdb/db/mpp/operator/Operator.java |  49 ++
 .../iotdb/db/mpp/operator/OperatorContext.java     |  25 +-
 .../iotdb/db/mpp/operator/SeriesScanOperator.java  |  21 +-
 .../iotdb/db/mpp/plan/DistributedQueryPlan.java    |  20 +-
 .../iotdb/db/mpp/plan/DistributionPlanner.java     |  21 +-
 .../apache/iotdb/db/mpp/plan/FragmentInstance.java |  17 +-
 .../mpp/plan/FragmentInstanceId.java}              |  15 +-
 .../apache/iotdb/db/mpp/plan/LogicalPlanner.java   |  35 +-
 .../apache/iotdb/db/mpp/plan/LogicalQueryPlan.java |  22 +-
 .../org/apache/iotdb/db/mpp/plan/PlanFragment.java |  19 +-
 .../mpp/plan/PlanFragmentId.java}                  |  11 +-
 .../apache/iotdb/db/mpp/plan/node/PlanNode.java    |  22 +-
 .../apache/iotdb/db/mpp/plan/node/PlanNodeId.java  |  34 ++
 .../mpp/plan/node/PlanNodeIdAllocator.java}        |   8 +-
 .../db/mpp/plan/node/process/DeviceMergeNode.java  |  66 +++
 .../iotdb/db/mpp/plan/node/process/FillNode.java   |  30 +-
 .../iotdb/db/mpp/plan/node/process/FilterNode.java |  30 +-
 .../db/mpp/plan/node/process/FilterNullNode.java   |  30 +-
 .../db/mpp/plan/node/process/GroupByLevelNode.java |  41 ++
 .../iotdb/db/mpp/plan/node/process/LimitNode.java  |  23 +-
 .../mpp/plan/node/process/OffsetNode.java}         |  31 +-
 .../db/mpp/plan/node/process/ProcessNode.java      |  17 +-
 .../node/process/RowBasedSeriesAggregateNode.java  |  59 ++
 .../iotdb/db/mpp/plan/node/process/SortNode.java   |  32 +-
 .../db/mpp/plan/node/process/TimeJoinNode.java     |  67 +++
 .../iotdb/db/mpp/plan/node/sink/CsvSinkNode.java   |  15 +-
 .../db/mpp/plan/node/sink/FragmentSinkNode.java    |  17 +-
 .../iotdb/db/mpp/plan/node/sink/SinkNode.java      |  16 +-
 .../db/mpp/plan/node/sink/ThriftSinkNode.java      |  17 +-
 .../db/mpp/plan/node/source/CsvSourceNode.java     |  17 +-
 .../mpp/plan/node/source/SeriesAggregateNode.java  |  81 +++
 .../db/mpp/plan/node/source/SeriesScanNode.java    |  85 +++
 .../iotdb/db/mpp/plan/node/source/SourceNode.java  |  16 +-
 .../mpp/plan/optimzation/PlanOptimizer.java}       |  14 +-
 .../mpp/schedule}/ExecutionContext.java            |   4 +-
 .../mpp/schedule}/FragmentInstanceManager.java     |  14 +-
 .../schedule}/FragmentInstanceTaskCallback.java    |   4 +-
 .../schedule}/FragmentInstanceTaskExecutor.java    |   6 +-
 .../schedule}/FragmentInstanceTimeoutSentinel.java |   8 +-
 .../mpp/schedule}/IFragmentInstanceManager.java    |  11 +-
 .../execution => db/mpp/schedule}/queue/ID.java    |   2 +-
 .../mpp/schedule}/queue/IDIndexedAccessible.java   |   2 +-
 .../mpp/schedule}/queue/IndexedBlockingQueue.java  |   2 +-
 .../mpp/schedule}/queue/L1PriorityQueue.java       |   2 +-
 .../mpp/schedule}/queue/L2PriorityQueue.java       |   2 +-
 .../mpp/schedule}/task/FragmentInstanceID.java     |   4 +-
 .../mpp/schedule}/task/FragmentInstanceTask.java   |  10 +-
 .../schedule}/task/FragmentInstanceTaskStatus.java |   2 +-
 .../sys/CreateAlignedTimeSeriesOperator.java       |  55 +-
 .../iotdb/db/qp/physical/crud/AggregationPlan.java |   2 +-
 .../db/qp/physical/crud/GroupByTimeFillPlan.java   |   7 +-
 .../physical/sys/CreateAlignedTimeSeriesPlan.java  | 125 +++-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  12 +-
 .../apache/iotdb/db/qp/utils/DatetimeUtils.java    |  22 +
 .../db/query/aggregation/AggregateResult.java      |  53 +-
 .../aggregation/RemovableAggregateResult.java}     |  13 +-
 .../db/query/aggregation/impl/AvgAggrResult.java   |  22 +-
 .../db/query/aggregation/impl/CountAggrResult.java |  14 +-
 .../query/aggregation/impl/ExtremeAggrResult.java  |   8 +-
 .../aggregation/impl/FirstValueAggrResult.java     |   3 -
 .../aggregation/impl/LastValueAggrResult.java      |   3 -
 .../query/aggregation/impl/MaxTimeAggrResult.java  |   5 +
 .../query/aggregation/impl/MaxValueAggrResult.java |   4 +
 .../query/aggregation/impl/MinTimeAggrResult.java  |   5 +
 .../query/aggregation/impl/MinValueAggrResult.java |   4 +
 .../db/query/aggregation/impl/SumAggrResult.java   |  20 +-
 .../dataset/groupby/GroupByEngineDataSet.java      | 239 --------
 .../query/dataset/groupby/GroupByFillDataSet.java  |   2 +-
 .../query/dataset/groupby/GroupByLevelDataSet.java |   2 +-
 .../query/dataset/groupby/GroupByTimeDataSet.java  | 177 ++++++
 .../dataset/groupby/GroupByTimeEngineDataSet.java  |  91 +++
 .../groupby/GroupByWithValueFilterDataSet.java     |  85 +--
 .../groupby/GroupByWithoutValueFilterDataSet.java  | 104 ++--
 .../iotdb/db/query/executor/QueryRouter.java       |  10 +-
 .../apache/iotdb/db/query/executor/fill/IFill.java |  13 +-
 .../groupby/AlignedGroupByExecutor.java            |   2 +-
 .../groupby/GroupByExecutor.java                   |   2 +-
 .../groupby/SlidingWindowGroupByExecutor.java      |  66 +++
 .../SlidingWindowGroupByExecutorFactory.java       | 117 ++++
 .../EmptyQueueSlidingWindowGroupByExecutor.java    |  51 ++
 .../groupby/impl}/LocalAlignedGroupByExecutor.java |   3 +-
 .../groupby/impl}/LocalGroupByExecutor.java        |   3 +-
 ...MonotonicQueueSlidingWindowGroupByExecutor.java |  73 +++
 .../NormalQueueSlidingWindowGroupByExecutor.java   |  61 ++
 .../SmoothQueueSlidingWindowGroupByExecutor.java   |  55 ++
 .../db/query/factory/AggregateResultFactory.java   |   2 +
 .../db/service/thrift/impl/TSServiceImpl.java      |   4 +-
 .../java/org/apache/iotdb/db/utils/AuthUtils.java  |   2 +
 .../org/apache/iotdb/db/utils/CommonUtils.java     |  12 +-
 .../iotdb/db/utils/datastructure/TimeSelector.java |  18 +
 .../timerangeiterator/AggrWindowIterator.java      | 149 +++++
 .../timerangeiterator/ITimeRangeIterator.java      |  27 +-
 .../timerangeiterator/PreAggrWindowIterator.java   | 142 +++++
 .../PreAggrWindowWithNaturalMonthIterator.java     | 109 ++++
 .../TimeRangeIteratorFactory.java                  |  69 +++
 .../InnerSpaceCompactionUtilsNoAlignedTest.java    |  83 ++-
 .../iotdb/db/metadata/MManagerBasicTest.java       |  30 +
 .../iotdb/db/metadata/idtable/IDTableTest.java     |   4 +
 .../iotdb/{ => db}/mpp/memory/MemoryPoolTest.java  |   2 +-
 .../mpp/schedule}/queue/L1PriorityQueueTest.java   |   2 +-
 .../mpp/schedule}/queue/L2PriorityQueueTest.java   |   2 +-
 .../mpp/schedule}/queue/QueueElement.java          |   2 +-
 .../db/qp/physical/PhysicalPlanSerializeTest.java  |   2 +
 ...imeDataSetTest.java => GroupByDataSetTest.java} |   4 +-
 .../dataset/groupby/GroupByEngineDataSetTest.java  | 504 ----------------
 .../dataset/groupby/GroupByTimeDataSetTest.java    | 636 +++++++++++++++------
 .../iotdb/db/utils/TimeRangeIteratorTest.java      | 220 +++++++
 .../db/utils/datastructure/TimeSelectorTest.java   |  22 +
 service-rpc/pom.xml                                |   2 +-
 session/pom.xml                                    |   2 +-
 .../java/org/apache/iotdb/session/Session.java     |  19 +-
 .../session/IoTDBSessionDisableMemControlIT.java   |   9 +-
 .../session/IoTDBSessionVectorABDeviceIT.java      |   2 +-
 .../IoTDBSessionVectorAggregationWithUnSeqIT.java  |   2 +-
 .../iotdb/session/IoTDBSessionVectorInsertIT.java  |   9 +-
 site/pom.xml                                       |   2 +-
 spark-iotdb-connector/pom.xml                      |   2 +-
 spark-tsfile/pom.xml                               |   2 +-
 testcontainer/pom.xml                              |   2 +-
 thrift-cluster/pom.xml                             |   2 +-
 thrift-confignode/pom.xml                          |   2 +-
 thrift-influxdb/pom.xml                            |   2 +-
 thrift-sync/pom.xml                                |   2 +-
 thrift/pom.xml                                     |   2 +-
 thrift/src/main/thrift/rpc.thrift                  |   2 +
 tsfile/README.md                                   |   2 +-
 tsfile/pom.xml                                     |   2 +-
 .../write/chunk/AlignedChunkGroupWriterImpl.java   |   9 +-
 .../chunk/NonAlignedChunkGroupWriterImpl.java      |   6 +
 .../iotdb/tsfile/write/chunk/ValueChunkWriter.java |  12 +-
 .../apache/iotdb/tsfile/write/record/Tablet.java   |  15 +-
 .../iotdb/tsfile/write/TsFileWriteApiTest.java     |  99 ++++
 zeppelin-interpreter/pom.xml                       |   2 +-
 319 files changed, 6956 insertions(+), 2780 deletions(-)
 create mode 100644 confignode/src/assembly/resources/conf/confignode-env.bat
 create mode 100644 confignode/src/assembly/resources/conf/confignode-env.sh
 copy {server => confignode}/src/assembly/resources/conf/jmx.access (100%)
 copy {server => confignode}/src/assembly/resources/conf/jmx.password (100%)
 create mode 100644 confignode/src/assembly/resources/sbin/start-confignode.bat
 create mode 100644 confignode/src/assembly/resources/sbin/start-confignode.sh
 create mode 100644 confignode/src/assembly/resources/sbin/stop-confignode.bat
 rename confignode/src/{test/resources/iotdb-confignode.properties => assembly/resources/sbin/stop-confignode.sh} (50%)
 create mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfCheck.java
 copy server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java => confignode/src/main/java/org/apache/iotdb/confignode/exception/ConfigNodeException.java (81%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/{conf/ConfigNodeConstant.java => exception/conf/RepeatConfigurationException.java} (60%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/{conf/ConfigNodeConstant.java => exception/startup/StartupException.java} (64%)
 create mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
 copy server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java => confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeMBean.java (87%)
 copy server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java => confignode/src/main/java/org/apache/iotdb/confignode/service/register/IService.java (51%)
 create mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/service/register/JMXService.java
 create mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/service/register/RegisterManager.java
 copy confignode/src/main/java/org/apache/iotdb/confignode/{manager/hash/BKDRHashExecutor.java => service/register/ServiceType.java} (57%)
 copy server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java => confignode/src/main/java/org/apache/iotdb/confignode/service/startup/StartupCheck.java (74%)
 create mode 100644 confignode/src/main/java/org/apache/iotdb/confignode/service/startup/StartupChecks.java
 rename confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/{impl => server}/ConfigNodeRPCServer.java (97%)
 delete mode 100644 confignode/src/test/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptorTest.java
 create mode 100644 confignode/src/test/java/org/apache/iotdb/confignode/utils/ConfigNodeEnvironmentUtils.java
 create mode 100644 integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionWithIDTableIT.java
 create mode 100644 integration/src/test/java/org/apache/iotdb/db/integration/IoTDBResultMetadataIT.java
 copy integration/src/test/java/org/apache/iotdb/db/integration/aligned/{IoTDBGroupByQueryWithoutValueFilterIT.java => IoTDBGroupBySlidingWindowQueryWithValueFilterIT.java} (61%)
 copy integration/src/test/java/org/apache/iotdb/db/integration/aligned/{IoTDBGroupByQueryWithoutValueFilterIT.java => IoTDBGroupBySlidingWindowQueryWithoutValueFilterIT.java} (63%)
 rename integration/src/test/java/org/apache/iotdb/db/integration/{ => groupby}/IOTDBGroupByIT.java (99%)
 rename integration/src/test/java/org/apache/iotdb/db/integration/{ => groupby}/IOTDBGroupByInnerIntervalIT.java (99%)
 rename integration/src/test/java/org/apache/iotdb/db/integration/{ => groupby}/IoTDBGroupByFillIT.java (99%)
 rename integration/src/test/java/org/apache/iotdb/db/integration/{ => groupby}/IoTDBGroupByFillMixPathsIT.java (99%)
 rename integration/src/test/java/org/apache/iotdb/db/integration/{ => groupby}/IoTDBGroupByFillWithRangeIT.java (99%)
 rename integration/src/test/java/org/apache/iotdb/db/integration/{ => groupby}/IoTDBGroupByMonthFillIT.java (99%)
 rename integration/src/test/java/org/apache/iotdb/db/integration/{ => groupby}/IoTDBGroupByMonthIT.java (75%)
 rename integration/src/test/java/org/apache/iotdb/db/integration/{ => groupby}/IoTDBGroupByUnseqIT.java (99%)
 copy {example/mqtt => iotdb-commons}/pom.xml (76%)
 copy {confignode/src/main/java/org/apache/iotdb/confignode/manager => iotdb-commons/src/main/java/org/apache/iotdb/commons}/hash/APHashExecutor.java (96%)
 copy {confignode/src/main/java/org/apache/iotdb/confignode/manager => iotdb-commons/src/main/java/org/apache/iotdb/commons}/hash/BKDRHashExecutor.java (96%)
 rename {confignode/src/main/java/org/apache/iotdb/confignode/manager => iotdb-commons/src/main/java/org/apache/iotdb/commons}/hash/DeviceGroupHashExecutor.java (95%)
 copy {confignode/src/main/java/org/apache/iotdb/confignode/manager => iotdb-commons/src/main/java/org/apache/iotdb/commons}/hash/JSHashExecutor.java (96%)
 rename {confignode/src/main/java/org/apache/iotdb/confignode/manager => iotdb-commons/src/main/java/org/apache/iotdb/commons}/hash/SDBMHashExecutor.java (96%)
 rename server/src/main/java/org/apache/iotdb/{mpp/shuffle => db/mpp/buffer}/IDataBlockManager.java (94%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/buffer/SourceHandle.java (67%)
 copy server/src/main/java/org/apache/iotdb/{mpp/common/TsBlockMetadata.java => db/mpp/common/Analysis.java} (77%)
 copy server/src/main/java/org/apache/iotdb/{mpp/common/TsBlockMetadata.java => db/mpp/common/FillPolicy.java} (85%)
 copy server/src/main/java/org/apache/iotdb/{mpp/execution/ExecutionContext.java => db/mpp/common/GroupByTimeParameter.java} (66%)
 copy server/src/main/java/org/apache/iotdb/{mpp/execution/queue/IDIndexedAccessible.java => db/mpp/common/OrderBy.java} (75%)
 copy server/src/main/java/org/apache/iotdb/{mpp/execution/queue/IDIndexedAccessible.java => db/mpp/common/QueryContext.java} (69%)
 copy server/src/main/java/org/apache/iotdb/{mpp/execution/queue/IDIndexedAccessible.java => db/mpp/common/QueryId.java} (76%)
 copy server/src/main/java/org/apache/iotdb/{mpp/common/TsBlockMetadata.java => db/mpp/common/QuerySession.java} (86%)
 copy server/src/main/java/org/apache/iotdb/{mpp/execution/task/FragmentInstanceTaskStatus.java => db/mpp/common/TreeNode.java} (60%)
 rename server/src/main/java/org/apache/iotdb/{mpp/common/ITSBlock.java => db/mpp/common/TsBlock.java} (70%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java
 copy server/src/main/java/org/apache/iotdb/{mpp/common/TsBlockMetadata.java => db/mpp/common/WithoutPolicy.java} (83%)
 copy server/src/main/java/org/apache/iotdb/db/{query/dataset/groupby/AlignedGroupByExecutor.java => mpp/execution/Coordinator.java} (57%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/ExecFragmentInstance.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryScheduler.java
 copy server/src/main/java/org/apache/iotdb/{mpp/execution/ExecutionContext.java => db/mpp/execution/QueryStateMachine.java} (76%)
 rename server/src/main/java/org/apache/iotdb/{ => db}/mpp/memory/LocalMemoryManager.java (97%)
 rename server/src/main/java/org/apache/iotdb/{ => db}/mpp/memory/MemoryPool.java (98%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/operator/Operator.java
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/operator/OperatorContext.java (57%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/operator/SeriesScanOperator.java (70%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributedQueryPlan.java (59%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/DistributionPlanner.java (62%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/FragmentInstance.java (65%)
 copy server/src/main/java/org/apache/iotdb/{mpp/execution/queue/IDIndexedAccessible.java => db/mpp/plan/FragmentInstanceId.java} (70%)
 rename confignode/src/main/java/org/apache/iotdb/confignode/manager/hash/APHashExecutor.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalPlanner.java (54%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/LogicalQueryPlan.java (62%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/PlanFragment.java (64%)
 copy server/src/main/java/org/apache/iotdb/{mpp/common/TsBlockMetadata.java => db/mpp/plan/PlanFragmentId.java} (79%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNode.java (58%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/PlanNodeId.java
 rename server/src/main/java/org/apache/iotdb/{mpp/common/TsBlockMetadata.java => db/mpp/plan/node/PlanNodeIdAllocator.java} (81%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/DeviceMergeNode.java
 copy confignode/src/main/java/org/apache/iotdb/confignode/manager/hash/JSHashExecutor.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FillNode.java (58%)
 rename confignode/src/main/java/org/apache/iotdb/confignode/manager/hash/BKDRHashExecutor.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNode.java (58%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/manager/hash/JSHashExecutor.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/FilterNullNode.java (55%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/GroupByLevelNode.java
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/LimitNode.java (61%)
 copy server/src/main/java/org/apache/iotdb/{mpp/execution/task/FragmentInstanceTaskStatus.java => db/mpp/plan/node/process/OffsetNode.java} (60%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/ProcessNode.java (68%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/RowBasedSeriesAggregateNode.java
 copy confignode/src/main/java/org/apache/iotdb/confignode/manager/hash/JSHashExecutor.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/SortNode.java (56%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/process/TimeJoinNode.java
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/CsvSinkNode.java (74%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/FragmentSinkNode.java (68%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/SinkNode.java (69%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/sink/ThriftSinkNode.java (70%)
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/CsvSourceNode.java (69%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesAggregateNode.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SeriesScanNode.java
 copy confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java => server/src/main/java/org/apache/iotdb/db/mpp/plan/node/source/SourceNode.java (67%)
 copy server/src/main/java/org/apache/iotdb/{mpp/execution/queue/IDIndexedAccessible.java => db/mpp/plan/optimzation/PlanOptimizer.java} (67%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/ExecutionContext.java (89%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/FragmentInstanceManager.java (92%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/FragmentInstanceTaskCallback.java (90%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/FragmentInstanceTaskExecutor.java (90%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/FragmentInstanceTimeoutSentinel.java (91%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/IFragmentInstanceManager.java (80%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/queue/ID.java (94%)
 copy server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/queue/IDIndexedAccessible.java (95%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/queue/IndexedBlockingQueue.java (99%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/queue/L1PriorityQueue.java (98%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/queue/L2PriorityQueue.java (98%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/task/FragmentInstanceID.java (95%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/task/FragmentInstanceTask.java (93%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/task/FragmentInstanceTaskStatus.java (96%)
 rename server/src/main/java/org/apache/iotdb/{mpp/execution/queue/IDIndexedAccessible.java => db/query/aggregation/RemovableAggregateResult.java} (73%)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeDataSet.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByTimeEngineDataSet.java
 rename server/src/main/java/org/apache/iotdb/db/query/{dataset => executor}/groupby/AlignedGroupByExecutor.java (96%)
 rename server/src/main/java/org/apache/iotdb/db/query/{dataset => executor}/groupby/GroupByExecutor.java (96%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/executor/groupby/SlidingWindowGroupByExecutor.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/executor/groupby/SlidingWindowGroupByExecutorFactory.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/executor/groupby/impl/EmptyQueueSlidingWindowGroupByExecutor.java
 rename server/src/main/java/org/apache/iotdb/db/query/{dataset/groupby => executor/groupby/impl}/LocalAlignedGroupByExecutor.java (98%)
 rename server/src/main/java/org/apache/iotdb/db/query/{dataset/groupby => executor/groupby/impl}/LocalGroupByExecutor.java (98%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/executor/groupby/impl/MonotonicQueueSlidingWindowGroupByExecutor.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/executor/groupby/impl/NormalQueueSlidingWindowGroupByExecutor.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/query/executor/groupby/impl/SmoothQueueSlidingWindowGroupByExecutor.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/AggrWindowIterator.java
 rename confignode/src/main/java/org/apache/iotdb/confignode/manager/hash/JSHashExecutor.java => server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/ITimeRangeIterator.java (60%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowIterator.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/PreAggrWindowWithNaturalMonthIterator.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/utils/timerangeiterator/TimeRangeIteratorFactory.java
 rename server/src/test/java/org/apache/iotdb/{ => db}/mpp/memory/MemoryPoolTest.java (99%)
 rename server/src/test/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/queue/L1PriorityQueueTest.java (98%)
 rename server/src/test/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/queue/L2PriorityQueueTest.java (98%)
 rename server/src/test/java/org/apache/iotdb/{mpp/execution => db/mpp/schedule}/queue/QueueElement.java (97%)
 copy server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/{GroupByTimeDataSetTest.java => GroupByDataSetTest.java} (98%)
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSetTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/utils/TimeRangeIteratorTest.java

[iotdb] 03/06: [To_new_mpp] add common classes & thrift files (#5239)

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4a85d31fc1f704a3fe3b28efcd65a26dfec0dbfd
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Tue Mar 15 16:14:08 2022 +0800

    [To_new_mpp] add common classes & thrift files (#5239)
---
 .../java/org/apache/iotdb/mpp/common/ITSBlock.java | 45 ++++++++++++++++++++++
 .../apache/iotdb/mpp/common/TsBlockMetadata.java   | 22 +++++++++++
 thrift/src/main/thrift/common.thrift               | 27 +++++++++++++
 3 files changed, 94 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java b/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java
new file mode 100644
index 0000000..449e0fc
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.common;
+
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+/**
+ * Intermediate result for most of ExecOperators. The TsBlock contains data from one or more columns
+ * and constructs them as a row based view The columns can be series, aggregation result for one
+ * series or scalar value (such as deviceName). The TsBlock also contains the metadata to describe
+ * the columns.
+ */
+public class ITSBlock {
+
+  private TsBlockMetadata metadata;
+
+  public boolean hasNext() {
+    return false;
+  }
+
+  public RowRecord getNext() {
+    return null;
+  }
+
+  public TsBlockMetadata getMetadata() {
+    return metadata;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java b/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java
new file mode 100644
index 0000000..ed7680f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.common;
+
+public class TsBlockMetadata {}
diff --git a/thrift/src/main/thrift/common.thrift b/thrift/src/main/thrift/common.thrift
new file mode 100644
index 0000000..967deed
--- /dev/null
+++ b/thrift/src/main/thrift/common.thrift
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace java org.apache.iotdb.mpp.common.rpc.thrift
+
+
+struct FragmentInstanceID {
+  1: required string queryID
+  2: required string fragmentID
+  3: required string instanceID
+}

[iotdb] 02/06: [To_new_mpp] Basic query memory control (#5216)

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d31733fcef9f3a321ea97aacc22a26b8a8f649f0
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Tue Mar 15 09:22:55 2022 +0800

    [To_new_mpp] Basic query memory control (#5216)
    
    * [To_new_mpp] Basic query memory control
    
    * Add license
---
 .../iotdb/mpp/memory/LocalMemoryManager.java       |  46 +++++++
 .../org/apache/iotdb/mpp/memory/MemoryPool.java    |  90 +++++++++++++
 .../apache/iotdb/mpp/memory/MemoryPoolTest.java    | 150 +++++++++++++++++++++
 3 files changed, 286 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
new file mode 100644
index 0000000..cc5305e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+/**
+ * Manages memory of a data node. The memory is divided into two memory pools so that the memory for
+ * read and for write can be isolated.
+ */
+public class LocalMemoryManager {
+
+  private final long maxBytes;
+  private final MemoryPool queryPool;
+
+  public LocalMemoryManager() {
+    long maxMemory = Runtime.getRuntime().maxMemory();
+    // Save 20% memory for untracked allocations.
+    maxBytes = (long) (maxMemory * 0.8);
+    // Allocate 50% memory for query execution.
+    queryPool = new MemoryPool("query", (long) (maxBytes * 0.5));
+  }
+
+  public long getMaxBytes() {
+    return maxBytes;
+  }
+
+  public MemoryPool getQueryPool() {
+    return queryPool;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
new file mode 100644
index 0000000..29b7228
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+import org.apache.commons.lang3.Validate;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Manages certain amount of memory. */
+public class MemoryPool {
+
+  private final String id;
+  private final long maxBytes;
+
+  private long reservedBytes = 0L;
+  private final Map<String, Long> queryMemoryReservations = new HashMap<>();
+
+  public MemoryPool(String id, long maxBytes) {
+    this.id = Validate.notNull(id);
+    Validate.isTrue(maxBytes > 0L);
+    this.maxBytes = maxBytes;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public long getMaxBytes() {
+    return maxBytes;
+  }
+
+  public boolean tryReserve(String queryId, long bytes) {
+    Validate.notNull(queryId);
+    Validate.isTrue(bytes > 0L);
+
+    synchronized (this) {
+      if (maxBytes - reservedBytes < bytes) {
+        return false;
+      }
+      reservedBytes += bytes;
+      queryMemoryReservations.merge(queryId, bytes, Long::sum);
+    }
+
+    return true;
+  }
+
+  public synchronized void free(String queryId, long bytes) {
+    Validate.notNull(queryId);
+    Validate.isTrue(bytes > 0L);
+
+    Long queryReservedBytes = queryMemoryReservations.get(queryId);
+    Validate.notNull(queryReservedBytes);
+    Validate.isTrue(bytes <= queryReservedBytes);
+
+    queryReservedBytes -= bytes;
+    if (queryReservedBytes == 0) {
+      queryMemoryReservations.remove(queryId);
+    } else {
+      queryMemoryReservations.put(queryId, queryReservedBytes);
+    }
+
+    reservedBytes -= bytes;
+  }
+
+  public synchronized long getQueryMemoryReservedBytes(String queryId) {
+    return queryMemoryReservations.getOrDefault(queryId, 0L);
+  }
+
+  public long getReservedBytes() {
+    return reservedBytes;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java b/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
new file mode 100644
index 0000000..cb76b93
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.memory;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MemoryPoolTest {
+
+  MemoryPool pool;
+
+  @Before
+  public void before() {
+    pool = new MemoryPool("test", 1024L);
+  }
+
+  @Test
+  public void testReserve() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testReserveZero() {
+    String queryId = "q0";
+    try {
+      pool.tryReserve(queryId, 0L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+
+  @Test
+  public void testReserveNegative() {
+    String queryId = "q0";
+    try {
+      pool.tryReserve(queryId, -1L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+
+  @Test
+  public void testReserveAll() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 1024L));
+    Assert.assertEquals(1024L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(1024L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testOverReserve() {
+    String queryId = "q0";
+    Assert.assertFalse(pool.tryReserve(queryId, 1025L));
+    Assert.assertEquals(0L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(0L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+  }
+
+  @Test
+  public void testFree() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    pool.free(queryId, 256L);
+    Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(256L, pool.getReservedBytes());
+  }
+
+  @Test
+  public void testFreeAll() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    pool.free(queryId, 512L);
+    Assert.assertEquals(0L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(0L, pool.getReservedBytes());
+  }
+
+  @Test
+  public void testFreeZero() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    pool.free(queryId, 256L);
+    Assert.assertEquals(256L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(256L, pool.getReservedBytes());
+  }
+
+  @Test
+  public void testFreeNegative() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    try {
+      pool.free(queryId, -1L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+
+  @Test
+  public void testOverFree() {
+    String queryId = "q0";
+    Assert.assertTrue(pool.tryReserve(queryId, 512L));
+    Assert.assertEquals(512L, pool.getQueryMemoryReservedBytes(queryId));
+    Assert.assertEquals(512L, pool.getReservedBytes());
+    Assert.assertEquals(1024L, pool.getMaxBytes());
+
+    try {
+      pool.free(queryId, 513L);
+      Assert.fail("Expect IllegalArgumentException");
+    } catch (IllegalArgumentException ignore) {
+    }
+  }
+}

[iotdb] 01/06: Basic implementation of FragmentInstanceManager

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 678f7126e8da5722cdd37f350cbfd08861e6ba37
Author: ericpai <er...@hotmail.com>
AuthorDate: Mon Mar 14 18:08:07 2022 +0800

    Basic implementation of FragmentInstanceManager
---
 .../org/apache/iotdb/db/service/ServiceType.java   |   3 +-
 .../iotdb/mpp/execution/ExecutionContext.java      |  24 ++++
 .../mpp/execution/FragmentInstanceManager.java     | 101 ++++++++++++++
 .../execution/FragmentInstanceTaskExecutor.java    |  51 +++++++
 .../execution/FragmentInstanceTimeoutSentinel.java |  52 ++++++++
 .../mpp/execution/IFragmentInstanceManager.java    |  50 +++++++
 .../org/apache/iotdb/mpp/execution/queue/ID.java   |  22 +++
 .../mpp/execution/queue/IDIndexedAccessible.java   |  27 ++++
 .../mpp/execution/queue/IndexedBlockingQueue.java  | 148 +++++++++++++++++++++
 .../iotdb/mpp/execution/queue/L1PriorityQueue.java |  76 +++++++++++
 .../iotdb/mpp/execution/queue/L2PriorityQueue.java |  89 +++++++++++++
 .../mpp/execution/task/FragmentInstanceID.java     |  67 ++++++++++
 .../mpp/execution/task/FragmentInstanceTask.java   | 138 +++++++++++++++++++
 .../execution/task/FragmentInstanceTaskStatus.java |  37 ++++++
 14 files changed, 884 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 95edca6..9e7e19e 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -66,7 +66,8 @@ public enum ServiceType {
       "Cluster Data Heartbeat RPC Service", "ClusterDataHeartbeatRPCService"),
   CLUSTER_META_ENGINE("Cluster Meta Engine", "ClusterMetaEngine"),
   CLUSTER_DATA_ENGINE("Cluster Data Engine", "ClusterDataEngine"),
-  REST_SERVICE("REST Service", "REST Service");
+  REST_SERVICE("REST Service", "REST Service"),
+  FRAGMENT_INSTANCE_MANAGER_SERVICE("Fragment instance manager", "FragmentInstanceManager");
 
   private final String name;
   private final String jmxName;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
new file mode 100644
index 0000000..7428b73
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+/** The execution context of a {@link FragmentInstanceTask} */
+public class ExecutionContext {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
new file mode 100644
index 0000000..4c7c157
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.queue.L1PriorityQueue;
+import org.apache.iotdb.mpp.execution.queue.L2PriorityQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** the manager of fragment instances scheduling */
+public class FragmentInstanceManager implements IFragmentInstanceManager, IService {
+
+  public static IFragmentInstanceManager getInstance() {
+    return InstanceHolder.instance;
+  }
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> readyQueue;
+  private final IndexedBlockingQueue<FragmentInstanceTask> timeoutQueue;
+  private final Map<String, List<FragmentInstanceID>> queryMap;
+
+  private static final int MAX_CAPACITY = 1000; // TODO: load from config files
+  private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
+  private final ThreadGroup workerGroups = new ThreadGroup("ScheduleThreads");
+
+  public FragmentInstanceManager() {
+    this.readyQueue =
+        new L2PriorityQueue<>(
+            MAX_CAPACITY,
+            new FragmentInstanceTask.SchedulePriorityComparator(),
+            new FragmentInstanceTask());
+    this.timeoutQueue =
+        new L1PriorityQueue<>(
+            MAX_CAPACITY,
+            new FragmentInstanceTask.SchedulePriorityComparator(),
+            new FragmentInstanceTask());
+    this.queryMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public void start() throws StartupException {
+    for (int i = 0; i < WORKER_THREAD_NUM; i++) {
+      new FragmentInstanceTaskExecutor("Worker-Thread-" + i, workerGroups, readyQueue).start();
+    }
+    new FragmentInstanceTimeoutSentinel("Sentinel-Thread", workerGroups, timeoutQueue).start();
+  }
+
+  @Override
+  public void stop() {
+    workerGroups.interrupt();
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.FRAGMENT_INSTANCE_MANAGER_SERVICE;
+  }
+
+  @Override
+  public void submitFragmentInstance() {}
+
+  @Override
+  public void inputBlockAvailable(
+      FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {}
+
+  @Override
+  public void outputBlockAvailable(
+      FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId) {}
+
+  @Override
+  public void abortQuery(String queryId) {}
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {}
+
+    private static final IFragmentInstanceManager instance = new FragmentInstanceManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
new file mode 100644
index 0000000..5c704db
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** the worker thread of {@link FragmentInstanceTask} */
+public class FragmentInstanceTaskExecutor extends Thread {
+
+  private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceTaskExecutor.class);
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+
+  public FragmentInstanceTaskExecutor(
+      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+    super(tg, workerId);
+    this.queue = queue;
+  }
+
+  @Override
+  public void run() {
+    try {
+      while (true) {
+        FragmentInstanceTask next = queue.poll();
+        // do logic here
+      }
+    } catch (InterruptedException e) {
+      logger.info("{} is interrupted.", this.getName());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
new file mode 100644
index 0000000..7b352ba
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** the thread for watching the timeout of {@link FragmentInstanceTask} */
+public class FragmentInstanceTimeoutSentinel extends Thread {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(FragmentInstanceTimeoutSentinel.class);
+
+  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+
+  public FragmentInstanceTimeoutSentinel(
+      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+    super(tg, workerId);
+    this.queue = queue;
+  }
+
+  @Override
+  public void run() {
+    try {
+      while (true) {
+        FragmentInstanceTask next = queue.poll();
+        // do logic here
+      }
+    } catch (InterruptedException e) {
+      logger.info("{} is interrupted.", this.getName());
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
new file mode 100644
index 0000000..e0eecfa
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution;
+
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+
+/** the interface of fragment instance scheduling */
+public interface IFragmentInstanceManager {
+
+  void submitFragmentInstance();
+
+  /**
+   * the notifying interface for {@link DataBlockManager} when upstream data comes.
+   *
+   * @param instanceID the fragment instance to be notified.
+   * @param upstreamInstanceId the upstream instance id.
+   */
+  void inputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId);
+
+  /**
+   * the notifying interface for {@link DataBlockManager} when downstream data has been consumed.
+   *
+   * @param instanceID the fragment instance to be notified.
+   * @param downstreamInstanceId the downstream instance id.
+   */
+  void outputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId);
+
+  /**
+   * abort all the instances in this query
+   *
+   * @param queryId the id of the query to be aborted.
+   */
+  void abortQuery(String queryId);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
new file mode 100644
index 0000000..cc7d58f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/** A simple interface to indicate the id type */
+public interface ID {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
new file mode 100644
index 0000000..5ae4c96
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/** A simple interface for id getter and setter */
+public interface IDIndexedAccessible {
+
+  ID getId();
+
+  void setId(ID id);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
new file mode 100644
index 0000000..bcabb5c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+/**
+ * The base class of a special kind of blocking queue, which has these characters:
+ *
+ * <p>1. Thread-safe.
+ *
+ * <p>2. Can poll from queue head. When the queue is empty, the poll() will be blocked until an
+ * element is inserted.
+ *
+ * <p>3. Can push a non-null element to queue. When the queue is beyond the max size, an exception
+ * will be thrown.
+ *
+ * <p>4. Can remove an element by a long type id.
+ */
+public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
+
+  private final int MAX_CAPACITY;
+  private final E queryHolder;
+  private int size;
+
+  /**
+   * Init the queue with a max capacity. The queryHolder is just a simple reused object in query to
+   * avoid small objects allocation. It should be not used in any other places out of the queue as
+   * the id may be mutated.
+   *
+   * @param maxCapacity the max capacity of the queue.
+   * @param queryHolder the query holder instance.
+   * @throws IllegalArgumentException if maxCapacity <= 0.
+   */
+  public IndexedBlockingQueue(int maxCapacity, E queryHolder) {
+    this.MAX_CAPACITY = maxCapacity;
+    this.queryHolder = queryHolder;
+  }
+
+  /**
+   * Get and remove the first element of the queue. If the queue is empty, this call will be blocked
+   * until an element has been pushed.
+   *
+   * @return the queue head element.
+   */
+  public synchronized E poll() throws InterruptedException {
+    while (isEmpty()) {
+      this.wait();
+    }
+    E output = pollFirst();
+    size--;
+    return output;
+  }
+
+  /**
+   * Push an element to the queue. The new element position is determined by the implementation. If
+   * the queue size has been reached the maxCapacity, an {@link IllegalStateException} will be
+   * thrown. If the element is null, an {@link NullPointerException} will be thrown.
+   *
+   * @param element the element to be pushed.
+   * @throws NullPointerException the pushed element is null.
+   * @throws IllegalStateException the queue size has been reached the maxCapacity.
+   */
+  public synchronized void push(E element) {
+    if (element == null) {
+      throw new NullPointerException("pushed element is null");
+    }
+    if (size + 1 > MAX_CAPACITY) {
+      throw new IllegalStateException("the queue is full");
+    }
+    pushToQueue(element);
+    this.notifyAll();
+  }
+
+  /**
+   * Remove and return the element by a long id. It returns null if it doesn't exist.
+   *
+   * @param id the id of the element to be removed.
+   * @return the removed element.
+   */
+  public synchronized E remove(ID id) {
+    queryHolder.setId(id);
+    E output = remove(queryHolder);
+    if (output == null) {
+      return null;
+    }
+    size--;
+    return output;
+  }
+
+  /**
+   * Get the current queue size.
+   *
+   * @return the current queue size.
+   */
+  public final synchronized int size() {
+    return size;
+  }
+
+  /**
+   * Whether the queue is empty.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @return true if the queue is empty, otherwise false.
+   */
+  protected abstract boolean isEmpty();
+
+  /**
+   * Get and remove the first element.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @return The first element.
+   */
+  protected abstract E pollFirst();
+
+  /**
+   * Push the element into the queue.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @param element the element to be pushed.
+   */
+  protected abstract void pushToQueue(E element);
+
+  /**
+   * Remove and return the element by a long id. It returns null if it doesn't exist.
+   *
+   * @param element the element to be removed.
+   * @return the removed element.
+   */
+  protected abstract E remove(E element);
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
new file mode 100644
index 0000000..a5ccf14
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import java.util.Comparator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * An efficient subclass of {@link IndexedBlockingQueue} with 1-level priority groups.
+ *
+ * <p>The time complexity of operations are:
+ *
+ * <ul>
+ *   <li><b>{@link #size()}: </b> O(1).
+ *   <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
+ *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
+ *   <li><b>{@link #poll()}: </b> O(logN).
+ * </ul>
+ */
+public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
+
+  // Here we use a map not a set to act as a queue because we need to get the element reference
+  // after it was removed.
+  private final SortedMap<E, E> elements;
+
+  /**
+   * Init the queue with max capacity and specified comparator.
+   *
+   * @see IndexedBlockingQueue
+   * @param maxCapacity the max capacity of the queue.
+   * @param comparator the comparator for comparing the elements.
+   * @param queryHolder the query holder instance.
+   * @throws IllegalArgumentException if maxCapacity <= 0.
+   */
+  public L1PriorityQueue(int maxCapacity, Comparator<E> comparator, E queryHolder) {
+    super(maxCapacity, queryHolder);
+    this.elements = new TreeMap<>(comparator);
+  }
+
+  @Override
+  protected boolean isEmpty() {
+    return elements.isEmpty();
+  }
+
+  @Override
+  protected E pollFirst() {
+    return elements.remove(elements.firstKey());
+  }
+
+  @Override
+  protected void pushToQueue(E element) {
+    elements.put(element, element);
+  }
+
+  @Override
+  protected E remove(E element) {
+    return elements.remove(element);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
new file mode 100644
index 0000000..3b8a9e6
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import java.util.Comparator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * An efficient subclass of {@link IndexedBlockingQueue} with 2-level priority groups. The
+ * advantages compared to {@link L1PriorityQueue} are that each element in this queue will not be
+ * starved to death by its low sequence order.
+ *
+ * <p>The time complexity of operations are:
+ *
+ * <ul>
+ *   <li><b>{@link #size()}: </b> O(1).
+ *   <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
+ *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
+ *   <li><b>{@link #poll()}: </b> O(logN).
+ * </ul>
+ */
+public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
+
+  // Here we use a map not a set to act as a queue because we need to get the element reference
+  // after it was removed.
+  private SortedMap<E, E> workingElements;
+  private SortedMap<E, E> idleElements;
+
+  /**
+   * Init the queue with max capacity and specified comparator.
+   *
+   * @see IndexedBlockingQueue
+   * @param maxCapacity the max capacity of the queue.
+   * @param comparator the comparator for comparing the elements.
+   * @param queryHolder the query holder instance.
+   * @throws IllegalArgumentException if maxCapacity <= 0.
+   */
+  public L2PriorityQueue(int maxCapacity, Comparator<E> comparator, E queryHolder) {
+    super(maxCapacity, queryHolder);
+    this.workingElements = new TreeMap<>(comparator);
+    this.idleElements = new TreeMap<>(comparator);
+  }
+
+  @Override
+  protected boolean isEmpty() {
+    return workingElements.isEmpty() && idleElements.isEmpty();
+  }
+
+  @Override
+  protected E pollFirst() {
+    if (workingElements.isEmpty()) {
+      SortedMap<E, E> tmp = workingElements;
+      workingElements = idleElements;
+      idleElements = tmp;
+    }
+    return workingElements.remove(workingElements.firstKey());
+  }
+
+  @Override
+  protected void pushToQueue(E element) {
+    idleElements.put(element, element);
+  }
+
+  @Override
+  protected E remove(E element) {
+    E e = workingElements.remove(element);
+    if (e == null) {
+      e = idleElements.remove(element);
+    }
+    return e;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
new file mode 100644
index 0000000..f52b5c3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+import org.apache.iotdb.mpp.execution.queue.ID;
+
+import org.jetbrains.annotations.NotNull;
+
+/** the class of id of the fragment instance */
+public class FragmentInstanceID implements ID, Comparable<FragmentInstanceTask> {
+
+  private final String instanceId;
+  private final String fragmentId;
+  private final String queryId;
+
+  public FragmentInstanceID(String queryId, String fragmentId, String instanceId) {
+    this.queryId = queryId;
+    this.fragmentId = fragmentId;
+    this.instanceId = instanceId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof FragmentInstanceID
+        && queryId.equals(((FragmentInstanceID) o).getQueryId())
+        && fragmentId.equals(((FragmentInstanceID) o).getFragmentId())
+        && instanceId.equals(((FragmentInstanceID) o).getInstanceId());
+  }
+
+  public String toString() {
+    return String.format("%s.%s.%s", getInstanceId(), getFragmentId(), getQueryId());
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public String getFragmentId() {
+    return fragmentId;
+  }
+
+  public String getQueryId() {
+    return queryId;
+  }
+
+  // This is the default comparator of FragmentInstanceID
+  @Override
+  public int compareTo(@NotNull FragmentInstanceTask o) {
+    return String.CASE_INSENSITIVE_ORDER.compare(this.toString(), o.toString());
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
new file mode 100644
index 0000000..7bd7205
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+import org.apache.iotdb.mpp.execution.ExecutionContext;
+import org.apache.iotdb.mpp.execution.FragmentInstanceTaskExecutor;
+import org.apache.iotdb.mpp.execution.queue.ID;
+import org.apache.iotdb.mpp.execution.queue.IDIndexedAccessible;
+
+import java.util.Comparator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * the scheduling element of {@link FragmentInstanceTaskExecutor}. It wraps a single
+ * FragmentInstance.
+ */
+public class FragmentInstanceTask implements IDIndexedAccessible {
+
+  private FragmentInstanceID id;
+  private FragmentInstanceTaskStatus status;
+  private final ExecutionContext executionContext;
+
+  // the higher this field is, the higher probability it will be scheduled.
+  private long schedulePriority;
+  private final long ddl;
+  private final Lock lock;
+
+  /** Initialize a dummy instance for queryHolder */
+  public FragmentInstanceTask() {
+    this(null, 0L, null);
+  }
+
+  public FragmentInstanceTask(
+      FragmentInstanceID id, long timeoutMs, FragmentInstanceTaskStatus status) {
+    this.id = id;
+    this.setStatus(status);
+    this.executionContext = new ExecutionContext();
+    this.schedulePriority = 0L;
+    this.ddl = System.currentTimeMillis() + timeoutMs;
+    this.lock = new ReentrantLock();
+  }
+
+  public FragmentInstanceID getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(ID id) {
+    this.id = (FragmentInstanceID) id;
+  }
+
+  public FragmentInstanceTaskStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(FragmentInstanceTaskStatus status) {
+    this.status = status;
+  }
+
+  public ExecutionContext getExecutionContext() {
+    return executionContext;
+  }
+
+  /** Update the schedule priority according to the execution context. */
+  public void updateSchedulePriority() {
+    // TODO: need to implement here
+    this.schedulePriority = System.currentTimeMillis() - ddl;
+  }
+
+  public void lock() {
+    lock.lock();
+  }
+
+  public void unlock() {
+    lock.unlock();
+  }
+
+  public double getSchedulePriority() {
+    return schedulePriority;
+  }
+
+  public long getDDL() {
+    return ddl;
+  }
+
+  /** a comparator of ddl, the less the ddl is, the low order it has. */
+  public static class TimeoutComparator implements Comparator<FragmentInstanceTask> {
+
+    @Override
+    public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+      if (o1.getId().equals(o2.getId())) {
+        return 0;
+      }
+      if (o1.getDDL() < o2.getDDL()) {
+        return -1;
+      }
+      if (o1.getDDL() > o2.getDDL()) {
+        return 1;
+      }
+      return o1.getId().compareTo(o2);
+    }
+  }
+
+  /** a comparator of ddl, the higher the schedulePriority is, the low order it has. */
+  public static class SchedulePriorityComparator implements Comparator<FragmentInstanceTask> {
+
+    @Override
+    public int compare(FragmentInstanceTask o1, FragmentInstanceTask o2) {
+      if (o1.getId().equals(o2.getId())) {
+        return 0;
+      }
+      if (o1.getSchedulePriority() > o2.getSchedulePriority()) {
+        return -1;
+      }
+      if (o1.getSchedulePriority() < o2.getSchedulePriority()) {
+        return 1;
+      }
+      return o1.getId().compareTo(o2);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
new file mode 100644
index 0000000..5ee9fb3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.task;
+
+/** the status enum of {@link FragmentInstanceTask} */
+public enum FragmentInstanceTaskStatus {
+  /* Ready to be executed */
+  READY,
+
+  /* Being executed */
+  RUNNING,
+
+  /* Waiting upstream input or output consumed by downstream FragmentInstances */
+  BLOCKED,
+
+  /* Interrupted caused by timeout or coordinator's cancellation */
+  ABORTED,
+
+  /* Finished by met the EOF of upstream inputs */
+  FINISHED,
+}

[iotdb] 04/06: [To_new_mpp] add IDataBlockManager (#5240)

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6e4e2cb97aa41f7d931f1deabc5f849daba1e576
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Tue Mar 15 16:21:37 2022 +0800

    [To_new_mpp] add IDataBlockManager (#5240)
---
 .../mpp/execution/IFragmentInstanceManager.java    |  6 +-
 .../iotdb/mpp/shuffle/IDataBlockManager.java       | 80 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 2 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
index e0eecfa..3bda549 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
@@ -26,7 +26,8 @@ public interface IFragmentInstanceManager {
   void submitFragmentInstance();
 
   /**
-   * the notifying interface for {@link DataBlockManager} when upstream data comes.
+   * the notifying interface for {@link org.apache.iotdb.mpp.shuffle.IDataBlockManager} when
+   * upstream data comes.
    *
    * @param instanceID the fragment instance to be notified.
    * @param upstreamInstanceId the upstream instance id.
@@ -34,7 +35,8 @@ public interface IFragmentInstanceManager {
   void inputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId);
 
   /**
-   * the notifying interface for {@link DataBlockManager} when downstream data has been consumed.
+   * the notifying interface for {@link org.apache.iotdb.mpp.shuffle.IDataBlockManager} when
+   * downstream data has been consumed.
    *
    * @param instanceID the fragment instance to be notified.
    * @param downstreamInstanceId the downstream instance id.
diff --git a/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java
new file mode 100644
index 0000000..a05d04a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.mpp.shuffle;
+
+import org.apache.iotdb.mpp.common.ITSBlock;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+
+public interface IDataBlockManager {
+
+  /**
+   * Register a new fragment instance. The block manager will start looking for upstream data blocks
+   * and flushing data blocks generated to downstream fragment instances.
+   */
+  void registerFragmentInstance(FragmentInstanceTask task);
+
+  /**
+   * Deregister a fragment instance. The block manager will stop looking for upstream data blocks
+   * and release the input data blocks, but will keep flushing data blocks to downstream fragment
+   * instances until all the data blocks are sent. Once all the data blocks are sent, the output
+   * data blocks will be release.
+   *
+   * <p>This method should be called when a fragment instance finished in a normal state.
+   */
+  void deregisterFragmentInstance(FragmentInstanceTask task);
+
+  /**
+   * Deregister a fragment instance. The block manager will release all the related resources.
+   * Including data blocks that are not yet sent to downstream fragment instances.
+   *
+   * <p>This method should be called when a fragment instance finished in an abnormal state.
+   */
+  void forceDeregisterFragmentInstance(FragmentInstanceTask task);
+
+  /**
+   * Put a data block to the output buffer for downstream fragment instances. Will throw an {@link
+   * IllegalStateException} if the output buffer is full.
+   *
+   * <p>Once the block be put into the output buffer, the data block manager will notify downstream
+   * fragment instances that a new data block is available.
+   *
+   * @param instanceID ID of fragment instance that generates the block.
+   * @return If there are enough memory for the next block.
+   */
+  boolean putDataBlock(FragmentInstanceID instanceID, ITSBlock block);
+
+  /**
+   * Check if there are data blocks from the specified upstream fragment instance.
+   *
+   * @param instanceID ID of the upstream fragment instance.
+   * @return If there are available data blocks.
+   */
+  boolean hasDataBlock(FragmentInstanceID instanceID);
+
+  /**
+   * Get a data block from the input buffer of specified upstream fragment instance. Will throw an
+   * {@link IllegalStateException} if the input buffer is empty.
+   *
+   * @param instanceID ID of the upstream fragment instance.
+   * @return A data block.
+   */
+  ITSBlock getDataBlock(FragmentInstanceID instanceID);
+}

[iotdb] 06/06: Sync with master

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1feea788164a3484a67101930ef513853f17e777
Author: ericpai <er...@hotmail.com>
AuthorDate: Thu Mar 17 10:41:32 2022 +0800

    Sync with master
---
 .../mpp/buffer}/IDataBlockManager.java             |  6 +--
 .../{ => db}/mpp/memory/LocalMemoryManager.java    |  2 +-
 .../iotdb/{ => db}/mpp/memory/MemoryPool.java      |  2 +-
 .../mpp/schedule}/ExecutionContext.java            |  4 +-
 .../mpp/schedule}/FragmentInstanceManager.java     | 14 +++----
 .../schedule}/FragmentInstanceTaskCallback.java    |  4 +-
 .../schedule}/FragmentInstanceTaskExecutor.java    |  6 +--
 .../schedule}/FragmentInstanceTimeoutSentinel.java |  8 ++--
 .../mpp/schedule}/IFragmentInstanceManager.java    | 11 +++---
 .../execution => db/mpp/schedule}/queue/ID.java    |  2 +-
 .../mpp/schedule}/queue/IDIndexedAccessible.java   |  2 +-
 .../mpp/schedule}/queue/IndexedBlockingQueue.java  |  2 +-
 .../mpp/schedule}/queue/L1PriorityQueue.java       |  2 +-
 .../mpp/schedule}/queue/L2PriorityQueue.java       |  2 +-
 .../mpp/schedule}/task/FragmentInstanceID.java     |  4 +-
 .../mpp/schedule}/task/FragmentInstanceTask.java   | 10 ++---
 .../schedule}/task/FragmentInstanceTaskStatus.java |  2 +-
 .../java/org/apache/iotdb/mpp/common/ITSBlock.java | 45 ----------------------
 .../apache/iotdb/mpp/common/TsBlockMetadata.java   | 22 -----------
 .../iotdb/{ => db}/mpp/memory/MemoryPoolTest.java  |  2 +-
 .../mpp/schedule}/queue/L1PriorityQueueTest.java   |  2 +-
 .../mpp/schedule}/queue/L2PriorityQueueTest.java   |  2 +-
 .../mpp/schedule}/queue/QueueElement.java          |  2 +-
 23 files changed, 45 insertions(+), 113 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
index a05d04a..391db95 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/shuffle/IDataBlockManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/buffer/IDataBlockManager.java
@@ -17,11 +17,11 @@
  * under the License.
  */
 
-package org.apache.iotdb.mpp.shuffle;
+package org.apache.iotdb.db.mpp.buffer;
 
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceID;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 import org.apache.iotdb.mpp.common.ITSBlock;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
 
 public interface IDataBlockManager {
 
diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/memory/LocalMemoryManager.java
similarity index 97%
rename from server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/memory/LocalMemoryManager.java
index cc5305e..797075f 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/memory/LocalMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/memory/LocalMemoryManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.mpp.memory;
+package org.apache.iotdb.db.mpp.memory;
 
 /**
  * Manages memory of a data node. The memory is divided into two memory pools so that the memory for
diff --git a/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
index 29b7228..41a687d 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/memory/MemoryPool.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/memory/MemoryPool.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.mpp.memory;
+package org.apache.iotdb.db.mpp.memory;
 
 import org.apache.commons.lang3.Validate;
 
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
similarity index 89%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
index 7428b73..adf7874 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/ExecutionContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/ExecutionContext.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 
 /** The execution context of a {@link FragmentInstanceTask} */
 public class ExecutionContext {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
index 8917dd4..bf38396 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceManager.java
@@ -16,17 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
+import org.apache.iotdb.db.mpp.schedule.queue.L1PriorityQueue;
+import org.apache.iotdb.db.mpp.schedule.queue.L2PriorityQueue;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceID;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
-import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
-import org.apache.iotdb.mpp.execution.queue.L1PriorityQueue;
-import org.apache.iotdb.mpp.execution.queue.L2PriorityQueue;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
 
 import java.util.List;
 import java.util.Map;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskCallback.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskCallback.java
index 6eee1ad..c3ba2e7 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskCallback.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 
 /** A common interface for {@link FragmentInstanceTask} business logic callback */
 @FunctionalInterface
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
similarity index 90%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
index b35a6f7..2d3e606 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTaskExecutor.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
index 4171f26..75f9eba 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/FragmentInstanceTimeoutSentinel.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
+import org.apache.iotdb.db.mpp.schedule.queue.IndexedBlockingQueue;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTask;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceTaskStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
similarity index 80%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
index a27db32..98dc0c4 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/IFragmentInstanceManager.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution;
+package org.apache.iotdb.db.mpp.schedule;
 
-import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
+import org.apache.iotdb.db.mpp.buffer.IDataBlockManager;
+import org.apache.iotdb.db.mpp.schedule.task.FragmentInstanceID;
 
 /** the interface of fragment instance scheduling */
 public interface IFragmentInstanceManager {
@@ -26,8 +27,7 @@ public interface IFragmentInstanceManager {
   void submitFragmentInstance();
 
   /**
-   * the notifying interface for {@link org.apache.iotdb.mpp.shuffle.IDataBlockManager} when
-   * upstream data comes.
+   * the notifying interface for {@link IDataBlockManager} when upstream data comes.
    *
    * @param instanceID the fragment instance to be notified.
    * @param upstreamInstanceId the upstream instance id.
@@ -35,8 +35,7 @@ public interface IFragmentInstanceManager {
   void inputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId);
 
   /**
-   * the notifying interface for {@link org.apache.iotdb.mpp.shuffle.IDataBlockManager} when
-   * downstream data has been consumed.
+   * the notifying interface for {@link IDataBlockManager} when downstream data has been consumed.
    *
    * @param instanceID the fragment instance to be notified.
    */
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/ID.java
similarity index 94%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/ID.java
index cc7d58f..940370f 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/ID.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/ID.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 /** A simple interface to indicate the id type */
 public interface ID {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IDIndexedAccessible.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IDIndexedAccessible.java
index 5ae4c96..86b0c8b 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IDIndexedAccessible.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IDIndexedAccessible.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 /** A simple interface for id getter and setter */
 public interface IDIndexedAccessible {
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IndexedBlockingQueue.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IndexedBlockingQueue.java
index 6ddc610..bb6751f 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/IndexedBlockingQueue.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 /**
  * The base class of a special kind of blocking queue, which has these characters:
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
index a9cad83..6aedb89 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueue.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 import java.util.Comparator;
 import java.util.SortedMap;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
index c23e74a..cd3caf3 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueue.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 import java.util.Comparator;
 import java.util.SortedMap;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceID.java
similarity index 95%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceID.java
index f52b5c3..dab6e0a 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceID.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceID.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.task;
+package org.apache.iotdb.db.mpp.schedule.task;
 
-import org.apache.iotdb.mpp.execution.queue.ID;
+import org.apache.iotdb.db.mpp.schedule.queue.ID;
 
 import org.jetbrains.annotations.NotNull;
 
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
index 1836c74..3c26f33 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTask.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.task;
+package org.apache.iotdb.db.mpp.schedule.task;
 
-import org.apache.iotdb.mpp.execution.ExecutionContext;
-import org.apache.iotdb.mpp.execution.FragmentInstanceTaskExecutor;
-import org.apache.iotdb.mpp.execution.queue.ID;
-import org.apache.iotdb.mpp.execution.queue.IDIndexedAccessible;
+import org.apache.iotdb.db.mpp.schedule.ExecutionContext;
+import org.apache.iotdb.db.mpp.schedule.FragmentInstanceTaskExecutor;
+import org.apache.iotdb.db.mpp.schedule.queue.ID;
+import org.apache.iotdb.db.mpp.schedule.queue.IDIndexedAccessible;
 
 import java.util.Comparator;
 import java.util.concurrent.locks.Lock;
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java
similarity index 96%
rename from server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java
index 5ee9fb3..f50dc4d 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTaskStatus.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/schedule/task/FragmentInstanceTaskStatus.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.task;
+package org.apache.iotdb.db.mpp.schedule.task;
 
 /** the status enum of {@link FragmentInstanceTask} */
 public enum FragmentInstanceTaskStatus {
diff --git a/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java b/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java
deleted file mode 100644
index 449e0fc..0000000
--- a/server/src/main/java/org/apache/iotdb/mpp/common/ITSBlock.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.mpp.common;
-
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-
-/**
- * Intermediate result for most of ExecOperators. The TsBlock contains data from one or more columns
- * and constructs them as a row based view The columns can be series, aggregation result for one
- * series or scalar value (such as deviceName). The TsBlock also contains the metadata to describe
- * the columns.
- */
-public class ITSBlock {
-
-  private TsBlockMetadata metadata;
-
-  public boolean hasNext() {
-    return false;
-  }
-
-  public RowRecord getNext() {
-    return null;
-  }
-
-  public TsBlockMetadata getMetadata() {
-    return metadata;
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java b/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java
deleted file mode 100644
index ed7680f..0000000
--- a/server/src/main/java/org/apache/iotdb/mpp/common/TsBlockMetadata.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.mpp.common;
-
-public class TsBlockMetadata {}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
similarity index 99%
rename from server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
index cb76b93..8f21c7d 100644
--- a/server/src/test/java/org/apache/iotdb/mpp/memory/MemoryPoolTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/memory/MemoryPoolTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.mpp.memory;
+package org.apache.iotdb.db.mpp.memory;
 
 import org.junit.Assert;
 import org.junit.Before;
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
similarity index 98%
rename from server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
index 3809d4a..fba87fd 100644
--- a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L1PriorityQueueTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
similarity index 98%
rename from server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
index 5d34bb7..9f161e1 100644
--- a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/L2PriorityQueueTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/QueueElement.java
similarity index 97%
rename from server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
rename to server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/QueueElement.java
index 4aed3a5..844c91d 100644
--- a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/schedule/queue/QueueElement.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.mpp.execution.queue;
+package org.apache.iotdb.db.mpp.schedule.queue;
 
 public class QueueElement implements IDIndexedAccessible {
   private QueueElementID id;

[iotdb] 05/06: Fragment schedule develop

Posted by er...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ericpai pushed a commit to branch new_mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0ef2b4237442938785266f47c3153d1b2b7e2db9
Author: ericpai <er...@hotmail.com>
AuthorDate: Tue Mar 15 17:43:12 2022 +0800

    Fragment schedule develop
---
 .../mpp/execution/FragmentInstanceManager.java     |  89 ++++++++++++++-
 ...utor.java => FragmentInstanceTaskCallback.java} |  32 +-----
 .../execution/FragmentInstanceTaskExecutor.java    |   9 +-
 .../execution/FragmentInstanceTimeoutSentinel.java |  43 ++++++-
 .../mpp/execution/IFragmentInstanceManager.java    |   3 +-
 .../mpp/execution/queue/IndexedBlockingQueue.java  |  45 +++++++-
 .../iotdb/mpp/execution/queue/L1PriorityQueue.java |  12 +-
 .../iotdb/mpp/execution/queue/L2PriorityQueue.java |  17 ++-
 .../mpp/execution/task/FragmentInstanceTask.java   |  13 +++
 .../mpp/execution/queue/L1PriorityQueueTest.java   | 115 +++++++++++++++++++
 .../mpp/execution/queue/L2PriorityQueueTest.java   | 124 +++++++++++++++++++++
 .../iotdb/mpp/execution/queue/QueueElement.java    |  80 +++++++++++++
 12 files changed, 530 insertions(+), 52 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
index 4c7c157..8917dd4 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceManager.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.mpp.execution.queue.L1PriorityQueue;
 import org.apache.iotdb.mpp.execution.queue.L2PriorityQueue;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceID;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
 
 import java.util.List;
 import java.util.Map;
@@ -40,7 +41,7 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
 
   private final IndexedBlockingQueue<FragmentInstanceTask> readyQueue;
   private final IndexedBlockingQueue<FragmentInstanceTask> timeoutQueue;
-  private final Map<String, List<FragmentInstanceID>> queryMap;
+  private final Map<String, List<FragmentInstanceTask>> queryMap;
 
   private static final int MAX_CAPACITY = 1000; // TODO: load from config files
   private static final int WORKER_THREAD_NUM = 4; // TODO: load from config files
@@ -65,7 +66,9 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
     for (int i = 0; i < WORKER_THREAD_NUM; i++) {
       new FragmentInstanceTaskExecutor("Worker-Thread-" + i, workerGroups, readyQueue).start();
     }
-    new FragmentInstanceTimeoutSentinel("Sentinel-Thread", workerGroups, timeoutQueue).start();
+    new FragmentInstanceTimeoutSentinel(
+            "Sentinel-Thread", workerGroups, timeoutQueue, this::abortFragmentInstanceTask)
+        .start();
   }
 
   @Override
@@ -79,15 +82,89 @@ public class FragmentInstanceManager implements IFragmentInstanceManager, IServi
   }
 
   @Override
-  public void submitFragmentInstance() {}
+  public void submitFragmentInstance() {
+    // TODO: pass a real task
+    FragmentInstanceTask task = new FragmentInstanceTask();
+
+    task.lock();
+    try {
+      timeoutQueue.push(task);
+      // TODO: if no upstream deps, set to ready
+      task.setStatus(FragmentInstanceTaskStatus.READY);
+      readyQueue.push(task);
+    } finally {
+      task.unlock();
+    }
+  }
 
   @Override
   public void inputBlockAvailable(
-      FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {}
+      FragmentInstanceID instanceID, FragmentInstanceID upstreamInstanceId) {
+    FragmentInstanceTask task = timeoutQueue.get(instanceID);
+    if (task == null) {
+      return;
+    }
+    task.lock();
+    try {
+      if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
+        return;
+      }
+      task.inputReady(instanceID);
+      if (task.getStatus() == FragmentInstanceTaskStatus.READY) {
+        readyQueue.push(task);
+      }
+    } finally {
+      task.unlock();
+    }
+  }
 
   @Override
-  public void outputBlockAvailable(
-      FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId) {}
+  public void outputBlockAvailable(FragmentInstanceID instanceID) {
+    FragmentInstanceTask task = timeoutQueue.get(instanceID);
+    if (task == null) {
+      return;
+    }
+    task.lock();
+    try {
+      if (task.getStatus() != FragmentInstanceTaskStatus.BLOCKED) {
+        return;
+      }
+      task.outputReady();
+      if (task.getStatus() == FragmentInstanceTaskStatus.READY) {
+        readyQueue.push(task);
+      }
+    } finally {
+      task.unlock();
+    }
+  }
+
+  /** abort a {@link FragmentInstanceTask} */
+  void abortFragmentInstanceTask(FragmentInstanceTask task) {
+    List<FragmentInstanceTask> queryRelatedTasks = queryMap.remove(task.getId().getQueryId());
+    clearFragmentInstanceTask(task);
+    if (queryRelatedTasks != null) {
+      // if queryRelatedTask is not null, it means that the clean request comes from this node, not
+      // coordinator.
+      // TODO: tell coordinator
+      for (FragmentInstanceTask otherTask : queryRelatedTasks) {
+        clearFragmentInstanceTask(otherTask);
+      }
+    }
+    // TODO: call LocalMemoryManager to release resources
+  }
+
+  private void clearFragmentInstanceTask(FragmentInstanceTask task) {
+    task.lock();
+    try {
+      if (task.getStatus() != FragmentInstanceTaskStatus.FINISHED) {
+        task.setStatus(FragmentInstanceTaskStatus.ABORTED);
+      }
+      readyQueue.remove(task.getId());
+      timeoutQueue.remove(task.getId());
+    } finally {
+      task.unlock();
+    }
+  }
 
   @Override
   public void abortQuery(String queryId) {}
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
similarity index 52%
copy from server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
copy to server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
index 5c704db..6eee1ad 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskCallback.java
@@ -18,34 +18,10 @@
  */
 package org.apache.iotdb.mpp.execution;
 
-import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** the worker thread of {@link FragmentInstanceTask} */
-public class FragmentInstanceTaskExecutor extends Thread {
-
-  private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceTaskExecutor.class);
-
-  private final IndexedBlockingQueue<FragmentInstanceTask> queue;
-
-  public FragmentInstanceTaskExecutor(
-      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
-    super(tg, workerId);
-    this.queue = queue;
-  }
-
-  @Override
-  public void run() {
-    try {
-      while (true) {
-        FragmentInstanceTask next = queue.poll();
-        // do logic here
-      }
-    } catch (InterruptedException e) {
-      logger.info("{} is interrupted.", this.getName());
-    }
-  }
+/** A common interface for {@link FragmentInstanceTask} business logic callback */
+@FunctionalInterface
+public interface FragmentInstanceTaskCallback {
+  void call(FragmentInstanceTask task) throws Exception;
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
index 5c704db..b35a6f7 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTaskExecutor.java
@@ -39,13 +39,14 @@ public class FragmentInstanceTaskExecutor extends Thread {
 
   @Override
   public void run() {
-    try {
-      while (true) {
+    while (true) {
+      try {
         FragmentInstanceTask next = queue.poll();
         // do logic here
+      } catch (InterruptedException e) {
+        logger.info("{} is interrupted.", this.getName());
+        break;
       }
-    } catch (InterruptedException e) {
-      logger.info("{} is interrupted.", this.getName());
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
index 7b352ba..4171f26 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/FragmentInstanceTimeoutSentinel.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.mpp.execution;
 
 import org.apache.iotdb.mpp.execution.queue.IndexedBlockingQueue;
 import org.apache.iotdb.mpp.execution.task.FragmentInstanceTask;
+import org.apache.iotdb.mpp.execution.task.FragmentInstanceTaskStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,22 +32,52 @@ public class FragmentInstanceTimeoutSentinel extends Thread {
       LoggerFactory.getLogger(FragmentInstanceTimeoutSentinel.class);
 
   private final IndexedBlockingQueue<FragmentInstanceTask> queue;
+  private final FragmentInstanceTaskCallback timeoutCallback;
+  // the check interval in milliseconds if the queue head remains the same.
+  private static final int CHECK_INTERVAL = 100;
 
   public FragmentInstanceTimeoutSentinel(
-      String workerId, ThreadGroup tg, IndexedBlockingQueue<FragmentInstanceTask> queue) {
+      String workerId,
+      ThreadGroup tg,
+      IndexedBlockingQueue<FragmentInstanceTask> queue,
+      FragmentInstanceTaskCallback timeoutCallback) {
     super(tg, workerId);
     this.queue = queue;
+    this.timeoutCallback = timeoutCallback;
   }
 
   @Override
   public void run() {
-    try {
-      while (true) {
+    while (true) {
+      try {
         FragmentInstanceTask next = queue.poll();
-        // do logic here
+        next.lock();
+        try {
+          // if this task is already in an end state, it means that the resource releasing will be
+          // handled by other threads, we don't care anymore.
+          if (next.isEndState()) {
+            continue;
+          }
+          // if this task is not in end state and not timeout, we should push it back to the queue.
+          if (next.getDDL() > System.currentTimeMillis()) {
+            queue.push(next);
+            Thread.sleep(CHECK_INTERVAL);
+            continue;
+          }
+          next.setStatus(FragmentInstanceTaskStatus.ABORTED);
+        } finally {
+          next.unlock();
+        }
+        try {
+          // Or we should do something to abort
+          timeoutCallback.call(next);
+        } catch (Exception e) {
+          logger.error("Abort instance " + next.getId() + " failed", e);
+        }
+      } catch (InterruptedException e) {
+        logger.info("{} is interrupted.", this.getName());
+        break;
       }
-    } catch (InterruptedException e) {
-      logger.info("{} is interrupted.", this.getName());
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
index 3bda549..a27db32 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/IFragmentInstanceManager.java
@@ -39,9 +39,8 @@ public interface IFragmentInstanceManager {
    * downstream data has been consumed.
    *
    * @param instanceID the fragment instance to be notified.
-   * @param downstreamInstanceId the downstream instance id.
    */
-  void outputBlockAvailable(FragmentInstanceID instanceID, FragmentInstanceID downstreamInstanceId);
+  void outputBlockAvailable(FragmentInstanceID instanceID);
 
   /**
    * abort all the instances in this query
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
index bcabb5c..6ddc610 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/IndexedBlockingQueue.java
@@ -29,7 +29,9 @@ package org.apache.iotdb.mpp.execution.queue;
  * <p>3. Can push a non-null element to queue. When the queue is beyond the max size, an exception
  * will be thrown.
  *
- * <p>4. Can remove an element by a long type id.
+ * <p>4. Can remove an element by a type of {@link ID}.
+ *
+ * <p>5. Each element has the different ID.
  */
 public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
 
@@ -79,15 +81,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
     if (element == null) {
       throw new NullPointerException("pushed element is null");
     }
-    if (size + 1 > MAX_CAPACITY) {
+    int sizeDelta = contains(element) ? 0 : 1;
+    if (size + sizeDelta > MAX_CAPACITY) {
       throw new IllegalStateException("the queue is full");
     }
     pushToQueue(element);
+    size += sizeDelta;
     this.notifyAll();
   }
 
   /**
-   * Remove and return the element by a long id. It returns null if it doesn't exist.
+   * Remove and return the element by id. It returns null if it doesn't exist.
    *
    * @param id the id of the element to be removed.
    * @return the removed element.
@@ -103,6 +107,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   }
 
   /**
+   * Get the element by id. It returns null if it doesn't exist.
+   *
+   * @param id the id of the element.
+   * @return the removed element.
+   */
+  public synchronized E get(ID id) {
+    queryHolder.setId(id);
+    return get(queryHolder);
+  }
+
+  /**
    * Get the current queue size.
    *
    * @return the current queue size.
@@ -139,10 +154,32 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
   protected abstract void pushToQueue(E element);
 
   /**
-   * Remove and return the element by a long id. It returns null if it doesn't exist.
+   * Remove and return the element by its ID. It returns null if it doesn't exist.
+   *
+   * <p>This implementation needn't be thread-safe.
    *
    * @param element the element to be removed.
    * @return the removed element.
    */
   protected abstract E remove(E element);
+
+  /**
+   * Check whether an element with the same ID exists.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @param element the element to be checked.
+   * @return true if an element with the same ID exists, otherwise false.
+   */
+  protected abstract boolean contains(E element);
+
+  /**
+   * Return the element with the same id of the input, null if it doesn't exist.
+   *
+   * <p>This implementation needn't be thread-safe.
+   *
+   * @param element the element to be queried.
+   * @return the element with the same id in the queue. Null if it doesn't exist.
+   */
+  protected abstract E get(E element);
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
index a5ccf14..a9cad83 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueue.java
@@ -28,10 +28,10 @@ import java.util.TreeMap;
  * <p>The time complexity of operations are:
  *
  * <ul>
- *   <li><b>{@link #size()}: </b> O(1).
  *   <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
  *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
  *   <li><b>{@link #poll()}: </b> O(logN).
+ *   <li><b>{@link #get(ID)}}: </b> O(1).
  * </ul>
  */
 public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
@@ -73,4 +73,14 @@ public class L1PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
   protected E remove(E element) {
     return elements.remove(element);
   }
+
+  @Override
+  protected boolean contains(E element) {
+    return elements.containsKey(element);
+  }
+
+  @Override
+  protected E get(E element) {
+    return elements.get(element);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
index 3b8a9e6..c23e74a 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueue.java
@@ -30,10 +30,10 @@ import java.util.TreeMap;
  * <p>The time complexity of operations are:
  *
  * <ul>
- *   <li><b>{@link #size()}: </b> O(1).
  *   <li><b>{@link #remove(IDIndexedAccessible)} ()}: </b> O(logN).
  *   <li><b>{@link #push(IDIndexedAccessible)}: </b> O(logN).
  *   <li><b>{@link #poll()}: </b> O(logN).
+ *   <li><b>{@link #get(ID)}}: </b> O(1).
  * </ul>
  */
 public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlockingQueue<E> {
@@ -75,6 +75,7 @@ public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
 
   @Override
   protected void pushToQueue(E element) {
+    workingElements.remove(element);
     idleElements.put(element, element);
   }
 
@@ -86,4 +87,18 @@ public class L2PriorityQueue<E extends IDIndexedAccessible> extends IndexedBlock
     }
     return e;
   }
+
+  @Override
+  protected boolean contains(E element) {
+    return workingElements.containsKey(element) || idleElements.containsKey(element);
+  }
+
+  @Override
+  protected E get(E element) {
+    E e = workingElements.get(element);
+    if (e != null) {
+      return e;
+    }
+    return idleElements.get(element);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
index 7bd7205..1836c74 100644
--- a/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
+++ b/server/src/main/java/org/apache/iotdb/mpp/execution/task/FragmentInstanceTask.java
@@ -70,6 +70,19 @@ public class FragmentInstanceTask implements IDIndexedAccessible {
     return status;
   }
 
+  public boolean isEndState() {
+    return status == FragmentInstanceTaskStatus.ABORTED
+        || status == FragmentInstanceTaskStatus.FINISHED;
+  }
+
+  public void inputReady(FragmentInstanceID inputId) {
+    throw new UnsupportedOperationException("unsupported");
+  }
+
+  public void outputReady() {
+    throw new UnsupportedOperationException("unsupported");
+  }
+
   public void setStatus(FragmentInstanceTaskStatus status) {
     this.status = status;
   }
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
new file mode 100644
index 0000000..3809d4a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L1PriorityQueueTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class L1PriorityQueueTest {
+
+  @Test
+  public void testPollBlocked() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L1PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    List<QueueElement> res = new ArrayList<>();
+    Thread t1 =
+        new Thread(
+            () -> {
+              try {
+                QueueElement e = queue.poll();
+                res.add(e);
+              } catch (InterruptedException e) {
+                e.printStackTrace();
+                Assert.fail();
+              }
+            });
+    t1.start();
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.WAITING, t1.getState());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.TERMINATED, t1.getState());
+    Assert.assertEquals(1, res.size());
+    Assert.assertEquals(e2.getId().toString(), res.get(0).getId().toString());
+  }
+
+  @Test
+  public void testPushExceedCapacity() {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L1PriorityQueue<>(
+            1,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    QueueElement e2e = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e2e);
+    QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(2), 2);
+    try {
+      queue.push(e3);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      // ignore;
+    }
+  }
+
+  @Test
+  public void testPushAndPoll() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L1PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e1);
+    QueueElement e1e = new QueueElement(new QueueElement.QueueElementID(1), 20);
+    queue.push(e1e);
+    // only 1 element with the same id can be put into
+    Assert.assertEquals(1, queue.size());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 5);
+    queue.push(e2);
+    Assert.assertEquals(2, queue.size());
+    Assert.assertEquals(e2.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(1, queue.size());
+    Assert.assertEquals(e1e.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(0, queue.size());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
new file mode 100644
index 0000000..5d34bb7
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/L2PriorityQueueTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class L2PriorityQueueTest {
+  @Test
+  public void testPollBlocked() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L2PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    List<QueueElement> res = new ArrayList<>();
+    Thread t1 =
+        new Thread(
+            () -> {
+              try {
+                QueueElement e = queue.poll();
+                res.add(e);
+              } catch (InterruptedException e) {
+                e.printStackTrace();
+                Assert.fail();
+              }
+            });
+    t1.start();
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.WAITING, t1.getState());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    Thread.sleep(10);
+    Assert.assertEquals(Thread.State.TERMINATED, t1.getState());
+    Assert.assertEquals(1, res.size());
+    Assert.assertEquals(e2.getId().toString(), res.get(0).getId().toString());
+  }
+
+  @Test
+  public void testPushExceedCapacity() {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L2PriorityQueue<>(
+            1,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              return Integer.compare(o1.getValue(), o2.getValue());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(1), 1);
+    queue.push(e2);
+    QueueElement e2e = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e2e);
+    QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(2), 2);
+    try {
+      queue.push(e3);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      // ignore;
+    }
+  }
+
+  @Test
+  public void testPushAndPoll() throws InterruptedException {
+    IndexedBlockingQueue<QueueElement> queue =
+        new L2PriorityQueue<>(
+            10,
+            (o1, o2) -> {
+              if (o1.equals(o2)) {
+                return 0;
+              }
+              int res = Integer.compare(o1.getValue(), o2.getValue());
+              if (res != 0) {
+                return res;
+              }
+              return String.CASE_INSENSITIVE_ORDER.compare(
+                  o1.getId().toString(), o2.getId().toString());
+            },
+            new QueueElement(new QueueElement.QueueElementID(0), 0));
+    QueueElement e1 = new QueueElement(new QueueElement.QueueElementID(1), 10);
+    queue.push(e1);
+    QueueElement e1e = new QueueElement(new QueueElement.QueueElementID(1), 20);
+    queue.push(e1e);
+    // only 1 element with the same id can be put into
+    Assert.assertEquals(1, queue.size());
+    QueueElement e2 = new QueueElement(new QueueElement.QueueElementID(2), 5);
+    queue.push(e2);
+    Assert.assertEquals(2, queue.size());
+    Assert.assertEquals(e2.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(1, queue.size());
+    // L1: 5 -> 20 L2: 10
+    QueueElement e3 = new QueueElement(new QueueElement.QueueElementID(3), 10);
+    queue.push(e3);
+    Assert.assertEquals(e1e.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(1, queue.size());
+    Assert.assertEquals(e3.getId().toString(), queue.poll().getId().toString());
+    Assert.assertEquals(0, queue.size());
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
new file mode 100644
index 0000000..4aed3a5
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/mpp/execution/queue/QueueElement.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.mpp.execution.queue;
+
+public class QueueElement implements IDIndexedAccessible {
+  private QueueElementID id;
+  private final int value;
+
+  public QueueElement(QueueElementID id, int value) {
+    this.id = id;
+    this.value = value;
+  }
+
+  public int getValue() {
+    return this.value;
+  }
+
+  @Override
+  public ID getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(ID id) {
+    this.id = (QueueElementID) id;
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof QueueElement && ((QueueElement) o).getId().equals(this.id);
+  }
+
+  public static class QueueElementID implements ID {
+    private final int id;
+
+    public QueueElementID(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return this.id;
+    }
+
+    @Override
+    public int hashCode() {
+      return Integer.hashCode(id);
+    }
+
+    @Override
+    public String toString() {
+      return String.valueOf(id);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return o instanceof QueueElementID && ((QueueElementID) o).getId() == this.id;
+    }
+  }
+}