You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/21 10:05:27 UTC

[iotdb] branch standaloneMPPWrite created (now 5714d1ea93)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch standaloneMPPWrite
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 5714d1ea93 merge master

This branch includes the following new commits:

     new 92a1e2669c init
     new 5714d1ea93 merge master

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/02: merge master

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch standaloneMPPWrite
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5714d1ea93548e7675443706d4ab21b0f85cc0a0
Merge: 92a1e2669c 2df96fee1e
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Apr 21 18:05:00 2022 +0800

    merge master

 checkstyle.xml                                     |   7 +-
 client-cpp/pom.xml                                 |   7 +-
 client-py/pom.xml                                  |   8 +
 .../iotdb/cluster/log/applier/DataLogApplier.java  |   6 +-
 .../log/manage/PartitionedSnapshotLogManager.java  |   2 +-
 .../iotdb/cluster/metadata/CSchemaProcessor.java   |  43 +-
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |   3 +-
 .../iotdb/cluster/query/ClusterPlanRouter.java     |   5 +-
 .../iotdb/cluster/query/LocalQueryExecutor.java    |   8 +-
 .../server/handlers/caller/LogCatchUpHandler.java  |   4 +-
 .../FilePartitionedSnapshotLogManagerTest.java     |   2 +
 .../cluster/server/member/DataGroupMemberTest.java |   2 +
 .../cluster/server/member/MetaGroupMemberTest.java |   5 +-
 confignode/pom.xml                                 |  10 -
 .../resources/conf/iotdb-confignode.properties     |  18 +-
 .../iotdb/confignode/cli/TemporaryClient.java      |  54 +-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  59 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |  22 +-
 .../response/DataNodeConfigurationDataSet.java     |   6 +-
 ...oDataSet.java => DataNodeLocationsDataSet.java} |  34 +-
 .../consensus/response/DataPartitionDataSet.java   |  53 +-
 .../consensus/response/SchemaPartitionDataSet.java |  31 +-
 .../response/StorageGroupSchemaDataSet.java        |  11 +-
 .../statemachine/PartitionRegionStateMachine.java  |  17 +
 .../iotdb/confignode/manager/ConfigManager.java    |  10 +-
 .../iotdb/confignode/manager/ConsensusManager.java |   6 +-
 .../iotdb/confignode/manager/DataNodeManager.java  |  25 +-
 .../iotdb/confignode/manager/PartitionManager.java |  48 +-
 .../iotdb/confignode/manager/RegionManager.java    |  85 +-
 .../confignode/partition/StorageGroupSchema.java   | 121 ---
 .../persistence/DataNodeInfoPersistence.java       |  39 +-
 .../persistence/PartitionInfoPersistence.java      |  22 +-
 .../persistence/RegionInfoPersistence.java         |  54 +-
 .../physical/crud/CreateDataPartitionPlan.java     |  44 +-
 .../physical/crud/CreateRegionsPlan.java           |  21 +-
 .../physical/crud/CreateSchemaPartitionPlan.java   |  27 +-
 .../crud/GetOrCreateDataPartitionPlan.java         |  40 +-
 .../crud/GetOrCreateSchemaPartitionPlan.java       |  22 +-
 .../iotdb/confignode/physical/sys/AuthorPlan.java  |  24 +-
 .../physical/sys/RegisterDataNodePlan.java         |  32 +-
 .../physical/sys/SetStorageGroupPlan.java          |  17 +-
 .../server/ConfigNodeRPCServerProcessor.java       |  50 +-
 .../iotdb/confignode/cli/TemporaryClientDemo.java  |   4 +-
 .../confignode/consensus/RatisConsensusDemo.java   | 151 ----
 .../manager/ConfigManagerManualTest.java           | 133 ---
 ...serializeUT.java => PhysicalPlanSerDeTest.java} | 110 ++-
 .../server/ConfigNodeRPCServerProcessorTest.java   | 209 ++---
 consensus/pom.xml                                  |   5 -
 .../apache/iotdb/consensus/ConsensusFactory.java   |   7 +-
 .../org/apache/iotdb/consensus/common/Peer.java    |   8 +-
 .../iotdb/consensus/common/SnapshotMeta.java       |  47 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |   7 +-
 .../iotdb/consensus/ratis/RequestMessage.java      |   2 +-
 .../org/apache/iotdb/consensus/ratis/Utils.java    |  16 +-
 .../consensus/standalone/StandAloneConsensus.java  |   6 +-
 .../consensus/standalone/StandAloneServerImpl.java |  18 +
 .../consensus/statemachine/EmptyStateMachine.java  |  18 +
 .../consensus/statemachine/IStateMachine.java      |  49 +
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |  23 +-
 .../standalone/StandAloneConsensusTest.java        |  46 +-
 docs/Development/ContributeGuide.md                |  16 +
 .../Data-Modeling/SchemaRegion-rocksdb.md          |  93 ++
 .../Ecosystem Integration/Grafana Plugin.md        | 141 ++-
 docs/zh/Development/ContributeGuide.md             |  17 +
 .../Data-Modeling/SchemaRegion-rocksdb.md          |  93 ++
 .../Ecosystem Integration/Grafana Plugin.md        | 143 ++-
 .../iotdb/hadoop/tsfile/TSFMRReadExample.java      |   5 +-
 .../apache/iotdb/hadoop/tsfile/TSFInputFormat.java |  11 +-
 .../iotdb/hadoop/tsfile/record/HDFSTSRecord.java   |   8 +-
 .../org/apache/iotdb/hive/TSFHiveInputFormat.java  |   6 +-
 .../org/apache/iotdb/hive/TsFileDeserializer.java  |  10 +-
 .../org/apache/iotdb/influxdb/IoTDBInfluxDB.java   |   2 +-
 .../dropwizard/DropwizardMetricManager.java        |  25 +-
 .../iotdb/metrics/dropwizard/MetricName.java       |   6 +-
 .../reporter/DropwizardMetricsExporter.java        |  10 +-
 .../metrics/dropwizard/reporter/IoTDBReporter.java |  16 +-
 .../org/apache/iotdb/metrics/MetricManager.java    |   6 +-
 .../micrometer/MicrometerMetricManager.java        |  21 +-
 node-commons/pom.xml                               |  10 +
 .../iotdb/commons/cluster/DataNodeLocation.java    |  81 --
 .../org/apache/iotdb/commons/cluster/Endpoint.java |  92 --
 .../iotdb/commons/concurrent/ThreadName.java       |   3 +-
 .../iotdb/commons/consensus/ConsensusGroupId.java  |  44 +-
 .../iotdb/commons/consensus/DataRegionId.java      |  21 +-
 .../iotdb/commons/consensus/PartitionRegionId.java |  21 +-
 .../iotdb/commons/consensus/SchemaRegionId.java    |  21 +-
 .../iotdb/commons/partition/DataPartition.java     |  69 +-
 .../commons/partition/DataPartitionQueryParam.java |   8 +-
 .../apache/iotdb/commons/partition/Partition.java  |   3 +-
 .../iotdb/commons/partition/RegionReplicaSet.java  | 141 ---
 .../iotdb/commons/partition/SchemaPartition.java   |  31 +-
 .../iotdb/commons/partition/TimePartitionSlot.java |  64 --
 .../executor/SeriesPartitionExecutor.java          |   4 +-
 .../partition/executor/hash/APHashExecutor.java    |   6 +-
 .../partition/executor/hash/BKDRHashExecutor.java  |   6 +-
 .../partition/executor/hash/JSHashExecutor.java    |   6 +-
 .../partition/executor/hash/SDBMHashExecutor.java  |   6 +-
 .../apache/iotdb/commons/service/ServiceType.java  |   3 +-
 .../commons/utils/BasicStructureSerDeUtil.java     |   6 +-
 .../apache/iotdb/commons/utils/CommonUtils.java    |  12 +-
 .../commons/utils/ThriftCommonsSerDeUtils.java     | 144 +++
 .../commons/utils/ThriftConfigNodeSerDeUtils.java  |  87 ++
 .../apache/iotdb/commons/ConsensusGroupIdTest.java |  23 +-
 .../commons/utils/BasicStructureSerDeUtilTest.java |  18 +-
 .../commons/utils/ThriftCommonsSerDeUtilsTest.java | 116 +++
 .../utils/ThriftConfigNodeSerDeUtilsTest.java      |  67 ++
 pom.xml                                            |   4 +
 {consensus => procedure}/pom.xml                   |  70 +-
 .../iotdb/procedure/CompletedProcedureCleaner.java | 106 +++
 .../procedure/CompletedProcedureRetainer.java      |  18 +-
 .../apache/iotdb/procedure/InternalProcedure.java  |  61 ++
 .../java/org/apache/iotdb/procedure/Procedure.java | 891 +++++++++++++++++++
 .../apache/iotdb/procedure/ProcedureExecutor.java  | 985 +++++++++++++++++++++
 .../apache/iotdb/procedure/ProcedureLockState.java |  11 +-
 .../apache/iotdb/procedure/RootProcedureStack.java | 177 ++++
 .../iotdb/procedure/StateMachineProcedure.java     | 329 +++++++
 .../apache/iotdb/procedure/StoppableThread.java    |  36 +-
 .../iotdb/procedure/TimeoutExecutorThread.java     | 107 +++
 .../iotdb/procedure/conf/ProcedureNodeConfig.java  | 153 ++++
 .../conf/ProcedureNodeConfigDescriptor.java        | 144 +--
 .../procedure/conf/ProcedureNodeConstant.java      |  32 +-
 .../procedure/env/ClusterProcedureEnvironment.java |  12 +-
 .../exception/ProcedureAbortedException.java       |  12 +-
 .../procedure/exception/ProcedureException.java    |  30 +-
 .../exception/ProcedureSuspendedException.java     |  21 +-
 .../exception/ProcedureTimeoutException.java       |  11 +-
 .../exception/ProcedureYieldException.java         |  18 +-
 .../scheduler/AbstractProcedureScheduler.java      | 211 +++++
 .../procedure/scheduler/ProcedureScheduler.java    | 109 +++
 .../scheduler/SimpleProcedureScheduler.java        |  78 ++
 .../iotdb/procedure/service/ProcedureNode.java     |  98 ++
 .../procedure/service/ProcedureNodeMBean.java      |   9 +-
 .../iotdb/procedure/service/ProcedureServer.java   | 123 +++
 .../service/ProcedureServerCommandLine.java        |  52 ++
 .../service/ProcedureServerProcessor.java          |  77 ++
 .../procedure/service/ProcedureServiceHanlder.java |  46 +-
 .../iotdb/procedure/store/IProcedureStore.java     |  30 +-
 .../iotdb/procedure/store/ProcedureStore.java      | 195 ++++
 .../apache/iotdb/procedure/store/ProcedureWAL.java | 103 +++
 .../apache/iotdb/procedure/NoopProcedureStore.java |  49 +-
 .../org/apache/iotdb/procedure/TestLockRegime.java |  30 +-
 .../org/apache/iotdb/procedure/TestProcEnv.java    |  57 ++
 .../apache/iotdb/procedure/TestProcedureBase.java  |  83 ++
 .../iotdb/procedure/TestProcedureExecutor.java     | 111 +++
 .../apache/iotdb/procedure/TestSTMProcedure.java   |  57 ++
 .../iotdb/procedure/entity/IncProcedure.java       |  57 ++
 .../iotdb/procedure/entity/NoopProcedure.java      |  27 +-
 .../procedure/entity/SimpleLockProcedure.java      |  80 ++
 .../iotdb/procedure/entity/SimpleSTMProcedure.java |  97 ++
 .../iotdb/procedure/entity/SleepProcedure.java     |  31 +-
 .../iotdb/procedure/entity/StuckProcedure.java     |  59 ++
 .../iotdb/procedure/entity/StuckSTMProcedure.java  | 112 +++
 .../procedure/service/TestProcedureService.java    |  87 ++
 .../iotdb/procedure/store/TestProcedureStore.java  | 104 +++
 .../iotdb/procedure/util/ProcedureTestUtil.java    |  66 ++
 server/pom.xml                                     |  15 +-
 .../resources/conf/schema-rocksdb.properties       |  86 ++
 .../resources/tools/schema/SchemaFileSketcher.sh   |   2 +-
 .../assembly/resources/tools/schema/mLogParser.sh  |   2 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  23 +-
 .../apache/iotdb/db/consensus/ConsensusImpl.java   |   4 +-
 .../statemachine/DataRegionStateMachine.java       |  17 +
 .../statemachine/SchemaRegionStateMachine.java     |  22 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |   6 +-
 .../io/LocalTextModificationAccessor.java          |   7 +-
 .../apache/iotdb/db/metadata/LocalConfigNode.java  |  68 +-
 .../db/metadata/LocalSchemaPartitionTable.java     |  60 +-
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |  39 -
 .../iotdb/db/metadata/mtree/IMTreeBelowSG.java     | 307 +++++++
 ...reeBelowSG.java => MTreeBelowSGCachedImpl.java} | 134 ++-
 ...reeBelowSG.java => MTreeBelowSGMemoryImpl.java} | 909 ++++++++-----------
 .../mtree/store/disk/schemafile/RecordUtils.java   |   6 +-
 .../mtree/store/disk/schemafile/SchemaFile.java    |   8 +-
 .../db/metadata/schemaregion/ISchemaRegion.java    | 247 +++++-
 .../db/metadata/schemaregion/SchemaEngine.java     |  64 +-
 ...hemaRegion.java => SchemaRegionMemoryImpl.java} | 600 +++++--------
 ...Region.java => SchemaRegionSchemaFileImpl.java} |  93 +-
 .../schemaregion/rocksdb/RSchemaConfLoader.java    | 182 ++++
 .../rocksdb/RSchemaReadWriteHandler.java           |  27 +-
 .../schemaregion/rocksdb/RSchemaRegion.java        | 207 ++---
 .../db/metadata/upgrade/MetadataUpgrader.java      |  41 +-
 .../SchemaExecutionVisitor.java}                   |  59 +-
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |   6 +-
 .../mpp/buffer/DataBlockServiceClientFactory.java  |   6 +-
 .../iotdb/db/mpp/buffer/IDataBlockManager.java     |   6 +-
 .../iotdb/db/mpp/common/MPPQueryContext.java       |   8 +-
 .../iotdb/db/mpp/common/ResultNodeContext.java     |   8 +-
 .../iotdb/db/mpp/common/filter/InFilter.java       |   8 +-
 .../iotdb/db/mpp/common/header/DatasetHeader.java  |   7 +-
 .../apache/iotdb/db/mpp/execution/Coordinator.java |   8 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     |   4 +-
 .../mpp/execution/config/AuthorizerConfigTask.java |   6 +-
 .../db/mpp/execution/config/ConfigExecution.java   |  39 +-
 ...SampleConfigTask.java => ConfigTaskResult.java} |  38 +-
 .../iotdb/db/mpp/execution/config/IConfigTask.java |   2 +-
 .../mpp/execution/config/SetStorageGroupTask.java  |  12 +-
 .../scheduler/AbstractFragInsStateTracker.java     |   4 +-
 .../mpp/execution/scheduler/ClusterScheduler.java  |   1 +
 .../scheduler/FixedRateFragInsStateTracker.java    |   5 +-
 .../scheduler/InternalServiceClientFactory.java    |   6 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |  12 +-
 .../execution/scheduler/SimpleQueryTerminator.java |  10 +-
 .../execution/scheduler/StandaloneScheduler.java   |  11 +-
 .../schedule/FragmentInstanceAbortedException.java |  28 +-
 .../db/mpp/schedule/FragmentInstanceScheduler.java |  13 +-
 .../mpp/schedule/FragmentInstanceTaskExecutor.java |   1 +
 .../schedule/FragmentInstanceTimeoutSentinel.java  |   1 +
 .../db/mpp/schedule/queue/L1PriorityQueue.java     |  37 +-
 .../db/mpp/schedule/queue/L2PriorityQueue.java     |  66 +-
 .../db/mpp/schedule/task/FragmentInstanceTask.java |  10 +
 .../mpp/schedule/task/FragmentInstanceTaskID.java  |  10 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analysis.java  |   4 +-
 .../mpp/sql/analyze/ClusterPartitionFetcher.java   |  26 +-
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |   6 +-
 .../mpp/sql/analyze/FakePartitionFetcherImpl.java  | 114 ++-
 .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java  |   7 +-
 .../sql/analyze/StandalonePartitionFetcher.java    |  21 +-
 .../db/mpp/sql/parser/StatementGenerator.java      |  20 +-
 .../db/mpp/sql/planner/DistributionPlanner.java    |  28 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |  10 +-
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |  33 +-
 .../db/mpp/sql/planner/plan/PlanFragment.java      |   8 +-
 .../plan/SimpleFragmentParallelPlanner.java        |   4 +-
 .../sql/planner/plan/node/PlanGraphPrinter.java    |   2 +-
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |  13 +-
 .../mpp/sql/planner/plan/node/WritePlanNode.java   |   4 +-
 .../plan/node/metedata/read/SchemaScanNode.java    |   8 +-
 .../node/metedata/write/AlterTimeSeriesNode.java   |  41 +-
 .../node/metedata/write/CreateTimeSeriesNode.java  |  11 +-
 .../planner/plan/node/process/DeviceMergeNode.java |   8 +-
 .../planner/plan/node/process/ExchangeNode.java    |  13 +-
 .../planner/plan/node/sink/FragmentSinkNode.java   |  13 +-
 .../plan/node/source/SeriesAggregateScanNode.java  |  13 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  16 +-
 .../sql/planner/plan/node/source/SourceNode.java   |   6 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |  12 +-
 .../sql/planner/plan/node/write/InsertNode.java    |  10 +-
 .../sql/planner/plan/node/write/InsertRowNode.java |   4 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   6 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   6 +-
 .../planner/plan/node/write/InsertTabletNode.java  |  22 +-
 .../db/mpp/sql/rewriter/ConcatPathRewriter.java    |   9 +-
 .../db/mpp/sql/rewriter/WildcardsRemover.java      |  14 +-
 .../component/GroupByLevelController.java          |   8 +-
 .../statement/crud/AggregationQueryStatement.java  |   7 +-
 .../mpp/sql/statement/crud/InsertRowStatement.java |   4 +-
 .../crud/InsertRowsOfOneDeviceStatement.java       |  14 +-
 .../sql/statement/crud/InsertTabletStatement.java  |   8 +-
 .../db/mpp/sql/statement/crud/QueryStatement.java  |  17 +-
 .../influxdb/function/InfluxFunctionFactory.java   |   8 +-
 .../protocol/influxdb/input/InfluxLineParser.java  |   5 +-
 .../protocol/influxdb/sql/InfluxDBSqlVisitor.java  |   3 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  19 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   9 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  13 +-
 .../iotdb/db/query/executor/QueryRouter.java       |   7 +-
 .../db/query/executor/RawDataQueryExecutor.java    |   7 +-
 .../db/query/factory/AggregateResultFactory.java   |  17 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |  23 +-
 .../thrift/impl/DataNodeTSIServiceImpl.java        |  39 +-
 .../service/thrift/impl/InternalServiceImpl.java   |  35 +-
 .../db/sync/sender/manager/SchemaSyncManager.java  |   6 +-
 .../transport/server/TransportServiceImpl.java     |   9 +-
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  | 128 +--
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |   6 +-
 .../iotdb/db/metadata/mtree/MTreeBelowSGTest.java  |  18 +-
 .../schemaregion/rocksdb/MRocksDBUnitTest.java     |  29 +-
 .../rocksdb/RSchemaRegionAdvancedTest.java         |   4 +-
 .../db/metadata/upgrade/MetadataUpgradeTest.java   |   7 +
 .../db/mpp/execution/ConfigExecutionTest.java      |  78 +-
 .../db/mpp/schedule/DefaultTaskSchedulerTest.java  |  18 +
 .../schedule/FragmentInstanceSchedulerTest.java    |  20 +
 .../FragmentInstanceTimeoutSentinelTest.java       |  55 +-
 .../db/mpp/schedule/queue/L1PriorityQueueTest.java |  22 +
 .../db/mpp/schedule/queue/L2PriorityQueueTest.java |  27 +
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 155 ++--
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java |  48 +-
 .../iotdb/db/mpp/sql/plan/QueryPlannerTest.java    |   5 +-
 .../read/DeviceSchemaScanNodeSerdeTest.java        |   6 +-
 .../read/TimeSeriesSchemaScanNodeSerdeTest.java    |   6 +-
 .../plan/node/process/AggregateNodeSerdeTest.java  |   8 +-
 .../node/process/DeviceMergeNodeSerdeTest.java     |   8 +-
 .../plan/node/process/ExchangeNodeSerdeTest.java   |  14 +-
 .../sql/plan/node/process/FillNodeSerdeTest.java   |   8 +-
 .../sql/plan/node/process/FilterNodeSerdeTest.java |   8 +-
 .../plan/node/process/FilterNullNodeSerdeTest.java |   8 +-
 .../node/process/GroupByLevelNodeSerdeTest.java    |   8 +-
 .../sql/plan/node/process/LimitNodeSerdeTest.java  |   8 +-
 .../sql/plan/node/process/OffsetNodeSerdeTest.java |  17 +-
 .../sql/plan/node/process/SortNodeSerdeTest.java   |   8 +-
 .../plan/node/process/TimeJoinNodeSerdeTest.java   |   8 +-
 .../plan/node/sink/FragmentSinkNodeSerdeTest.java  |   4 +-
 .../source/SeriesAggregateScanNodeSerdeTest.java   |   8 +-
 .../plan/node/source/SeriesScanNodeSerdeTest.java  |   8 +-
 .../iotdb/db/service/InternalServiceImplTest.java  | 164 +++-
 .../db/wal/recover/WALRecoverManagerTest.java      |   6 +-
 spark-iotdb-connector/pom.xml                      |   2 +-
 thrift-cluster/pom.xml                             |   5 +
 {thrift-cluster => thrift-commons}/pom.xml         |  11 +-
 .../src/main/thrift/common.thrift                  |  29 +-
 thrift-confignode/pom.xml                          |   2 +-
 .../src/main/thrift/confignode.thrift              |  38 +-
 {thrift-cluster => thrift-procedure}/pom.xml       |   5 +-
 .../src/main/thrift/procedure.thrift               |  24 +-
 thrift/pom.xml                                     |   5 +
 thrift/src/main/thrift/mpp.thrift                  |   7 +-
 .../apache/iotdb/tsfile/read/common/TimeRange.java |  24 +-
 .../read/controller/MetadataQuerierByFileImpl.java |  17 +-
 .../query/dataset/DataSetWithoutTimeGenerator.java |   6 +-
 .../tsfile/v2/read/TsFileSequenceReaderForV2.java  |  17 +-
 zeppelin-interpreter/pom.xml                       |   1 -
 311 files changed, 10456 insertions(+), 4211 deletions(-)

diff --cc server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
index 3500192207,8df5935f77..bdd9b0f49b
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
@@@ -133,10 -125,9 +133,9 @@@ public class LocalConfigNode 
  
        templateManager.init();
        storageGroupSchemaManager.init();
-       schemaPartitionTable.init();
-       schemaEngine.init();
  
-       initSchemaRegion();
+       Map<PartialPath, List<SchemaRegionId>> recoveredLocalSchemaRegionInfo = schemaEngine.init();
 -      partitionTable.init(recoveredLocalSchemaRegionInfo);
++      schemaPartitionTable.init(recoveredLocalSchemaRegionInfo);
  
        if (config.getSyncMlogPeriodInMs() != 0) {
          timedForceMLogThread =
@@@ -228,10 -195,10 +203,10 @@@
     */
    public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
      storageGroupSchemaManager.setStorageGroup(storageGroup);
-     schemaPartitionTable.setStorageGroup(storageGroup);
 -    for (SchemaRegionId schemaRegionId : partitionTable.setStorageGroup(storageGroup)) {
++    for (SchemaRegionId schemaRegionId : schemaPartitionTable.setStorageGroup(storageGroup)) {
+       schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
+     }
  
-     schemaEngine.createSchemaRegion(
-         storageGroup, schemaPartitionTable.allocateSchemaRegionId(storageGroup));
      if (SchemaSyncManager.getInstance().isEnableSync()) {
        SchemaSyncManager.getInstance().syncMetadataPlan(new SetStorageGroupPlan(storageGroup));
      }
@@@ -542,19 -510,23 +518,23 @@@
     */
    public SchemaRegionId getBelongedSchemaRegionId(PartialPath path) throws MetadataException {
      PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
-     SchemaRegionId schemaRegionId = schemaPartitionTable.getSchemaRegionId(storageGroup, path);
-     ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
-     if (schemaRegion == null) {
-       schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
-     }
 -    return partitionTable.getSchemaRegionId(storageGroup, path);
 +    return schemaPartitionTable.getSchemaRegionId(storageGroup, path);
    }
  
-   // This interface involves storage group auto creation
+   // This interface involves storage group and schema region auto creation
    public SchemaRegionId getBelongedSchemaRegionIdWithAutoCreate(PartialPath path)
        throws MetadataException {
-     ensureStorageGroup(path);
-     return getBelongedSchemaRegionId(path);
+     PartialPath storageGroup = ensureStorageGroup(path);
 -    SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
++    SchemaRegionId schemaRegionId = schemaPartitionTable.getSchemaRegionId(storageGroup, path);
+     if (schemaRegionId == null) {
 -      partitionTable.setStorageGroup(storageGroup);
 -      schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
++      schemaPartitionTable.setStorageGroup(storageGroup);
++      schemaRegionId = schemaPartitionTable.getSchemaRegionId(storageGroup, path);
+     }
+     ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
+     if (schemaRegion == null) {
+       schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
+     }
+     return schemaRegionId;
    }
  
    /**
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index 5abf0490e9,1081b7359c..a1c31c99ab
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@@ -18,26 -18,17 +18,27 @@@
   */
  package org.apache.iotdb.db.mpp.execution.scheduler;
  
- import java.util.List;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.ScheduledExecutorService;
 -import org.apache.iotdb.db.engine.StorageEngine;
 +import org.apache.iotdb.db.engine.StorageEngineV2;
  import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
  import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 +import org.apache.iotdb.db.mpp.common.MPPQueryContext;
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.execution.FragmentInfo;
- 
- import io.airlift.units.Duration;
 +import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 +import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+ 
+ import io.airlift.units.Duration;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import java.util.List;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.ScheduledExecutorService;
+ 
  public class StandaloneScheduler implements IScheduler {
  
 -  private static final StorageEngine STORAGE_ENGINE = StorageEngine.getInstance();
 +  private static final StorageEngineV2 STORAGE_ENGINE = StorageEngineV2.getInstance();
  
    private static final LocalSchemaProcessor SCHEMA_ENGINE = LocalSchemaProcessor.getInstance();
  
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
index c78fc3d8d9,7fbdb00bf4..fbe1be4c7f
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
@@@ -18,24 -18,11 +18,25 @@@
   */
  package org.apache.iotdb.db.mpp.sql.analyze;
  
++import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
++import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
++import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
++import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 +import org.apache.iotdb.commons.consensus.DataRegionId;
  import org.apache.iotdb.commons.partition.DataPartition;
  import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
- import org.apache.iotdb.commons.partition.RegionReplicaSet;
  import org.apache.iotdb.commons.partition.SchemaPartition;
- import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
- import org.apache.iotdb.commons.partition.TimePartitionSlot;
 +import org.apache.iotdb.db.conf.IoTDBDescriptor;
 +import org.apache.iotdb.db.engine.StorageEngineV2;
 +import org.apache.iotdb.db.exception.DataRegionException;
 +import org.apache.iotdb.db.exception.metadata.MetadataException;
 +import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 +import org.apache.iotdb.db.metadata.LocalConfigNode;
 +import org.apache.iotdb.db.metadata.path.PartialPath;
  import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
  
 +import java.util.Collections;
 +import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  
@@@ -67,42 -48,7 +68,44 @@@ public class StandalonePartitionFetche
    @Override
    public DataPartition getDataPartition(
        Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
 -    return null;
 +    try {
-       Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
++      Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>>
 +          dataPartitionMap = new HashMap<>();
 +      for (Map.Entry<String, List<DataPartitionQueryParam>> sgEntry :
 +          sgNameToQueryParamsMap.entrySet()) {
 +        // for each sg
 +        String storageGroupName = sgEntry.getKey();
 +        List<DataPartitionQueryParam> dataPartitionQueryParams = sgEntry.getValue();
-         Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>
++        Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
 +            deviceToRegionsMap = new HashMap<>();
 +        for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
 +          // for each device
 +          String deviceId = dataPartitionQueryParam.getDevicePath();
 +          DataRegionId dataRegionId =
 +              localConfigNode.getBelongedDataRegionRegionId(new PartialPath(deviceId));
-           Map<TimePartitionSlot, List<RegionReplicaSet>> timePartitionToRegionsMap =
++          Map<TTimePartitionSlot, List<TRegionReplicaSet>> timePartitionToRegionsMap =
 +              new HashMap<>();
-           for (TimePartitionSlot timePartitionSlot :
++          for (TTimePartitionSlot timePartitionSlot :
 +              dataPartitionQueryParam.getTimePartitionSlotList()) {
 +            // for each time partition
 +            timePartitionToRegionsMap.put(
 +                timePartitionSlot,
 +                Collections.singletonList(
-                     new RegionReplicaSet(dataRegionId, Collections.EMPTY_LIST)));
++                    new TRegionReplicaSet(
++                        new TConsensusGroupId(dataRegionId.getType(), dataRegionId.getId()),
++                        Collections.EMPTY_LIST)));
 +          }
-           deviceToRegionsMap.put(new SeriesPartitionSlot(), timePartitionToRegionsMap);
++          deviceToRegionsMap.put(new TSeriesPartitionSlot(), timePartitionToRegionsMap);
 +        }
 +        dataPartitionMap.put(storageGroupName, deviceToRegionsMap);
 +      }
 +      return new DataPartition(
 +          dataPartitionMap,
 +          IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
 +          IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
 +    } catch (MetadataException | DataRegionException e) {
 +      throw new StatementAnalyzeException("An error occurred when executing getDataPartition()");
 +    }
    }
  
    @Override


[iotdb] 01/02: init

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch standaloneMPPWrite
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 92a1e2669c071b92d028fc1464aec6718ab52df1
Author: HTHou <hh...@outlook.com>
AuthorDate: Thu Apr 21 17:09:26 2022 +0800

    init
---
 .../apache/iotdb/db/metadata/LocalConfigNode.java  |  60 +++++++---
 .../iotdb/db/metadata/LocalDataPartitionTable.java | 124 +++++++++++++++++++++
 .../iotdb/db/mpp/execution/QueryExecution.java     |  24 ++--
 .../execution/scheduler/StandaloneScheduler.java   |  56 +++++++++-
 .../sql/analyze/StandalonePartitionFetcher.java    |  64 ++++++++++-
 .../mpp/sql/analyze/StandaloneSchemaFetcher.java   |  38 ++++++-
 .../java/org/apache/iotdb/db/service/DataNode.java |   1 +
 .../thrift/impl/DataNodeTSIServiceImpl.java        |  17 ++-
 8 files changed, 352 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
index 2b2f20433a..3500192207 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalConfigNode.java
@@ -21,10 +21,14 @@ package org.apache.iotdb.db.metadata;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
@@ -87,7 +91,11 @@ public class LocalConfigNode {
       StorageGroupSchemaManager.getInstance();
   private TemplateManager templateManager = TemplateManager.getInstance();
   private SchemaEngine schemaEngine = SchemaEngine.getInstance();
-  private LocalSchemaPartitionTable partitionTable = LocalSchemaPartitionTable.getInstance();
+  private LocalSchemaPartitionTable schemaPartitionTable = LocalSchemaPartitionTable.getInstance();
+
+  private StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+
+  private LocalDataPartitionTable dataPartitionTable = LocalDataPartitionTable.getInstance();
 
   private LocalConfigNode() {
     String schemaDir = config.getSchemaDir();
@@ -125,7 +133,7 @@ public class LocalConfigNode {
 
       templateManager.init();
       storageGroupSchemaManager.init();
-      partitionTable.init();
+      schemaPartitionTable.init();
       schemaEngine.init();
 
       initSchemaRegion();
@@ -150,7 +158,7 @@ public class LocalConfigNode {
 
   private void initSchemaRegion() throws MetadataException {
     for (PartialPath storageGroup : storageGroupSchemaManager.getAllStorageGroupPaths()) {
-      partitionTable.setStorageGroup(storageGroup);
+      schemaPartitionTable.setStorageGroup(storageGroup);
 
       File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
 
@@ -167,7 +175,7 @@ public class LocalConfigNode {
         SchemaRegionId schemaRegionId =
             new SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
         schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
-        partitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
+        schemaPartitionTable.putSchemaRegionId(storageGroup, schemaRegionId);
       }
     }
   }
@@ -185,7 +193,7 @@ public class LocalConfigNode {
         timedForceMLogThread = null;
       }
 
-      partitionTable.clear();
+      schemaPartitionTable.clear();
       schemaEngine.clear();
       storageGroupSchemaManager.clear();
       templateManager.clear();
@@ -220,10 +228,10 @@ public class LocalConfigNode {
    */
   public void setStorageGroup(PartialPath storageGroup) throws MetadataException {
     storageGroupSchemaManager.setStorageGroup(storageGroup);
-    partitionTable.setStorageGroup(storageGroup);
+    schemaPartitionTable.setStorageGroup(storageGroup);
 
     schemaEngine.createSchemaRegion(
-        storageGroup, partitionTable.allocateSchemaRegionId(storageGroup));
+        storageGroup, schemaPartitionTable.allocateSchemaRegionId(storageGroup));
     if (SchemaSyncManager.getInstance().isEnableSync()) {
       SchemaSyncManager.getInstance().syncMetadataPlan(new SetStorageGroupPlan(storageGroup));
     }
@@ -235,7 +243,7 @@ public class LocalConfigNode {
 
   public void deleteStorageGroup(PartialPath storageGroup) throws MetadataException {
     deleteSchemaRegionsInStorageGroup(
-        storageGroup, partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
+        storageGroup, schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup));
 
     for (Template template : templateManager.getTemplateMap().values()) {
       templateManager.unmarkStorageGroup(template, storageGroup.getFullPath());
@@ -249,7 +257,7 @@ public class LocalConfigNode {
       MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
     }
 
-    partitionTable.deleteStorageGroup(storageGroup);
+    schemaPartitionTable.deleteStorageGroup(storageGroup);
 
     // delete storage group after all related resources have been cleared
     storageGroupSchemaManager.deleteStorageGroup(storageGroup);
@@ -534,12 +542,12 @@ public class LocalConfigNode {
    */
   public SchemaRegionId getBelongedSchemaRegionId(PartialPath path) throws MetadataException {
     PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
-    SchemaRegionId schemaRegionId = partitionTable.getSchemaRegionId(storageGroup, path);
+    SchemaRegionId schemaRegionId = schemaPartitionTable.getSchemaRegionId(storageGroup, path);
     ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
     if (schemaRegion == null) {
       schemaEngine.createSchemaRegion(storageGroup, schemaRegionId);
     }
-    return partitionTable.getSchemaRegionId(storageGroup, path);
+    return schemaPartitionTable.getSchemaRegionId(storageGroup, path);
   }
 
   // This interface involves storage group auto creation
@@ -561,14 +569,15 @@ public class LocalConfigNode {
     for (PartialPath storageGroup :
         storageGroupSchemaManager.getInvolvedStorageGroups(pathPattern, isPrefixMatch)) {
       result.addAll(
-          partitionTable.getInvolvedSchemaRegionIds(storageGroup, pathPattern, isPrefixMatch));
+          schemaPartitionTable.getInvolvedSchemaRegionIds(
+              storageGroup, pathPattern, isPrefixMatch));
     }
     return result;
   }
 
   public List<SchemaRegionId> getSchemaRegionIdsByStorageGroup(PartialPath storageGroup)
       throws MetadataException {
-    return partitionTable.getSchemaRegionIdsByStorageGroup(storageGroup);
+    return schemaPartitionTable.getSchemaRegionIdsByStorageGroup(storageGroup);
   }
 
   // endregion
@@ -754,4 +763,29 @@ public class LocalConfigNode {
   }
 
   // endregion
+
+  // region Interfaces for DataRegionId Management
+  /**
+   * Get the DataRegionId that the given path belongs to. The path must be a full path without
+   * wildcards (* or **). This is the first step of a task on one certain path, e.g., if root.sg1
+   * is a storage group and path = root.sg1.d1, the DataRegionId of root.sg1 is returned. If the
+   * storage engine has no DataRegion for that id yet, one is created before returning. The
+   * storage group lookup may fail with a MetadataException when no storage group covers the path.
+   */
+  public DataRegionId getBelongedDataRegionRegionId(PartialPath path)
+      throws MetadataException, DataRegionException {
+    PartialPath storageGroup = storageGroupSchemaManager.getBelongedStorageGroup(path);
+    DataRegionId dataRegionId = dataPartitionTable.getDataRegionId(storageGroup, path);
+    DataRegion dataRegion = storageEngine.getDataRegion(dataRegionId);
+    if (dataRegion == null) {
+      storageEngine.createDataRegion(dataRegionId, storageGroup.getFullPath(), Long.MAX_VALUE);
+    }
+    return dataPartitionTable.getDataRegionId(storageGroup, path);
+  }
+
+  public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
+    return dataPartitionTable.getDataRegionIdsByStorageGroup(storageGroup);
+  }
+
+  // endregion
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalDataPartitionTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalDataPartitionTable.java
new file mode 100644
index 0000000000..52fc102437
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalDataPartitionTable.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Maintains the mapping between each storage group and its DataRegionIds for data partitioning.
+ * State is created eagerly, so the table is usable even if {@link #init()} was never called.
+ */
+public class LocalDataPartitionTable {
+  private final AtomicInteger dataRegionIdGenerator = new AtomicInteger(0);
+  private final Map<PartialPath, Set<DataRegionId>> table = new ConcurrentHashMap<>();
+
+  private static class LocalDataPartitionTableHolder {
+    private static final LocalDataPartitionTable INSTANCE = new LocalDataPartitionTable();
+    private LocalDataPartitionTableHolder() {}
+  }
+
+  private LocalDataPartitionTable() {}
+
+  public static LocalDataPartitionTable getInstance() {
+    return LocalDataPartitionTableHolder.INSTANCE;
+  }
+
+  /** Resets the table to its initial, empty state. */
+  public synchronized void init() {
+    table.clear();
+    dataRegionIdGenerator.set(0);
+  }
+
+  /** Drops every storage group and restarts id generation from 0. */
+  public synchronized void clear() {
+    init();
+  }
+
+  /** Allocates a new DataRegionId for the storage group, registering the group if needed. */
+  public synchronized DataRegionId allocateDataRegionId(PartialPath storageGroup) {
+    DataRegionId dataRegionId = new DataRegionId(dataRegionIdGenerator.getAndIncrement());
+    getOrCreateRegionIds(storageGroup).add(dataRegionId);
+    return dataRegionId;
+  }
+
+  /** Records an existing DataRegionId and keeps the generator ahead of every recorded id. */
+  public synchronized void putDataRegionId(PartialPath storageGroup, DataRegionId dataRegionId) {
+    getOrCreateRegionIds(storageGroup).add(dataRegionId);
+    if (dataRegionId.getId() >= dataRegionIdGenerator.get()) {
+      dataRegionIdGenerator.set(dataRegionId.getId() + 1);
+    }
+  }
+
+  /** Removes the id from the storage group; an unknown storage group is ignored. */
+  public synchronized void removeDataRegionId(PartialPath storageGroup, DataRegionId dataRegionId) {
+    Set<DataRegionId> regionIds = table.get(storageGroup);
+    if (regionIds != null) {
+      regionIds.remove(dataRegionId);
+    }
+  }
+
+  /** Returns the DataRegionId serving the path, allocating one if the group has none yet. */
+  public synchronized DataRegionId getDataRegionId(PartialPath storageGroup, PartialPath path) {
+    return calculateDataRegionId(storageGroup, path);
+  }
+
+  /** Returns all DataRegionIds of the storage group; the pattern does not narrow the result. */
+  public List<DataRegionId> getInvolvedDataRegionIds(
+      PartialPath storageGroup, PartialPath pathPattern, boolean isPrefixMatch) {
+    return getDataRegionIdsByStorageGroup(storageGroup);
+  }
+
+  /** Returns a copy of the group's DataRegionIds, or an empty list for an unknown group. */
+  public List<DataRegionId> getDataRegionIdsByStorageGroup(PartialPath storageGroup) {
+    Set<DataRegionId> regionIds = table.get(storageGroup);
+    return regionIds == null ? new ArrayList<>() : new ArrayList<>(regionIds);
+  }
+
+  /** Registers the storage group with an empty id set; a known group is left untouched. */
+  public synchronized void setStorageGroup(PartialPath storageGroup) {
+    getOrCreateRegionIds(storageGroup);
+  }
+
+  /** Removes the storage group and returns its ids, or null if it was not registered. */
+  public synchronized Set<DataRegionId> deleteStorageGroup(PartialPath storageGroup) {
+    return table.remove(storageGroup);
+  }
+
+  // May be extended to multi dataRegion per storageGroup; a group without regions gets one here.
+  // todo keep consistent with the partition method of config node in new cluster
+  private DataRegionId calculateDataRegionId(PartialPath storageGroup, PartialPath path) {
+    Set<DataRegionId> regionIds = getOrCreateRegionIds(storageGroup);
+    return regionIds.isEmpty() ? allocateDataRegionId(storageGroup) : regionIds.iterator().next();
+  }
+
+  private Set<DataRegionId> getOrCreateRegionIds(PartialPath storageGroup) {
+    return table.computeIfAbsent(storageGroup, k -> Collections.synchronizedSet(new HashSet<>()));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index a9bd605823..6eaf77d939 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.db.mpp.execution.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.execution.scheduler.IScheduler;
+import org.apache.iotdb.db.mpp.execution.scheduler.StandaloneScheduler;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
@@ -37,6 +38,7 @@ import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -142,13 +144,21 @@ public class QueryExecution implements IQueryExecution {
   private void schedule() {
     // TODO: (xingtanzjr) initialize the query scheduler according to configuration
     this.scheduler =
-        new ClusterScheduler(
-            context,
-            stateMachine,
-            distributedPlan.getInstances(),
-            context.getQueryType(),
-            executor,
-            scheduledExecutor);
+        IoTDB.getInstance().isClusterMode()
+            ? new ClusterScheduler(
+                context,
+                stateMachine,
+                distributedPlan.getInstances(),
+                context.getQueryType(),
+                executor,
+                scheduledExecutor)
+            : new StandaloneScheduler(
+                context,
+                stateMachine,
+                distributedPlan.getInstances(),
+                context.getQueryType(),
+                executor,
+                scheduledExecutor);
     this.scheduler.start();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
index 1081b7359c..5abf0490e9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/scheduler/StandaloneScheduler.java
@@ -18,22 +18,72 @@
  */
 package org.apache.iotdb.db.mpp.execution.scheduler;
 
-import org.apache.iotdb.db.engine.StorageEngine;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.iotdb.db.engine.StorageEngineV2;
 import org.apache.iotdb.db.metadata.LocalSchemaProcessor;
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.execution.FragmentInfo;
 
 import io.airlift.units.Duration;
+import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StandaloneScheduler implements IScheduler {
 
-  private static final StorageEngine STORAGE_ENGINE = StorageEngine.getInstance();
+  private static final StorageEngineV2 STORAGE_ENGINE = StorageEngineV2.getInstance();
 
   private static final LocalSchemaProcessor SCHEMA_ENGINE = LocalSchemaProcessor.getInstance();
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneScheduler.class);
+
+  private MPPQueryContext queryContext;
+  // The stateMachine of the QueryExecution owned by this QueryScheduler
+  private QueryStateMachine stateMachine;
+  private QueryType queryType;
+  // The fragment instances which should be sent to corresponding Nodes.
+  private List<FragmentInstance> instances;
+
+  private ExecutorService executor;
+  private ScheduledExecutorService scheduledExecutor;
+
+  private IFragInstanceDispatcher dispatcher;
+  private IFragInstanceStateTracker stateTracker;
+  private IQueryTerminator queryTerminator;
+
+  public StandaloneScheduler(
+      MPPQueryContext queryContext,
+      QueryStateMachine stateMachine,
+      List<FragmentInstance> instances,
+      QueryType queryType,
+      ExecutorService executor,
+      ScheduledExecutorService scheduledExecutor) {
+    this.queryContext = queryContext;
+    this.stateMachine = stateMachine;
+    this.instances = instances;
+    this.queryType = queryType;
+    this.executor = executor;
+    this.scheduledExecutor = scheduledExecutor;
+    this.stateTracker =
+        new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances);
+    this.queryTerminator =
+        new SimpleQueryTerminator(executor, queryContext.getQueryId(), instances);
+  }
+
   @Override
-  public void start() {}
+  public void start() {
+    // For the FragmentInstance of WRITE, it will be executed directly when dispatching.
+    // TODO: Other QueryTypes
+    if (queryType == QueryType.WRITE) {
+      return;
+    }
+  }
 
   @Override
   public void stop() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
index 7fbdb00bf4..c78fc3d8d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandalonePartitionFetcher.java
@@ -18,21 +18,40 @@
  */
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.RegionReplicaSet;
 import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.TimePartitionSlot;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
+import org.apache.iotdb.db.metadata.LocalConfigNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class StandalonePartitionFetcher implements IPartitionFetcher {
 
-  private StandalonePartitionFetcher() {}
+  private final LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+  private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
+
+  private static final class StandalonePartitionFetcherHolder {
+    private static final StandalonePartitionFetcher INSTANCE = new StandalonePartitionFetcher();
+
+    private StandalonePartitionFetcherHolder() {}
+  }
 
-  // TODO need to use safe singleton pattern
   public static StandalonePartitionFetcher getInstance() {
-    return new StandalonePartitionFetcher();
+    return StandalonePartitionFetcher.StandalonePartitionFetcherHolder.INSTANCE;
   }
 
   @Override
@@ -48,12 +67,47 @@ public class StandalonePartitionFetcher implements IPartitionFetcher {
   @Override
   public DataPartition getDataPartition(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
-    return null;
+    try {
+      Map<String, Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>>
+          dataPartitionMap = new HashMap<>();
+      for (Map.Entry<String, List<DataPartitionQueryParam>> sgEntry :
+          sgNameToQueryParamsMap.entrySet()) {
+        // for each sg
+        String storageGroupName = sgEntry.getKey();
+        List<DataPartitionQueryParam> dataPartitionQueryParams = sgEntry.getValue();
+        Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>>
+            deviceToRegionsMap = new HashMap<>();
+        for (DataPartitionQueryParam dataPartitionQueryParam : dataPartitionQueryParams) {
+          // for each device; NOTE(review): all use a default SeriesPartitionSlot key - confirm
+          String deviceId = dataPartitionQueryParam.getDevicePath();
+          DataRegionId dataRegionId =
+              localConfigNode.getBelongedDataRegionRegionId(new PartialPath(deviceId));
+          Map<TimePartitionSlot, List<RegionReplicaSet>> timePartitionToRegionsMap =
+              new HashMap<>();
+          for (TimePartitionSlot timePartitionSlot :
+              dataPartitionQueryParam.getTimePartitionSlotList()) {
+            // for each time partition
+            timePartitionToRegionsMap.put(
+                timePartitionSlot,
+                Collections.singletonList(
+                    new RegionReplicaSet(dataRegionId, Collections.emptyList())));
+          }
+          deviceToRegionsMap.put(new SeriesPartitionSlot(), timePartitionToRegionsMap);
+        }
+        dataPartitionMap.put(storageGroupName, deviceToRegionsMap);
+      }
+      return new DataPartition(
+          dataPartitionMap,
+          IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+          IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+    } catch (MetadataException | DataRegionException e) {
+      throw new StatementAnalyzeException("An error occurred when executing getDataPartition()");
+    }
   }
 
   @Override
   public DataPartition getOrCreateDataPartition(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
-    return null;
+    return getDataPartition(sgNameToQueryParamsMap);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
index e1deb0f9b5..81d1702710 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/StandaloneSchemaFetcher.java
@@ -18,15 +18,29 @@
  */
 package org.apache.iotdb.db.mpp.sql.analyze;
 
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.LocalConfigNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.common.schematree.SchemaTree;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class StandaloneSchemaFetcher implements ISchemaFetcher {
 
+  private final LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+  private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
+
   private StandaloneSchemaFetcher() {}
 
   public static StandaloneSchemaFetcher getInstance() {
@@ -35,7 +49,22 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
 
   @Override
   public SchemaTree fetchSchema(PathPatternTree patternTree) {
-    return null;
+    Set<String> storageGroupSet = new HashSet<>();
+    SchemaTree schemaTree = new SchemaTree();
+    List<PartialPath> partialPathList = patternTree.splitToPathList();
+    try {
+      for (PartialPath path : partialPathList) {
+        String storageGroup = localConfigNode.getBelongedStorageGroup(path).getFullPath();
+        storageGroupSet.add(storageGroup);
+        SchemaRegionId schemaRegionId = localConfigNode.getBelongedSchemaRegionId(path);
+        ISchemaRegion schemaRegion = schemaEngine.getSchemaRegion(schemaRegionId);
+        schemaTree.appendMeasurementPaths(schemaRegion.getMeasurementPaths(path, false));
+      }
+    } catch (MetadataException e) {
+      throw new RuntimeException(e);
+    }
+    schemaTree.setStorageGroups(new ArrayList<>(storageGroupSet));
+    return schemaTree;
   }
 
   @Override
@@ -50,6 +79,11 @@ public class StandaloneSchemaFetcher implements ISchemaFetcher {
       List<String[]> measurements,
       List<TSDataType[]> tsDataTypes,
       List<Boolean> aligned) {
-    return null;
+    Map<PartialPath, List<String>> deviceToMeasurementMap = new HashMap<>();
+    for (int i = 0; i < devicePath.size(); i++) {
+      deviceToMeasurementMap.put(devicePath.get(i), Arrays.asList(measurements.get(i)));
+    }
+    // todo implement auto create schema
+    return fetchSchema(new PathPatternTree(deviceToMeasurementMap));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index f2285abbc3..e06451f7a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -159,6 +159,7 @@ public class DataNode implements DataNodeMBean {
   public void active() throws StartupException {
     // set the mpp mode to true
     IoTDBDescriptor.getInstance().getConfig().setMppMode(true);
+    IoTDB.getInstance().setClusterMode();
     // start iotdb server first
     IoTDB.getInstance().active();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
index 3de0a28cc2..091548c8d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeTSIServiceImpl.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.db.mpp.sql.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.ClusterSchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.StandalonePartitionFetcher;
+import org.apache.iotdb.db.mpp.sql.analyze.StandaloneSchemaFetcher;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertMultiTabletsStatement;
@@ -39,6 +41,7 @@ import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsOfOneDeviceStatement
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.query.control.SessionManager;
+import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
 import org.apache.iotdb.db.service.metrics.MetricsService;
 import org.apache.iotdb.db.service.metrics.Operation;
@@ -74,9 +77,19 @@ public class DataNodeTSIServiceImpl implements TSIEventHandler {
 
   private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
 
-  private static final IPartitionFetcher PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+  private final IPartitionFetcher PARTITION_FETCHER;
 
-  private static final ISchemaFetcher SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+  private final ISchemaFetcher SCHEMA_FETCHER;
+
+  public DataNodeTSIServiceImpl() {
+    if (IoTDB.getInstance().isClusterMode()) {
+      PARTITION_FETCHER = ClusterPartitionFetcher.getInstance();
+      SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance();
+    } else {
+      PARTITION_FETCHER = StandalonePartitionFetcher.getInstance();
+      SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance();
+    }
+  }
 
   @Override
   public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException {