You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/13 14:43:48 UTC

[iotdb] branch xingtanzjr/write_instance_parallel updated (850198002e -> b8a351c26a)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/write_instance_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 850198002e tmp save
     add 506def32b0 [IOTDB-2804] DataRegion consensus state machine (#5479)
     add 53089cee1f [IOTDB-2803] Implement create timeseries metadata operation through consensus layer in MPP framework (#5403)
     add d3b30cf654 [IOTDB-2889] Add method converting TsBlock to TSQueryDataSet in QueryDataSetUtils (#5490)
     add f33ee814b8 [IOTDB-2890] Dynamic port support for DataBlockManager (#5491)
     add f0ff5510a6 [IOTDB-2879] Add influxdb-protocol module into the root pom (#5473)
     add 0ec17d79eb fix StorageEngineV2 cannot start (#5484)
     add ce4f2b25e4 [IOTDB-2886] refact LocalConfigManager and finish createSchemaRegion (#5486)
     add 4fc9c7da54 [IOTDB-2803][new cluster] Adapt show timeseries to mpp (#5418)
     add c368467c20 [IOTDB-2866] Support get or create SchemaPartition with PatternTree in config node (#5493)
     add fc0313ed8b Fix compile error (#5500)
     add 22a6c225e0 remove IllegalPathException and IOException in PlanFragment deserialization
     add 1ef5ffaaa4 fix some issue
     add 25c819914e tmp save
     add add58b9a5e add schemaRegion create in test
     add 303bd75f24 reformat code in FragmentInstance
     add 85ed394166 use visit mode to implement PlanNode to PhysicalPlan
     add be6bd1596c fix compile issue
     add 1fe96960fa make intercal_ip as default 127.0.0.1
     add 9e1a508be4 [IOTDB-2897] Fix wal recover deadlock (#5499)
     add f60c584f4d New sync framework with TsFileSync (#5353)
     add 47e2af285a [IOTDB-2871] Data node client to connect with config node (#5488)
     add b9a8098146 Merge branch 'master' into xingtanzjr/modification_pr
     add c5192db130 spotless
     new d32fc0713c Merge branch 'xingtanzjr/modification_pr' into xingtanzjr/write_instance_parallel
     new 8534b11a14 fix UT error
     new b8a351c26a add UT for DistributedPlan Write

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/influxdb-protocol.yml            |   2 +-
 .github/workflows/main-unix.yml                    |   3 +-
 .github/workflows/main-win.yml                     |   3 +-
 .../apache/iotdb/db/qp/sql/InfluxDBSqlParser.g4    |   2 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  67 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |  24 +
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |   1 -
 .../apache/iotdb/cluster/query/ClusterPlanner.java |   1 +
 .../apache/iotdb/cluster/utils/StatusUtils.java    |   6 +-
 confignode/pom.xml                                 |   5 +
 .../resources/conf/iotdb-confignode.properties     |  13 +-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  14 +
 .../confignode/conf/ConfigNodeDescriptor.java      |   4 +
 .../consensus/response/DataPartitionDataSet.java   |  22 +-
 .../consensus/response/SchemaPartitionDataSet.java |  77 +-
 .../response/StorageGroupSchemaDataSet.java        |  12 +-
 .../iotdb/confignode/manager/ConfigManager.java    | 114 ++-
 .../iotdb/confignode/manager/ConsensusManager.java |  36 +-
 .../apache/iotdb/confignode/manager/Manager.java   |  24 +-
 .../iotdb/confignode/manager/PartitionManager.java | 130 ++--
 .../iotdb/confignode/manager/RegionManager.java    |   6 +-
 .../confignode/partition/StorageGroupSchema.java   |   9 +
 .../persistence/PartitionInfoPersistence.java      |  60 +-
 .../persistence/RegionInfoPersistence.java         |  29 +-
 .../iotdb/confignode/physical/PhysicalPlan.java    |   2 +-
 .../physical/crud/CreateDataPartitionPlan.java     |   5 +-
 .../physical/crud/CreateRegionsPlan.java           |   5 +-
 .../physical/crud/CreateSchemaPartitionPlan.java   |  70 +-
 .../crud/GetOrCreateDataPartitionPlan.java         |   6 +-
 .../crud/GetOrCreateSchemaPartitionPlan.java       |  86 ++-
 .../confignode/service/executor/PlanExecutor.java  |   4 +-
 .../server/ConfigNodeRPCServerProcessor.java       |  64 +-
 .../confignode/consensus/RatisConsensusDemo.java   |  10 +-
 .../hash/DeviceGroupHashExecutorManualTest.java    |   5 +-
 .../physical/SerializeDeserializeUT.java           |  45 +-
 .../server/ConfigNodeRPCServerProcessorTest.java   | 284 +++++---
 .../iotdb/consensus/ratis/RequestMessage.java      |   1 +
 distribution/pom.xml                               |   6 +
 docs/UserGuide/Maintenance-Tools/Sync-Tool.md      | 396 +++++++---
 docs/zh/UserGuide/Maintenance-Tools/Sync-Tool.md   | 493 ++++++++++---
 influxdb-protocol/pom.xml                          |  38 +-
 .../iotdb/influxdb/protocol/dto/SessionPoint.java  |   6 +-
 .../protocol/impl/IoTDBInfluxDBService.java        |  12 +-
 .../iotdb/influxdb/session/InfluxDBSession.java    |  42 +-
 .../sync/IoTDBSyncReceiverCollectorIT.java         | 513 +++++++++++++
 .../db/integration/sync/IoTDBSyncReceiverIT.java   | 357 +++++++++
 .../sync/IoTDBSyncReceiverLoaderIT.java            | 210 ++++++
 .../db/integration/sync/IoTDBSyncSenderIT.java     | 304 ++++++++
 .../iotdb/db/integration/sync/SyncTestUtil.java    | 203 ++++++
 .../db/integration/sync/TransportClientMock.java   |  69 ++
 .../src/test/resources/iotdb-engine.properties     |   3 +-
 integration/src/test/resources/logback-test.xml    |   2 +-
 .../iotdb/commons/cluster/DataNodeLocation.java    |   6 +-
 .../org/apache/iotdb/commons/cluster/Endpoint.java |   6 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   6 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |  15 +
 .../iotdb/commons/consensus/ConsensusGroupId.java  |   5 +-
 .../iotdb/commons/partition/DataPartition.java     |  15 +-
 .../iotdb/commons/partition/RegionReplicaSet.java  |  41 +-
 .../iotdb/commons/partition/SchemaPartition.java   | 120 ++-
 .../apache/iotdb/commons/service/ServiceType.java  |   2 +
 .../apache/iotdb/commons/utils/CommonUtils.java    |   4 +-
 .../apache/iotdb/commons}/utils/StatusUtils.java   |  49 +-
 pom.xml                                            |   1 +
 server/pom.xml                                     |   5 +
 .../resources/conf/iotdb-engine.properties         |  19 +-
 .../resources/conf/iotdb-sync-client.properties    |  35 -
 .../assembly/resources/tools/start-sync-client.bat |  71 --
 .../assembly/resources/tools/start-sync-client.sh  |  54 --
 .../assembly/resources/tools/stop-sync-client.bat  |  23 -
 .../assembly/resources/tools/stop-sync-client.sh   |  30 -
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 309 ++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  81 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  43 +-
 .../consensus/statemachine/BaseStateMachine.java   |   2 +
 .../statemachine/DataRegionStateMachine.java       |  37 +-
 .../statemachine/SchemaRegionStateMachine.java     |  17 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  16 +-
 .../apache/iotdb/db/engine/StorageEngineV2.java    |  62 +-
 .../iotdb/db/engine/modification/Deletion.java     |  23 +
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 183 +++--
 .../db/engine/storagegroup/TsFileManager.java      |  38 +
 .../db/engine/storagegroup/TsFileProcessor.java    |  23 +
 .../dataregion/StorageGroupManager.java            |  10 +
 .../sync/PipeDataLoadBearableException.java}       |  10 +-
 .../sync/PipeDataLoadException.java}               |  10 +-
 .../sync/PipeDataLoadUnbearableException.java}     |  10 +-
 .../sync/PipeException.java}                       |  20 +-
 .../sync/PipeServerException.java}                 |  22 +-
 .../PipeSinkException.java}                        |  24 +-
 .../iotdb/db/metadata/Executor/SchemaVisitor.java  |  96 +++
 ...ocalConfigManager.java => LocalConfigNode.java} | 170 ++---
 .../db/metadata/LocalSchemaPartitionTable.java     |   4 +-
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |  28 +-
 .../iotdb/db/metadata/mtree/MTreeBelowSG.java      |   1 +
 .../traverser/collector/MeasurementCollector.java  |   4 +
 .../db/metadata/schemaregion/SchemaEngine.java     |  27 +-
 .../db/metadata/schemaregion/SchemaRegion.java     |  20 +
 .../storagegroup/IStorageGroupSchemaManager.java   |   8 +
 .../storagegroup/StorageGroupSchemaManager.java    |  28 +
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |   7 +-
 .../iotdb/db/mpp/buffer/IDataBlockManager.java     |   5 +
 .../apache/iotdb/db/mpp/common/PlanFragmentId.java |  10 +-
 .../org/apache/iotdb/db/mpp/common/QueryId.java    |   8 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |   3 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |   4 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  22 +-
 .../db/mpp/execution/SchemaDriverContext.java      |   8 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |   4 +-
 .../operator/schema/DevicesSchemaScanOperator.java |  97 +++
 .../mpp/operator/schema/SchemaMergeOperator.java   |  79 ++
 .../db/mpp/operator/schema/SchemaScanOperator.java | 107 +++
 .../schema/TimeSeriesSchemaScanOperator.java       | 142 ++++
 .../db/mpp/operator/source/SeriesScanOperator.java |   2 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  |  42 +-
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |   7 +-
 .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java  |   7 +-
 .../iotdb/db/mpp/sql/analyze/ISchemaFetcher.java   |   7 +-
 .../mpp/sql/analyze/StandaloneSchemaFetcher.java   |   7 +-
 .../apache/iotdb/db/mpp/sql/parser/ASTVisitor.java |   5 +
 .../db/mpp/sql/planner/DistributionPlanner.java    | 133 +++-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |  65 +-
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   |  47 ++
 .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java |  42 ++
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |  61 +-
 .../db/mpp/sql/planner/plan/PlanFragment.java      |  10 +-
 .../plan/SimpleFragmentParallelPlanner.java        |   6 +-
 .../planner/plan/WriteFragmentParallelPlanner.java |  19 +-
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |   1 +
 .../db/mpp/sql/planner/plan/node/PlanNodeType.java |  15 +-
 .../db/mpp/sql/planner/plan/node/PlanNodeUtil.java |   3 +
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |  30 +
 .../node/metedata/read/DevicesSchemaScanNode.java  | 108 +++
 .../read/SchemaMergeNode.java}                     |  44 +-
 .../plan/node/metedata/read/SchemaScanNode.java    | 119 +++
 .../metedata/read/TimeSeriesSchemaScanNode.java    | 145 ++++
 .../node/metedata/write/AlterTimeSeriesNode.java   |  37 +
 .../write/CreateAlignedTimeSeriesNode.java         |  13 +
 .../node/metedata/write/CreateTimeSeriesNode.java  | 127 +++-
 .../planner/plan/node/process/ExchangeNode.java    |   1 +
 .../planner/plan/node/sink/FragmentSinkNode.java   |   1 +
 .../plan/node/source/SeriesAggregateScanNode.java  |  11 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  16 +-
 .../sql/planner/plan/node/source/SourceNode.java   |   4 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |   7 +-
 .../sql/planner/plan/node/write/InsertNode.java    |   1 +
 .../planner/plan/node/write/InsertRowsNode.java    |   5 +
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   5 +
 .../db/mpp/sql/statement/StatementVisitor.java     |  10 +
 .../crud/InsertMultiTabletsStatement.java          |   8 +
 .../crud/InsertRowsOfOneDeviceStatement.java       |   1 +
 .../sql/statement/crud/InsertRowsStatement.java    |   8 +
 .../db/mpp/sql/statement/crud/InsertStatement.java |   1 +
 .../db/mpp/sql/statement/crud/QueryStatement.java  |   1 +
 .../metadata/AlterTimeSeriesStatement.java         |   1 +
 .../metadata/CreateAlignedTimeSeriesStatement.java |   1 +
 .../metadata/CreateTimeSeriesStatement.java        |   1 +
 .../statement/metadata/ShowDevicesStatement.java   |   6 +
 .../mpp/sql/statement/metadata/ShowStatement.java  |  10 +
 .../metadata/ShowTimeSeriesStatement.java          |   6 +
 .../db/protocol/influxdb/handler/QueryHandler.java |   6 +-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |  26 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 189 ++++-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |  14 +-
 .../db/qp/logical/sys/CreatePipeOperator.java      |  69 ++
 .../db/qp/logical/sys/CreatePipeSinkOperator.java  |  61 ++
 .../iotdb/db/qp/logical/sys/DropPipeOperator.java  |  24 +-
 .../db/qp/logical/sys/DropPipeSinkOperator.java    |  23 +-
 .../iotdb/db/qp/logical/sys/ShowPipeOperator.java  |  27 +-
 .../db/qp/logical/sys/ShowPipeServerOperator.java  |  20 +-
 .../db/qp/logical/sys/ShowPipeSinkOperator.java    |  27 +-
 .../qp/logical/sys/ShowPipeSinkTypeOperator.java   |  20 +-
 .../iotdb/db/qp/logical/sys/StartPipeOperator.java |  24 +-
 .../db/qp/logical/sys/StartPipeServerOperator.java |  18 +-
 .../iotdb/db/qp/logical/sys/StopPipeOperator.java  |  24 +-
 .../db/qp/logical/sys/StopPipeServerOperator.java  |  18 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  10 +
 .../qp/physical/crud/InsertMultiTabletsPlan.java   |   2 +-
 .../iotdb/db/qp/physical/crud/InsertRowsPlan.java  |   2 +-
 .../physical/sys/CreateAlignedTimeSeriesPlan.java  |  15 +-
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |   2 +-
 .../iotdb/db/qp/physical/sys/CreatePipePlan.java   | 110 +++
 .../db/qp/physical/sys/CreatePipeSinkPlan.java     |  98 +++
 .../db/qp/physical/sys/CreateTimeSeriesPlan.java   |  10 +
 .../db/qp/physical/sys/DeleteTimeSeriesPlan.java   |   2 +-
 .../iotdb/db/qp/physical/sys/DropPipeSinkPlan.java |  29 +-
 .../iotdb/db/qp/physical/sys/OperatePipePlan.java  |  29 +-
 .../physical/sys/ShowPipePlan.java}                |  22 +-
 .../physical/sys/ShowPipeServerPlan.java}          |  11 +-
 .../physical/sys/ShowPipeSinkPlan.java}            |  23 +-
 .../physical/sys/ShowPipeSinkTypePlan.java}        |  11 +-
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   6 +-
 .../db/qp/physical/sys/StartPipeServerPlan.java    |  56 ++
 .../db/qp/physical/sys/StopPipeServerPlan.java     |  56 ++
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    | 190 ++++-
 .../apache/iotdb/db/qp/utils/DatetimeUtils.java    |  15 +
 .../java/org/apache/iotdb/db/service/DataNode.java |  67 +-
 .../iotdb/db/service/InternalServiceImpl.java      |  32 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  11 +-
 .../thrift/impl/DataNodeManagementServiceImpl.java |  68 +-
 .../service/thrift/impl/InfluxDBServiceImpl.java   |  42 +-
 .../apache/iotdb/db/sync/conf/SyncConstant.java    |  85 +--
 .../apache/iotdb/db/sync/conf/SyncPathUtil.java    | 100 +++
 .../iotdb/db/sync/conf/SyncSenderConfig.java       | 127 ----
 .../iotdb/db/sync/conf/SyncSenderDescriptor.java   | 124 ----
 .../org/apache/iotdb/db/sync/package-info.java     |  38 -
 .../iotdb/db/sync/pipedata/DeletionPipeData.java   |  85 +++
 .../apache/iotdb/db/sync/pipedata/PipeData.java    |  96 +++
 .../iotdb/db/sync/pipedata/SchemaPipeData.java     |  98 +++
 .../iotdb/db/sync/pipedata/TsFilePipeData.java     | 177 +++++
 .../sync/pipedata/queue/BufferedPipeDataQueue.java | 438 +++++++++++
 .../queue/PipeDataQueue.java}                      |  21 +-
 .../sync/pipedata/queue/PipeDataQueueFactory.java  |  57 ++
 .../iotdb/db/sync/receiver/ReceiverService.java    | 278 +++++++
 .../db/sync/receiver/collector/Collector.java      | 170 +++++
 .../db/sync/receiver/load/DeletionLoader.java      |  48 ++
 .../iotdb/db/sync/receiver/load/FileLoader.java    | 203 ------
 .../db/sync/receiver/load/FileLoaderManager.java   | 213 ------
 .../iotdb/db/sync/receiver/load/IFileLoader.java   |  50 --
 .../iotdb/db/sync/receiver/load/ILoadLogger.java   |  57 --
 .../receiver/load/{LoadType.java => ILoader.java}  |  12 +-
 .../iotdb/db/sync/receiver/load/LoadLogger.java    |  72 --
 .../iotdb/db/sync/receiver/load/SchemaLoader.java  |  60 ++
 .../iotdb/db/sync/receiver/load/TsFileLoader.java  |  67 ++
 .../iotdb/db/sync/receiver/manager/PipeInfo.java   |  85 +++
 .../db/sync/receiver/manager/PipeMessage.java      |  76 ++
 .../db/sync/receiver/manager/ReceiverManager.java  | 216 ++++++
 .../sync/receiver/recover/ISyncReceiverLogger.java |  50 --
 .../receiver/recover/SyncReceiverLogAnalyzer.java  | 154 ----
 .../sync/receiver/recover/SyncReceiverLogger.java  |  72 --
 .../db/sync/receiver/recovery/ReceiverLog.java     | 127 ++++
 .../receiver/recovery/ReceiverLogAnalyzer.java     | 157 ++++
 .../db/sync/receiver/transfer/SyncServiceImpl.java | 370 ----------
 .../db/sync/sender/manage/ISyncFileManager.java    |  72 --
 .../db/sync/sender/manage/SyncFileManager.java     | 291 --------
 .../db/sync/sender/manager/SchemaSyncManager.java  | 163 +++++
 .../db/sync/sender/manager/TsFileSyncManager.java  | 118 +++
 .../iotdb/db/sync/sender/pipe/IoTDBPipeSink.java   |  97 +++
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 106 +++
 .../sender/pipe/PipeSink.java}                     |  41 +-
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      | 334 +++++++++
 .../sender/recover/ISyncSenderLogAnalyzer.java     |  47 --
 .../db/sync/sender/recover/ISyncSenderLogger.java  |  67 --
 .../sync/sender/recover/SyncSenderLogAnalyzer.java | 128 ----
 .../db/sync/sender/recover/SyncSenderLogger.java   |  72 --
 .../db/sync/sender/recovery/SenderLogAnalyzer.java | 172 +++++
 .../db/sync/sender/recovery/SenderLogger.java      | 141 ++++
 .../db/sync/sender/recovery/TsFilePipeLogger.java  | 150 ++++
 .../db/sync/sender/service/SenderService.java      | 417 +++++++++++
 .../db/sync/sender/service/TransportHandler.java   | 127 ++++
 .../iotdb/db/sync/sender/transfer/ISyncClient.java |  95 ---
 .../iotdb/db/sync/sender/transfer/SyncClient.java  | 810 ---------------------
 .../client/ITransportClient.java}                  |  19 +-
 .../db/sync/transport/client/TransportClient.java  | 527 ++++++++++++++
 .../conf/TransportConfig.java}                     |  26 +-
 .../transport/conf/TransportConstant.java}         |  24 +-
 .../server/TransportServerManager.java}            |  80 +-
 .../server/TransportServerManagerMBean.java}       |   6 +-
 .../server/TransportServerThriftHandler.java}      |  30 +-
 .../transport/server/TransportServiceImpl.java     | 385 ++++++++++
 .../org/apache/iotdb/db/utils/DataTypeUtils.java   |   8 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   8 +
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |   2 +-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 181 ++++-
 .../org/apache/iotdb/db/utils/StatusUtils.java     |  46 --
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  | 109 ---
 .../iotdb/db/wal/recover/WALNodeRecoverTask.java   |   4 +-
 .../db/mpp/operator/SchemaScanOperatorTest.java    | 242 ++++++
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 165 ++++-
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java |  16 +-
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  |  85 +++
 .../sql/plan/node/PlanNodeDeserializeHelper.java   |   3 +-
 .../metadata/read/ShowDevicesNodeSerdeTest.java    |   3 +-
 .../sql/plan/node/process/OffsetNodeSerdeTest.java |   6 +-
 .../plan/node/sink/FragmentSinkNodeSerdeTest.java  |   3 +-
 .../source/SeriesAggregateScanNodeSerdeTest.java   |   2 +-
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     |  28 +
 .../iotdb/db/service/InternalServiceImplTest.java  | 167 +++++
 .../sync/pipedata/BufferedPipeDataQueueTest.java   | 542 ++++++++++++++
 .../iotdb/db/sync/pipedata/PipeDataTest.java       |  86 +++
 .../db/sync/receiver/load/FileLoaderTest.java      | 405 -----------
 .../sync/receiver/manager/ReceiverManagerTest.java |  98 +++
 .../recover/SyncReceiverLogAnalyzerTest.java       | 229 ------
 .../receiver/recover/SyncReceiverLoggerTest.java   | 115 ---
 .../receiver/recovery/ReceiverLogAnalyzerTest.java | 124 ++++
 .../db/sync/sender/manage/SyncFileManagerTest.java | 350 ---------
 .../sender/recover/SyncSenderLogAnalyzerTest.java  | 201 -----
 .../sync/sender/recover/SyncSenderLoggerTest.java  | 112 ---
 .../db/sync/sender/transfer/SyncClientTest.java    | 161 ----
 .../db/sync/transport/TransportServiceTest.java    | 205 ++++++
 server/src/test/resources/iotdb-engine.properties  |   4 +-
 server/src/test/resources/logback-test.xml         |   2 +-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  15 +-
 .../iotdb/rpc/StatementExecutionException.java     |   4 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   4 +-
 .../src/main/thrift/confignode.thrift              |  67 +-
 thrift-influxdb/src/main/thrift/influxdb.thrift    |  36 +-
 thrift-sync/src/main/thrift/sync.thrift            |  51 --
 thrift-sync/src/main/thrift/transport.thrift       |  90 +++
 thrift/src/main/thrift/common.thrift               |  43 +-
 300 files changed, 14600 insertions(+), 6900 deletions(-)
 create mode 100644 integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverCollectorIT.java
 create mode 100644 integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
 create mode 100644 integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverLoaderIT.java
 create mode 100644 integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncSenderIT.java
 create mode 100644 integration/src/test/java/org/apache/iotdb/db/integration/sync/SyncTestUtil.java
 create mode 100644 integration/src/test/java/org/apache/iotdb/db/integration/sync/TransportClientMock.java
 copy {cluster/src/main/java/org/apache/iotdb/cluster => node-commons/src/main/java/org/apache/iotdb/commons}/utils/StatusUtils.java (90%)
 delete mode 100644 server/src/assembly/resources/conf/iotdb-sync-client.properties
 delete mode 100755 server/src/assembly/resources/tools/start-sync-client.bat
 delete mode 100755 server/src/assembly/resources/tools/start-sync-client.sh
 delete mode 100755 server/src/assembly/resources/tools/stop-sync-client.bat
 delete mode 100755 server/src/assembly/resources/tools/stop-sync-client.sh
 create mode 100644 server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
 copy server/src/main/java/org/apache/iotdb/db/{sync/receiver/load/LoadType.java => exception/sync/PipeDataLoadBearableException.java} (80%)
 copy server/src/main/java/org/apache/iotdb/db/{sync/receiver/load/LoadType.java => exception/sync/PipeDataLoadException.java} (81%)
 copy server/src/main/java/org/apache/iotdb/db/{sync/receiver/load/LoadType.java => exception/sync/PipeDataLoadUnbearableException.java} (79%)
 copy server/src/main/java/org/apache/iotdb/db/{mpp/execution/SchemaDriverContext.java => exception/sync/PipeException.java} (64%)
 copy server/src/main/java/org/apache/iotdb/db/{sync/receiver/SyncServerManagerMBean.java => exception/sync/PipeServerException.java} (66%)
 rename server/src/main/java/org/apache/iotdb/db/exception/{SyncDeviceOwnerConflictException.java => sync/PipeSinkException.java} (59%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
 rename server/src/main/java/org/apache/iotdb/db/metadata/{LocalConfigManager.java => LocalConfigNode.java} (86%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/DevicesSchemaScanOperator.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaMergeOperator.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/SchemaScanOperator.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/operator/schema/TimeSeriesSchemaScanOperator.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/DevicesSchemaScanNode.java
 rename server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/{write/InsertMultiTabletNode.java => metedata/read/SchemaMergeNode.java} (54%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/SchemaScanNode.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/read/TimeSeriesSchemaScanNode.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeOperator.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreatePipeSinkOperator.java
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeOperator.java (63%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/logical/sys/DropPipeSinkOperator.java (62%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeOperator.java (61%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeServerOperator.java (69%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkOperator.java (59%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/logical/sys/ShowPipeSinkTypeOperator.java (65%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeOperator.java (62%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StartPipeServerOperator.java (68%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeOperator.java (63%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/logical/sys/StopPipeServerOperator.java (68%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipePlan.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreatePipeSinkPlan.java
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropPipeSinkPlan.java (62%)
 copy cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanner.java => server/src/main/java/org/apache/iotdb/db/qp/physical/sys/OperatePipePlan.java (64%)
 copy server/src/main/java/org/apache/iotdb/db/{sync/receiver/recover/ISyncReceiverLogAnalyzer.java => qp/physical/sys/ShowPipePlan.java} (68%)
 copy server/src/main/java/org/apache/iotdb/db/{sync/receiver/load/LoadType.java => qp/physical/sys/ShowPipeServerPlan.java} (82%)
 copy server/src/main/java/org/apache/iotdb/db/{sync/receiver/SyncServerManagerMBean.java => qp/physical/sys/ShowPipeSinkPlan.java} (71%)
 copy server/src/main/java/org/apache/iotdb/db/{sync/receiver/load/LoadType.java => qp/physical/sys/ShowPipeSinkTypePlan.java} (81%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StartPipeServerPlan.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/qp/physical/sys/StopPipeServerPlan.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderConfig.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/conf/SyncSenderDescriptor.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/package-info.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/pipedata/DeletionPipeData.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/pipedata/PipeData.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/pipedata/SchemaPipeData.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/pipedata/TsFilePipeData.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
 copy server/src/main/java/org/apache/iotdb/db/sync/{receiver/SyncServerManagerMBean.java => pipedata/queue/PipeDataQueue.java} (66%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/PipeDataQueueFactory.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/collector/Collector.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/load/DeletionLoader.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java
 rename server/src/main/java/org/apache/iotdb/db/sync/receiver/load/{LoadType.java => ILoader.java} (77%)
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/load/SchemaLoader.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/load/TsFileLoader.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/PipeInfo.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/PipeMessage.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogger.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLog.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/manager/SchemaSyncManager.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/manager/TsFileSyncManager.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/IoTDBPipeSink.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
 copy server/src/main/java/org/apache/iotdb/db/{mpp/sql/statement/metadata/ShowStatement.java => sync/sender/pipe/PipeSink.java} (55%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogger.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogger.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogAnalyzer.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/SenderLogger.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/recovery/TsFilePipeLogger.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/service/TransportHandler.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
 copy server/src/main/java/org/apache/iotdb/db/sync/{receiver/recover/ISyncReceiverLogAnalyzer.java => transport/client/ITransportClient.java} (68%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
 rename server/src/main/java/org/apache/iotdb/db/sync/{receiver/recover/ISyncReceiverLogAnalyzer.java => transport/conf/TransportConfig.java} (53%)
 copy server/src/main/java/org/apache/iotdb/db/{mpp/execution/SchemaDriverContext.java => sync/transport/conf/TransportConstant.java} (62%)
 rename server/src/main/java/org/apache/iotdb/db/sync/{receiver/SyncServerManager.java => transport/server/TransportServerManager.java} (58%)
 rename server/src/main/java/org/apache/iotdb/db/sync/{receiver/SyncServerManagerMBean.java => transport/server/TransportServerManagerMBean.java} (91%)
 rename server/src/main/java/org/apache/iotdb/db/sync/{receiver/SyncServerThriftHandler.java => transport/server/TransportServerThriftHandler.java} (66%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/utils/StatusUtils.java
 delete mode 100644 server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/mpp/operator/SchemaScanOperatorTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/sync/pipedata/BufferedPipeDataQueueTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/sync/pipedata/PipeDataTest.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLoggerTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java
 delete mode 100644 server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/SyncClientTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/sync/transport/TransportServiceTest.java
 delete mode 100755 thrift-sync/src/main/thrift/sync.thrift
 create mode 100644 thrift-sync/src/main/thrift/transport.thrift


[iotdb] 01/03: Merge branch 'xingtanzjr/modification_pr' into xingtanzjr/write_instance_parallel

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d32fc0713cd568c35cf7e43edcaf1eb8537a94f0
Merge: 850198002e c5192db130
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 13 21:00:22 2022 +0800

    Merge branch 'xingtanzjr/modification_pr' into xingtanzjr/write_instance_parallel

 .github/workflows/influxdb-protocol.yml            |   2 +-
 .github/workflows/main-unix.yml                    |   3 +-
 .github/workflows/main-win.yml                     |   3 +-
 .../apache/iotdb/db/qp/sql/InfluxDBSqlParser.g4    |   2 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  67 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |  24 +
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |   1 -
 .../apache/iotdb/cluster/query/ClusterPlanner.java |   1 +
 .../apache/iotdb/cluster/utils/StatusUtils.java    |   6 +-
 confignode/pom.xml                                 |   5 +
 .../resources/conf/iotdb-confignode.properties     |  13 +-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  14 +
 .../confignode/conf/ConfigNodeDescriptor.java      |   4 +
 .../consensus/response/DataPartitionDataSet.java   |  22 +-
 .../consensus/response/SchemaPartitionDataSet.java |  77 +-
 .../response/StorageGroupSchemaDataSet.java        |  12 +-
 .../iotdb/confignode/manager/ConfigManager.java    | 114 ++-
 .../iotdb/confignode/manager/ConsensusManager.java |  36 +-
 .../apache/iotdb/confignode/manager/Manager.java   |  24 +-
 .../iotdb/confignode/manager/PartitionManager.java | 130 ++--
 .../iotdb/confignode/manager/RegionManager.java    |   6 +-
 .../confignode/partition/StorageGroupSchema.java   |   9 +
 .../persistence/PartitionInfoPersistence.java      |  60 +-
 .../persistence/RegionInfoPersistence.java         |  29 +-
 .../iotdb/confignode/physical/PhysicalPlan.java    |   2 +-
 .../physical/crud/CreateDataPartitionPlan.java     |   5 +-
 .../physical/crud/CreateRegionsPlan.java           |   5 +-
 .../physical/crud/CreateSchemaPartitionPlan.java   |  70 +-
 .../crud/GetOrCreateDataPartitionPlan.java         |   6 +-
 .../crud/GetOrCreateSchemaPartitionPlan.java       |  86 ++-
 .../confignode/service/executor/PlanExecutor.java  |   4 +-
 .../server/ConfigNodeRPCServerProcessor.java       |  64 +-
 .../confignode/consensus/RatisConsensusDemo.java   |  10 +-
 .../hash/DeviceGroupHashExecutorManualTest.java    |   5 +-
 .../physical/SerializeDeserializeUT.java           |  45 +-
 .../server/ConfigNodeRPCServerProcessorTest.java   | 284 +++++---
 .../iotdb/consensus/ratis/RequestMessage.java      |   1 +
 distribution/pom.xml                               |   6 +
 docs/UserGuide/Maintenance-Tools/Sync-Tool.md      | 396 +++++++---
 docs/zh/UserGuide/Maintenance-Tools/Sync-Tool.md   | 493 ++++++++++---
 influxdb-protocol/pom.xml                          |  38 +-
 .../iotdb/influxdb/protocol/dto/SessionPoint.java  |   6 +-
 .../protocol/impl/IoTDBInfluxDBService.java        |  12 +-
 .../iotdb/influxdb/session/InfluxDBSession.java    |  42 +-
 .../sync/IoTDBSyncReceiverCollectorIT.java         | 513 +++++++++++++
 .../db/integration/sync/IoTDBSyncReceiverIT.java   | 357 +++++++++
 .../sync/IoTDBSyncReceiverLoaderIT.java            | 210 ++++++
 .../db/integration/sync/IoTDBSyncSenderIT.java     | 304 ++++++++
 .../iotdb/db/integration/sync/SyncTestUtil.java    | 203 ++++++
 .../db/integration/sync/TransportClientMock.java   |  69 ++
 .../src/test/resources/iotdb-engine.properties     |   3 +-
 integration/src/test/resources/logback-test.xml    |   2 +-
 .../iotdb/commons/cluster/DataNodeLocation.java    |   6 +-
 .../org/apache/iotdb/commons/cluster/Endpoint.java |   6 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   6 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |  15 +
 .../iotdb/commons/consensus/ConsensusGroupId.java  |   5 +-
 .../iotdb/commons/partition/DataPartition.java     |   4 +-
 .../iotdb/commons/partition/RegionReplicaSet.java  |  42 +-
 .../iotdb/commons/partition/SchemaPartition.java   | 120 ++-
 .../apache/iotdb/commons/service/ServiceType.java  |   2 +
 .../apache/iotdb/commons/utils/CommonUtils.java    |   4 +-
 .../apache/iotdb/commons}/utils/StatusUtils.java   |  49 +-
 pom.xml                                            |   1 +
 server/pom.xml                                     |   5 +
 .../resources/conf/iotdb-engine.properties         |  19 +-
 .../resources/conf/iotdb-sync-client.properties    |  35 -
 .../assembly/resources/tools/start-sync-client.bat |  71 --
 .../assembly/resources/tools/start-sync-client.sh  |  54 --
 .../assembly/resources/tools/stop-sync-client.bat  |  23 -
 .../assembly/resources/tools/stop-sync-client.sh   |  30 -
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 309 ++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  81 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  43 +-
 .../consensus/statemachine/BaseStateMachine.java   |   2 +
 .../statemachine/DataRegionStateMachine.java       |  37 +-
 .../statemachine/SchemaRegionStateMachine.java     |  17 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  16 +-
 .../apache/iotdb/db/engine/StorageEngineV2.java    |  62 +-
 .../iotdb/db/engine/modification/Deletion.java     |  23 +
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 183 +++--
 .../db/engine/storagegroup/TsFileManager.java      |  38 +
 .../db/engine/storagegroup/TsFileProcessor.java    |  23 +
 .../dataregion/StorageGroupManager.java            |  10 +
 .../sync/PipeDataLoadBearableException.java}       |  10 +-
 .../sync/PipeDataLoadException.java}               |  10 +-
 .../sync/PipeDataLoadUnbearableException.java}     |  10 +-
 .../sync/PipeException.java}                       |  20 +-
 .../sync/PipeServerException.java}                 |  22 +-
 .../PipeSinkException.java}                        |  24 +-
 .../iotdb/db/metadata/Executor/SchemaVisitor.java  |  96 +++
 ...ocalConfigManager.java => LocalConfigNode.java} | 170 ++---
 .../db/metadata/LocalSchemaPartitionTable.java     |   4 +-
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |  28 +-
 .../iotdb/db/metadata/mtree/MTreeBelowSG.java      |   1 +
 .../traverser/collector/MeasurementCollector.java  |   4 +
 .../db/metadata/schemaregion/SchemaEngine.java     |  27 +-
 .../db/metadata/schemaregion/SchemaRegion.java     |  20 +
 .../storagegroup/IStorageGroupSchemaManager.java   |   8 +
 .../storagegroup/StorageGroupSchemaManager.java    |  28 +
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |   7 +-
 .../iotdb/db/mpp/buffer/IDataBlockManager.java     |   5 +
 .../apache/iotdb/db/mpp/common/PlanFragmentId.java |  10 +-
 .../org/apache/iotdb/db/mpp/common/QueryId.java    |   8 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |   3 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |   4 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  22 +-
 .../db/mpp/execution/SchemaDriverContext.java      |   8 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |   4 +-
 .../operator/schema/DevicesSchemaScanOperator.java |  97 +++
 .../mpp/operator/schema/SchemaMergeOperator.java   |  79 ++
 .../db/mpp/operator/schema/SchemaScanOperator.java | 107 +++
 .../schema/TimeSeriesSchemaScanOperator.java       | 142 ++++
 .../db/mpp/operator/source/SeriesScanOperator.java |   2 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  |  42 +-
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |   7 +-
 .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java  |   7 +-
 .../iotdb/db/mpp/sql/analyze/ISchemaFetcher.java   |   7 +-
 .../mpp/sql/analyze/StandaloneSchemaFetcher.java   |   7 +-
 .../apache/iotdb/db/mpp/sql/parser/ASTVisitor.java |   5 +
 .../db/mpp/sql/planner/DistributionPlanner.java    | 133 +++-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |  65 +-
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   |  47 ++
 .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java |  42 ++
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |  66 +-
 .../db/mpp/sql/planner/plan/PlanFragment.java      |  10 +-
 .../plan/SimpleFragmentParallelPlanner.java        |   6 +-
 .../planner/plan/WriteFragmentParallelPlanner.java |  17 +-
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |   1 +
 .../db/mpp/sql/planner/plan/node/PlanNodeType.java |  15 +-
 .../db/mpp/sql/planner/plan/node/PlanNodeUtil.java |   3 +
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |  30 +
 .../node/metedata/read/DevicesSchemaScanNode.java  | 108 +++
 .../read/SchemaMergeNode.java}                     |  44 +-
 .../plan/node/metedata/read/SchemaScanNode.java    | 119 +++
 .../metedata/read/TimeSeriesSchemaScanNode.java    | 145 ++++
 .../node/metedata/write/AlterTimeSeriesNode.java   |  37 +
 .../write/CreateAlignedTimeSeriesNode.java         |  13 +
 .../node/metedata/write/CreateTimeSeriesNode.java  | 127 +++-
 .../planner/plan/node/process/ExchangeNode.java    |   1 +
 .../planner/plan/node/sink/FragmentSinkNode.java   |   1 +
 .../plan/node/source/SeriesAggregateScanNode.java  |  10 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  15 +-
 .../sql/planner/plan/node/source/SourceNode.java   |   4 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |   7 +-
 .../sql/planner/plan/node/write/InsertNode.java    |   1 +
 .../planner/plan/node/write/InsertRowsNode.java    |   5 +
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   5 +
 .../db/mpp/sql/statement/StatementVisitor.java     |  10 +
 .../crud/InsertMultiTabletsStatement.java          |   8 +
 .../crud/InsertRowsOfOneDeviceStatement.java       |   1 +
 .../sql/statement/crud/InsertRowsStatement.java    |   8 +
 .../db/mpp/sql/statement/crud/InsertStatement.java |   1 +
 .../db/mpp/sql/statement/crud/QueryStatement.java  |   1 +
 .../metadata/AlterTimeSeriesStatement.java         |   1 +
 .../metadata/CreateAlignedTimeSeriesStatement.java |   1 +
 .../metadata/CreateTimeSeriesStatement.java        |   1 +
 .../statement/metadata/ShowDevicesStatement.java   |   6 +
 .../mpp/sql/statement/metadata/ShowStatement.java  |  10 +
 .../metadata/ShowTimeSeriesStatement.java          |   6 +
 .../db/protocol/influxdb/handler/QueryHandler.java |   6 +-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |  26 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 189 ++++-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |  14 +-
 .../db/qp/logical/sys/CreatePipeOperator.java      |  69 ++
 .../db/qp/logical/sys/CreatePipeSinkOperator.java  |  61 ++
 .../iotdb/db/qp/logical/sys/DropPipeOperator.java  |  24 +-
 .../db/qp/logical/sys/DropPipeSinkOperator.java    |  23 +-
 .../iotdb/db/qp/logical/sys/ShowPipeOperator.java  |  27 +-
 .../db/qp/logical/sys/ShowPipeServerOperator.java  |  20 +-
 .../db/qp/logical/sys/ShowPipeSinkOperator.java    |  27 +-
 .../qp/logical/sys/ShowPipeSinkTypeOperator.java   |  20 +-
 .../iotdb/db/qp/logical/sys/StartPipeOperator.java |  24 +-
 .../db/qp/logical/sys/StartPipeServerOperator.java |  18 +-
 .../iotdb/db/qp/logical/sys/StopPipeOperator.java  |  24 +-
 .../db/qp/logical/sys/StopPipeServerOperator.java  |  18 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  10 +
 .../qp/physical/crud/InsertMultiTabletsPlan.java   |   2 +-
 .../iotdb/db/qp/physical/crud/InsertRowsPlan.java  |   2 +-
 .../physical/sys/CreateAlignedTimeSeriesPlan.java  |  15 +-
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |   2 +-
 .../iotdb/db/qp/physical/sys/CreatePipePlan.java   | 110 +++
 .../db/qp/physical/sys/CreatePipeSinkPlan.java     |  98 +++
 .../db/qp/physical/sys/CreateTimeSeriesPlan.java   |  10 +
 .../db/qp/physical/sys/DeleteTimeSeriesPlan.java   |   2 +-
 .../iotdb/db/qp/physical/sys/DropPipeSinkPlan.java |  29 +-
 .../iotdb/db/qp/physical/sys/OperatePipePlan.java  |  29 +-
 .../physical/sys/ShowPipePlan.java}                |  22 +-
 .../physical/sys/ShowPipeServerPlan.java}          |  11 +-
 .../physical/sys/ShowPipeSinkPlan.java}            |  23 +-
 .../physical/sys/ShowPipeSinkTypePlan.java}        |  11 +-
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   6 +-
 .../db/qp/physical/sys/StartPipeServerPlan.java    |  56 ++
 .../db/qp/physical/sys/StopPipeServerPlan.java     |  56 ++
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    | 190 ++++-
 .../apache/iotdb/db/qp/utils/DatetimeUtils.java    |  15 +
 .../java/org/apache/iotdb/db/service/DataNode.java |  67 +-
 .../iotdb/db/service/InternalServiceImpl.java      |  32 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  11 +-
 .../thrift/impl/DataNodeManagementServiceImpl.java |  68 +-
 .../service/thrift/impl/InfluxDBServiceImpl.java   |  42 +-
 .../apache/iotdb/db/sync/conf/SyncConstant.java    |  85 +--
 .../apache/iotdb/db/sync/conf/SyncPathUtil.java    | 100 +++
 .../iotdb/db/sync/conf/SyncSenderConfig.java       | 127 ----
 .../iotdb/db/sync/conf/SyncSenderDescriptor.java   | 124 ----
 .../org/apache/iotdb/db/sync/package-info.java     |  38 -
 .../iotdb/db/sync/pipedata/DeletionPipeData.java   |  85 +++
 .../apache/iotdb/db/sync/pipedata/PipeData.java    |  96 +++
 .../iotdb/db/sync/pipedata/SchemaPipeData.java     |  98 +++
 .../iotdb/db/sync/pipedata/TsFilePipeData.java     | 177 +++++
 .../sync/pipedata/queue/BufferedPipeDataQueue.java | 438 +++++++++++
 .../queue/PipeDataQueue.java}                      |  21 +-
 .../sync/pipedata/queue/PipeDataQueueFactory.java  |  57 ++
 .../iotdb/db/sync/receiver/ReceiverService.java    | 278 +++++++
 .../db/sync/receiver/collector/Collector.java      | 170 +++++
 .../db/sync/receiver/load/DeletionLoader.java      |  48 ++
 .../iotdb/db/sync/receiver/load/FileLoader.java    | 203 ------
 .../db/sync/receiver/load/FileLoaderManager.java   | 213 ------
 .../iotdb/db/sync/receiver/load/IFileLoader.java   |  50 --
 .../iotdb/db/sync/receiver/load/ILoadLogger.java   |  57 --
 .../receiver/load/{LoadType.java => ILoader.java}  |  12 +-
 .../iotdb/db/sync/receiver/load/LoadLogger.java    |  72 --
 .../iotdb/db/sync/receiver/load/SchemaLoader.java  |  60 ++
 .../iotdb/db/sync/receiver/load/TsFileLoader.java  |  67 ++
 .../iotdb/db/sync/receiver/manager/PipeInfo.java   |  85 +++
 .../db/sync/receiver/manager/PipeMessage.java      |  76 ++
 .../db/sync/receiver/manager/ReceiverManager.java  | 216 ++++++
 .../sync/receiver/recover/ISyncReceiverLogger.java |  50 --
 .../receiver/recover/SyncReceiverLogAnalyzer.java  | 154 ----
 .../sync/receiver/recover/SyncReceiverLogger.java  |  72 --
 .../db/sync/receiver/recovery/ReceiverLog.java     | 127 ++++
 .../receiver/recovery/ReceiverLogAnalyzer.java     | 157 ++++
 .../db/sync/receiver/transfer/SyncServiceImpl.java | 370 ----------
 .../db/sync/sender/manage/ISyncFileManager.java    |  72 --
 .../db/sync/sender/manage/SyncFileManager.java     | 291 --------
 .../db/sync/sender/manager/SchemaSyncManager.java  | 163 +++++
 .../db/sync/sender/manager/TsFileSyncManager.java  | 118 +++
 .../iotdb/db/sync/sender/pipe/IoTDBPipeSink.java   |  97 +++
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 106 +++
 .../sender/pipe/PipeSink.java}                     |  41 +-
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      | 334 +++++++++
 .../sender/recover/ISyncSenderLogAnalyzer.java     |  47 --
 .../db/sync/sender/recover/ISyncSenderLogger.java  |  67 --
 .../sync/sender/recover/SyncSenderLogAnalyzer.java | 128 ----
 .../db/sync/sender/recover/SyncSenderLogger.java   |  72 --
 .../db/sync/sender/recovery/SenderLogAnalyzer.java | 172 +++++
 .../db/sync/sender/recovery/SenderLogger.java      | 141 ++++
 .../db/sync/sender/recovery/TsFilePipeLogger.java  | 150 ++++
 .../db/sync/sender/service/SenderService.java      | 417 +++++++++++
 .../db/sync/sender/service/TransportHandler.java   | 127 ++++
 .../iotdb/db/sync/sender/transfer/ISyncClient.java |  95 ---
 .../iotdb/db/sync/sender/transfer/SyncClient.java  | 810 ---------------------
 .../client/ITransportClient.java}                  |  19 +-
 .../db/sync/transport/client/TransportClient.java  | 527 ++++++++++++++
 .../conf/TransportConfig.java}                     |  26 +-
 .../transport/conf/TransportConstant.java}         |  24 +-
 .../server/TransportServerManager.java}            |  80 +-
 .../server/TransportServerManagerMBean.java}       |   6 +-
 .../server/TransportServerThriftHandler.java}      |  30 +-
 .../transport/server/TransportServiceImpl.java     | 385 ++++++++++
 .../org/apache/iotdb/db/utils/DataTypeUtils.java   |   8 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   8 +
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |   2 +-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 181 ++++-
 .../org/apache/iotdb/db/utils/StatusUtils.java     |  46 --
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  | 109 ---
 .../iotdb/db/wal/recover/WALNodeRecoverTask.java   |   4 +-
 .../db/mpp/operator/SchemaScanOperatorTest.java    | 242 ++++++
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 108 ++-
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java |  16 +-
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  |  85 +++
 .../sql/plan/node/PlanNodeDeserializeHelper.java   |   3 +-
 .../metadata/read/ShowDevicesNodeSerdeTest.java    |   3 +-
 .../sql/plan/node/process/OffsetNodeSerdeTest.java |   6 +-
 .../plan/node/sink/FragmentSinkNodeSerdeTest.java  |   3 +-
 .../source/SeriesAggregateScanNodeSerdeTest.java   |   2 +-
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     |  28 +
 .../iotdb/db/service/InternalServiceImplTest.java  | 167 +++++
 .../sync/pipedata/BufferedPipeDataQueueTest.java   | 542 ++++++++++++++
 .../iotdb/db/sync/pipedata/PipeDataTest.java       |  86 +++
 .../db/sync/receiver/load/FileLoaderTest.java      | 405 -----------
 .../sync/receiver/manager/ReceiverManagerTest.java |  98 +++
 .../recover/SyncReceiverLogAnalyzerTest.java       | 229 ------
 .../receiver/recover/SyncReceiverLoggerTest.java   | 115 ---
 .../receiver/recovery/ReceiverLogAnalyzerTest.java | 124 ++++
 .../db/sync/sender/manage/SyncFileManagerTest.java | 350 ---------
 .../sender/recover/SyncSenderLogAnalyzerTest.java  | 201 -----
 .../sync/sender/recover/SyncSenderLoggerTest.java  | 112 ---
 .../db/sync/sender/transfer/SyncClientTest.java    | 161 ----
 .../db/sync/transport/TransportServiceTest.java    | 205 ++++++
 server/src/test/resources/iotdb-engine.properties  |   4 +-
 server/src/test/resources/logback-test.xml         |   2 +-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  15 +-
 .../iotdb/rpc/StatementExecutionException.java     |   4 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   4 +-
 .../src/main/thrift/confignode.thrift              |  67 +-
 thrift-influxdb/src/main/thrift/influxdb.thrift    |  36 +-
 thrift-sync/src/main/thrift/sync.thrift            |  51 --
 thrift-sync/src/main/thrift/transport.thrift       |  90 +++
 thrift/src/main/thrift/common.thrift               |  43 +-
 300 files changed, 14532 insertions(+), 6902 deletions(-)

diff --cc node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index 115c4406c4,d98695824d..fa3e61ef10
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@@ -46,16 -45,19 +45,18 @@@ public class RegionReplicaSet 
      this.dataNodeList = dataNodeList;
    }
  
-   public ConsensusGroupId getId() {
-     return id;
+   public ConsensusGroupId getConsensusGroupId() {
+     return consensusGroupId;
    }
  
-   public void setId(ConsensusGroupId id) {
-     this.id = id;
+   public void setConsensusGroupId(ConsensusGroupId consensusGroupId) {
+     this.consensusGroupId = consensusGroupId;
    }
  
+   @Override
    public String toString() {
-     return String.format("RegionReplicaSet[%s-%s]: %s", id.getType(), id, dataNodeList);
+     return String.format(
 -        "RegionReplicaSet[%s-%d]: %s",
 -        consensusGroupId.getType(), consensusGroupId.getId(), dataNodeList);
++        "RegionReplicaSet[%s-%s]: %s", consensusGroupId.getType(), consensusGroupId, dataNodeList);
    }
  
    public void serializeImpl(ByteBuffer buffer) {
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index f87979a2a2,4ef647dc7e..7836d3460f
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@@ -22,9 -22,19 +22,22 @@@ import org.apache.iotdb.commons.partiti
  import org.apache.iotdb.db.mpp.common.MPPQueryContext;
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 +import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
- import org.apache.iotdb.db.mpp.sql.planner.plan.*;
- import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.IFragmentParallelPlaner;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
++import org.apache.iotdb.db.mpp.sql.planner.plan.WriteFragmentParallelPlanner;
++import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.SimplePlanNodeRewriter;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
@@@ -83,9 -92,8 +98,10 @@@ public class DistributionPlanner 
    // Convert fragment to detailed instance
    // And for parallel-able fragment, clone it into several instances with different params.
    public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
-     IFragmentParallelPlaner parallelPlaner = context.getQueryType() == QueryType.READ ?
-         new SimpleFragmentParallelPlanner(subPlan, analysis, context) :
-         new WriteFragmentParallelPlanner(subPlan, analysis, context);
+     IFragmentParallelPlaner parallelPlaner =
 -        new SimpleFragmentParallelPlanner(subPlan, analysis, context);
++        context.getQueryType() == QueryType.READ
++            ? new SimpleFragmentParallelPlanner(subPlan, analysis, context)
++            : new WriteFragmentParallelPlanner(subPlan, analysis, context);
      return parallelPlaner.parallelPlan();
    }
  
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index c263732b00,3e55a5d32c..72177f06c8
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@@ -30,9 -31,8 +30,7 @@@ import org.apache.iotdb.tsfile.read.fil
  import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
  import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
  
- import java.io.IOException;
  import java.nio.ByteBuffer;
--import java.util.Objects;
  
  public class FragmentInstance implements IConsensusRequest {
    private final FragmentInstanceId id;
@@@ -48,21 -50,23 +48,31 @@@
    // We can add some more params for a specific FragmentInstance
    // So that we can make different FragmentInstance owns different data range.
  
-   public FragmentInstance(PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
 -  public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
++  public FragmentInstance(
++      PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
      this.fragment = fragment;
      this.timeFilter = timeFilter;
 -    this.id = generateId(fragment.getId(), index);
 +    this.id = id;
      this.type = type;
    }
  
 -  public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
 -    return new FragmentInstanceId(id, String.valueOf(index));
 +  public RegionReplicaSet getDataRegionId() {
-     return dataRegion;
++    return regionReplicaSet;
++  }
++
++  public void setDataRegionAndHost(RegionReplicaSet regionReplicaSet) {
++    this.regionReplicaSet = regionReplicaSet;
++    // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
++    // instance
++    this.hostEndpoint = regionReplicaSet.getDataNodeList().get(0).getEndPoint();
    }
  
-   public void setDataRegionAndHost(RegionReplicaSet dataRegion) {
-     this.dataRegion = dataRegion;
-     // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current instance
-     this.hostEndpoint = dataRegion.getDataNodeList().get(0).getEndPoint();
+   public RegionReplicaSet getRegionReplicaSet() {
+     return regionReplicaSet;
+   }
+ 
+   public void setRegionReplicaSet(RegionReplicaSet regionReplicaSet) {
+     this.regionReplicaSet = regionReplicaSet;
    }
  
    public Endpoint getHostEndpoint() {
@@@ -120,17 -133,10 +135,9 @@@
      Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
      QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
      FragmentInstance fragmentInstance =
 -        new FragmentInstance(
 -            planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType);
 +        new FragmentInstance(planFragment, id, timeFilter, queryType);
-     RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
-     try {
-       regionReplicaSet.deserializeImpl(buffer);
-     } catch (IOException e) {
-       e.printStackTrace();
-     }
-     Endpoint endpoint = new Endpoint();
-     endpoint.deserializeImpl(buffer);
-     fragmentInstance.dataRegion = regionReplicaSet;
-     fragmentInstance.hostEndpoint = endpoint;
+     fragmentInstance.regionReplicaSet = RegionReplicaSet.deserializeImpl(buffer);
+     fragmentInstance.hostEndpoint = Endpoint.deserializeImpl(buffer);
  
      return fragmentInstance;
    }
@@@ -144,20 -150,25 +151,7 @@@
        timeFilter.serialize(buffer);
      }
      ReadWriteIOUtils.write(type.ordinal(), buffer);
-     dataRegion.serializeImpl(buffer);
+     regionReplicaSet.serializeImpl(buffer);
      hostEndpoint.serializeImpl(buffer);
    }
--
--  @Override
--  public boolean equals(Object o) {
--    if (this == o) return true;
--    if (o == null || getClass() != o.getClass()) return false;
--    FragmentInstance instance = (FragmentInstance) o;
-     return Objects.equals(id, instance.id) && type == instance.type && Objects.equals(fragment, instance.fragment) && Objects.equals(dataRegion, instance.dataRegion) && Objects.equals(hostEndpoint, instance.hostEndpoint) && Objects.equals(timeFilter, instance.timeFilter);
 -    return Objects.equals(id, instance.id)
 -        && type == instance.type
 -        && Objects.equals(fragment, instance.fragment)
 -        && Objects.equals(regionReplicaSet, instance.regionReplicaSet)
 -        && Objects.equals(hostEndpoint, instance.hostEndpoint)
 -        && Objects.equals(timeFilter, instance.timeFilter);
--  }
--
--  @Override
--  public int hashCode() {
-     return Objects.hash(id, type, fragment, dataRegion, hostEndpoint, timeFilter);
 -    return Objects.hash(id, type, fragment, regionReplicaSet, hostEndpoint, timeFilter);
--  }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 280d9891af,d3357ec275..631b217695
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@@ -99,7 -100,11 +99,7 @@@ public class SimpleFragmentParallelPlan
      // We need to store all the replica host in case of the scenario that the instance need to be
      // redirected
      // to another host when scheduling
-     fragmentInstance.setDataRegionAndHost(dataRegion);
 -    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
 -
 -    // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
 -    // instance
 -    fragmentInstance.setHostEndpoint(regionReplicaSet.getDataNodeList().get(0).getEndPoint());
++    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
      instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
      fragmentInstanceList.add(fragmentInstance);
    }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
index eebf705cd3,0000000000..d8c9435214
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
@@@ -1,61 -1,0 +1,70 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.mpp.sql.planner.plan;
 +
 +import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 +import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 +import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 +import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
- public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner{
++public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
 +
 +  private SubPlan subPlan;
 +  private Analysis analysis;
 +  private MPPQueryContext queryContext;
 +
-   public WriteFragmentParallelPlanner(SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) {
++  public WriteFragmentParallelPlanner(
++      SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) {
 +    this.subPlan = subPlan;
 +    this.analysis = analysis;
 +    this.queryContext = queryContext;
 +  }
 +
 +  @Override
 +  public List<FragmentInstance> parallelPlan() {
 +    PlanFragment fragment = subPlan.getPlanFragment();
-     Filter timeFilter = analysis.getQueryFilter() != null ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter() : null;
++    Filter timeFilter =
++        analysis.getQueryFilter() != null
++            ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter()
++            : null;
 +    PlanNode node = fragment.getRoot();
 +    if (!(node instanceof IWritePlanNode)) {
 +      throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation");
 +    }
 +    List<IWritePlanNode> splits = ((IWritePlanNode) node).splitByPartition(analysis);
 +    List<FragmentInstance> ret = new ArrayList<>();
 +    for (IWritePlanNode split : splits) {
-       FragmentInstance instance = new FragmentInstance(new PlanFragment(fragment.getId(), split), fragment.getId().genFragmentInstanceId(), timeFilter, queryContext.getQueryType());
++      FragmentInstance instance =
++          new FragmentInstance(
++              new PlanFragment(fragment.getId(), split),
++              fragment.getId().genFragmentInstanceId(),
++              timeFilter,
++              queryContext.getQueryType());
 +      instance.setDataRegionAndHost(((IWritePlanNode) node).getRegionReplicaSet());
 +      ret.add(instance);
 +    }
 +    return ret;
 +  }
 +}
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 49b1065a05,47fbed6dff..3fbbf98f0f
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@@ -20,8 -20,8 +20,9 @@@ package org.apache.iotdb.db.mpp.sql.pla
  
  import org.apache.iotdb.common.rpc.thrift.TSStatus;
  import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.commons.utils.StatusUtils;
  import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
  import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@@ -112,9 -112,9 +113,9 @@@ public class InsertMultiTabletsNode ext
      Map<RegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
      for (int i = 0; i < insertTabletNodeList.size(); i++) {
        InsertTabletNode insertTabletNode = insertTabletNodeList.get(i);
 -      List<InsertNode> tmpResult = insertTabletNode.splitByPartition(analysis);
 -      for (InsertNode subNode : tmpResult) {
 -        RegionReplicaSet dataRegionReplicaSet = subNode.getDataRegionReplicaSet();
 +      List<IWritePlanNode> tmpResult = insertTabletNode.splitByPartition(analysis);
 +      for (IWritePlanNode subNode : tmpResult) {
-         RegionReplicaSet dataRegionReplicaSet = ((InsertNode)subNode).getDataRegionReplicaSet();
++        RegionReplicaSet dataRegionReplicaSet = ((InsertNode) subNode).getDataRegionReplicaSet();
          if (splitMap.containsKey(dataRegionReplicaSet)) {
            InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet);
            tmpNode.addInsertTabletNode((InsertTabletNode) subNode, i);
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index d7eef6b50b,30b091d83b..2aead26a69
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@@ -26,9 -27,11 +26,10 @@@ import org.apache.iotdb.db.mpp.sql.plan
  import org.apache.iotdb.tsfile.exception.NotImplementedException;
  import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+ 
  import java.nio.ByteBuffer;
 -import java.util.List;
  
 -public abstract class InsertNode extends PlanNode {
 +public abstract class InsertNode extends IWritePlanNode {
  
    /**
     * if use id table, this filed is id form of device path <br>
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index 3cba509506,b3012c9986..4a8583d999
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@@ -20,9 -20,9 +20,10 @@@ package org.apache.iotdb.db.mpp.sql.pla
  
  import org.apache.iotdb.common.rpc.thrift.TSStatus;
  import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.commons.utils.StatusUtils;
  import org.apache.iotdb.db.engine.StorageEngine;
  import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
  import org.apache.iotdb.tsfile.exception.NotImplementedException;
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 8dc7fd7671,3c22c0fd57..3b5079e625
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@@ -20,9 -20,9 +20,10 @@@ package org.apache.iotdb.db.mpp.sql.pla
  
  import org.apache.iotdb.common.rpc.thrift.TSStatus;
  import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.commons.utils.StatusUtils;
  import org.apache.iotdb.db.engine.StorageEngine;
  import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
  import org.apache.iotdb.tsfile.exception.NotImplementedException;
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index ed192c357b,51a2313f08..39d9c6a71d
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@@ -41,13 -46,10 +46,12 @@@ import org.apache.iotdb.db.mpp.sql.plan
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
- import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
  import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
  import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
++import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  
  import com.google.common.collect.Sets;
- import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  import org.junit.Test;
  
  import java.util.ArrayList;
@@@ -205,35 -254,6 +256,32 @@@ public class DistributionPlannerTest 
      assertEquals(3, plan.getInstances().size());
    }
  
 +  @Test
 +  public void TestWriteParallelPlan() throws IllegalPathException {
 +    QueryId queryId = new QueryId("test_write");
-     InsertRowNode insertRowNode = new InsertRowNode(
-         queryId.genPlanNodeId(),
-         new PartialPath("root.sg.d1"),
-         false,
-         new MeasurementSchema[]{
-             new MeasurementSchema("s1", TSDataType.INT32),
-         },
-         new TSDataType[]{
-             TSDataType.INT32
-         },
-         1L,
-         new Object[]{
-             10
-         });
++    InsertRowNode insertRowNode =
++        new InsertRowNode(
++            queryId.genPlanNodeId(),
++            new PartialPath("root.sg.d1"),
++            false,
++            new MeasurementSchema[] {
++              new MeasurementSchema("s1", TSDataType.INT32),
++            },
++            new TSDataType[] {TSDataType.INT32},
++            1L,
++            new Object[] {10});
 +
 +    Analysis analysis = constructAnalysis();
 +
 +    MPPQueryContext context =
 +        new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
 +    DistributionPlanner planner =
 +        new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
 +    DistributedQueryPlan plan = planner.planFragments();
 +    plan.getInstances().forEach(System.out::println);
 +    assertEquals(1, plan.getInstances().size());
 +  }
 +
    private Analysis constructAnalysis() {
      Analysis analysis = new Analysis();
  
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index f57f60dda6,6871ff6560..4bf6a477e2
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@@ -18,8 -18,6 +18,7 @@@
   */
  package org.apache.iotdb.db.mpp.sql.plan;
  
- import com.google.common.collect.ImmutableList;
 +import org.apache.iotdb.commons.cluster.DataNodeLocation;
  import org.apache.iotdb.commons.cluster.Endpoint;
  import org.apache.iotdb.commons.consensus.DataRegionId;
  import org.apache.iotdb.commons.partition.RegionReplicaSet;
@@@ -45,6 -43,6 +44,7 @@@ import org.apache.iotdb.tsfile.read.exp
  import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
  import org.apache.iotdb.tsfile.read.filter.operator.Gt;
  
++import com.google.common.collect.ImmutableList;
  import org.junit.Test;
  
  import java.nio.ByteBuffer;
@@@ -64,8 -61,9 +64,10 @@@ public class FragmentInstanceSerdeTest 
              new GroupByFilter(1, 2, 3, 4),
              QueryType.READ);
      RegionReplicaSet regionReplicaSet =
-         new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666))));
 -        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
 -    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
 -    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
++        new RegionReplicaSet(
++            new DataRegionId(1),
++            ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666))));
 +    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
  
      ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
      fragmentInstance.serializeRequest(byteBuffer);
@@@ -84,8 -81,9 +86,10 @@@
              null,
              QueryType.READ);
      RegionReplicaSet regionReplicaSet =
-         new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667))));
 -        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
 -    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
 -    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667));
++        new RegionReplicaSet(
++            new DataRegionId(1),
++            ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667))));
 +    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
  
      ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
      fragmentInstance.serializeRequest(byteBuffer);
diff --cc server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 0000000000,7ed08d65b6..d5ba4d1f07
mode 000000,100644..100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@@ -1,0 -1,164 +1,167 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ package org.apache.iotdb.db.service;
+ 
+ import org.apache.iotdb.commons.cluster.DataNodeLocation;
+ import org.apache.iotdb.commons.cluster.Endpoint;
+ import org.apache.iotdb.commons.consensus.SchemaRegionId;
+ import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.consensus.common.Peer;
+ import org.apache.iotdb.db.conf.IoTDBConfig;
+ import org.apache.iotdb.db.conf.IoTDBDescriptor;
+ import org.apache.iotdb.db.consensus.ConsensusImpl;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
+ import org.apache.iotdb.db.metadata.LocalConfigNode;
+ import org.apache.iotdb.db.metadata.path.PartialPath;
+ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+ import org.apache.iotdb.db.utils.EnvironmentUtils;
+ import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
+ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+ import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
+ import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+ 
+ import org.apache.ratis.util.FileUtils;
+ import org.junit.After;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ 
+ public class InternalServiceImplTest {
+   private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+   InternalServiceImpl internalServiceImpl;
+   LocalConfigNode configNode;
+ 
+   @Before
+   public void setUp() throws Exception {
+     IoTDB.configManager.init();
+     configNode = LocalConfigNode.getInstance();
+     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
+     ConsensusImpl.getInstance().start();
+     RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+     ConsensusImpl.getInstance()
+         .addConsensusGroup(regionReplicaSet.getConsensusGroupId(), genPeerList(regionReplicaSet));
+     internalServiceImpl = new InternalServiceImpl();
+   }
+ 
+   @After
+   public void tearDown() throws Exception {
+     IoTDB.configManager.clear();
+     RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+     ConsensusImpl.getInstance().removeConsensusGroup(regionReplicaSet.getConsensusGroupId());
+     ConsensusImpl.getInstance().stop();
+     EnvironmentUtils.cleanEnv();
+     FileUtils.deleteFully(new File("data" + File.separator + "consensus"));
+   }
+ 
+   @Test
+   public void createTimeseriesTest() throws MetadataException {
+     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
+     CreateTimeSeriesNode createTimeSeriesNode =
+         new CreateTimeSeriesNode(
+             new PlanNodeId("0"),
+             new PartialPath("root.ln.wf01.wt01.status"),
+             TSDataType.BOOLEAN,
+             TSEncoding.PLAIN,
+             CompressionType.SNAPPY,
+             new HashMap<String, String>() {
+               {
+                 put("MAX_POINT_NUMBER", "3");
+               }
+             },
+             new HashMap<String, String>() {
+               {
+                 put("tag1", "v1");
+                 put("tag2", "v2");
+               }
+             },
+             new HashMap<String, String>() {
+               {
+                 put("attr1", "a1");
+                 put("attr2", "a2");
+               }
+             },
+             "meter1");
+ 
+     RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+     PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), createTimeSeriesNode);
+     FragmentInstance fragmentInstance =
 -        new FragmentInstance(planFragment, 4, new GroupByFilter(1, 2, 3, 4), QueryType.WRITE);
 -    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
 -    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
++        new FragmentInstance(
++            planFragment,
++            planFragment.getId().genFragmentInstanceId(),
++            new GroupByFilter(1, 2, 3, 4),
++            QueryType.WRITE);
++    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+ 
+     // serialize fragmentInstance
+     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+     fragmentInstance.serializeRequest(byteBuffer);
+     byteBuffer.flip();
+ 
+     // put serialized fragmentInstance to TSendFragmentInstanceReq
+     TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
+     TFragmentInstance tFragmentInstance = new TFragmentInstance();
+     tFragmentInstance.setBody(byteBuffer);
+     request.setFragmentInstance(tFragmentInstance);
+     request.setConsensusGroupId(
+         new TConsensusGroupId(
+             regionReplicaSet.getConsensusGroupId().getId(),
+             regionReplicaSet.getConsensusGroupId().getType().toString()));
+     request.setQueryType(QueryType.WRITE.toString());
+ 
+     // Use consensus layer to execute request
+     TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request);
+ 
+     Assert.assertTrue(response.accepted);
+   }
+ 
+   private RegionReplicaSet genRegionReplicaSet() {
+     List<DataNodeLocation> dataNodeList = new ArrayList<>();
+     dataNodeList.add(
+         new DataNodeLocation(
+             conf.getConsensusPort(), new Endpoint(conf.getInternalIp(), conf.getConsensusPort())));
+ 
+     // construct fragmentInstance
+     SchemaRegionId schemaRegionId = new SchemaRegionId(0);
+     return new RegionReplicaSet(schemaRegionId, dataNodeList);
+   }
+ 
+   private List<Peer> genPeerList(RegionReplicaSet regionReplicaSet) {
+     List<Peer> peerList = new ArrayList<>();
+     for (DataNodeLocation node : regionReplicaSet.getDataNodeList()) {
+       peerList.add(new Peer(regionReplicaSet.getConsensusGroupId(), node.getEndPoint()));
+     }
+     return peerList;
+   }
+ }


[iotdb] 03/03: add UT for DistributedPlan Write

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b8a351c26ad4f3793343c0f2f809e4898e40261d
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 13 22:43:35 2022 +0800

    add UT for DistributedPlan Write
---
 .../iotdb/commons/partition/DataPartition.java     | 11 ++++-
 .../planner/plan/WriteFragmentParallelPlanner.java |  2 +-
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 57 ++++++++++++++++++++--
 3 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 3f677ce2c9..f352d7cdeb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -68,7 +68,10 @@ public class DataPartition {
     // A list of data region replica sets will store data in a same time partition.
     // We will insert data to the last set in the list.
     // TODO return the latest dataRegionReplicaSet for each time partition
-    return Collections.emptyList();
+    String storageGroup = getStorageGroupByDevice(deviceName);
+    SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+    // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are more than 1 Regions for one timeSlot
+    return dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream().filter(entry -> timePartitionSlotList.contains(entry.getKey())).flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
   }
 
   public RegionReplicaSet getDataRegionReplicaSetForWriting(
@@ -76,7 +79,11 @@ public class DataPartition {
     // A list of data region replica sets will store data in a same time partition.
     // We will insert data to the last set in the list.
     // TODO return the latest dataRegionReplicaSet for each time partition
-    return null;
+    String storageGroup = getStorageGroupByDevice(deviceName);
+    SeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName);
+    List<RegionReplicaSet> regions = dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream().filter(entry -> entry.getKey().equals(timePartitionSlot)).flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
+    // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write operation that there are more than 1 Regions for one timeSlot
+    return regions.get(0);
   }
 
   private SeriesPartitionSlot calculateDeviceGroupId(String deviceName) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
index d8c9435214..a31ac6f25f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
@@ -62,7 +62,7 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
               fragment.getId().genFragmentInstanceId(),
               timeFilter,
               queryContext.getQueryType());
-      instance.setDataRegionAndHost(((IWritePlanNode) node).getRegionReplicaSet());
+      instance.setDataRegionAndHost(split.getRegionReplicaSet());
       ret.add(instance);
     }
     return ret;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 39d9c6a71d..b4b77956f7 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -257,7 +258,7 @@ public class DistributionPlannerTest {
   }
 
   @Test
-  public void TestWriteParallelPlan() throws IllegalPathException {
+  public void TestInsertRowNodeParallelPlan() throws IllegalPathException {
     QueryId queryId = new QueryId("test_write");
     InsertRowNode insertRowNode =
         new InsertRowNode(
@@ -282,6 +283,48 @@ public class DistributionPlannerTest {
     assertEquals(1, plan.getInstances().size());
   }
 
+  @Test
+  public void TestInsertRowsNodeParallelPlan() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_write");
+    InsertRowNode insertRowNode1 =
+        new InsertRowNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1"),
+            false,
+            new MeasurementSchema[] {
+                new MeasurementSchema("s1", TSDataType.INT32),
+            },
+            new TSDataType[] {TSDataType.INT32},
+            1L,
+            new Object[] {10});
+
+    InsertRowNode insertRowNode2 =
+        new InsertRowNode(
+            queryId.genPlanNodeId(),
+            new PartialPath("root.sg.d1"),
+            false,
+            new MeasurementSchema[] {
+                new MeasurementSchema("s1", TSDataType.INT32),
+            },
+            new TSDataType[] {TSDataType.INT32},
+            100000L,
+            new Object[] {10});
+
+    InsertRowsNode node = new InsertRowsNode(queryId.genPlanNodeId());
+    node.setInsertRowNodeList(Arrays.asList(insertRowNode1, insertRowNode2));
+    node.setInsertRowNodeIndexList(Arrays.asList(0, 1));
+
+    Analysis analysis = constructAnalysis();
+
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, node));
+    DistributedQueryPlan plan = planner.planFragments();
+    plan.getInstances().forEach(System.out::println);
+    assertEquals(1, plan.getInstances().size());
+  }
+
   private Analysis constructAnalysis() {
     Analysis analysis = new Analysis();
 
@@ -295,21 +338,25 @@ public class DistributionPlannerTest {
     Map<SeriesPartitionSlot, Map<TimePartitionSlot, List<RegionReplicaSet>>> sgPartitionMap =
         new HashMap<>();
 
-    List<RegionReplicaSet> d1DataRegions = new ArrayList<>();
-    d1DataRegions.add(
+    List<RegionReplicaSet> d1DataRegion1 = new ArrayList<>();
+    d1DataRegion1.add(
         new RegionReplicaSet(
             new DataRegionId(1),
             Arrays.asList(
                 new DataNodeLocation(11, new Endpoint("192.0.1.1", 9000)),
                 new DataNodeLocation(12, new Endpoint("192.0.1.2", 9000)))));
-    d1DataRegions.add(
+
+    List<RegionReplicaSet> d1DataRegion2 = new ArrayList<>();
+    d1DataRegion2.add(
         new RegionReplicaSet(
             new DataRegionId(2),
             Arrays.asList(
                 new DataNodeLocation(21, new Endpoint("192.0.2.1", 9000)),
                 new DataNodeLocation(22, new Endpoint("192.0.2.2", 9000)))));
+
     Map<TimePartitionSlot, List<RegionReplicaSet>> d1DataRegionMap = new HashMap<>();
-    d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegions);
+    d1DataRegionMap.put(new TimePartitionSlot(0), d1DataRegion1);
+    d1DataRegionMap.put(new TimePartitionSlot(1), d1DataRegion2);
 
     List<RegionReplicaSet> d2DataRegions = new ArrayList<>();
     d2DataRegions.add(


[iotdb] 02/03: fix UT error

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8534b11a1432c2054d99ae4378285e63eb924c73
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 13 21:18:35 2022 +0800

    fix UT error
---
 .../iotdb/commons/partition/RegionReplicaSet.java     |  3 +--
 .../db/mpp/sql/planner/plan/FragmentInstance.java     | 19 +++++++++++++++++++
 .../plan/node/source/SeriesAggregateScanNode.java     |  3 +--
 .../sql/planner/plan/node/source/SeriesScanNode.java  |  3 +--
 4 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index fa3e61ef10..955a021e50 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@ -55,8 +55,7 @@ public class RegionReplicaSet {
 
   @Override
   public String toString() {
-    return String.format(
-        "RegionReplicaSet[%s-%s]: %s", consensusGroupId.getType(), consensusGroupId, dataNodeList);
+    return String.format("RegionReplicaSet[%s]: %s", consensusGroupId, dataNodeList);
   }
 
   public void serializeImpl(ByteBuffer buffer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index 72177f06c8..a3065c8c03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 public class FragmentInstance implements IConsensusRequest {
   private final FragmentInstanceId id;
@@ -154,4 +155,22 @@ public class FragmentInstance implements IConsensusRequest {
     regionReplicaSet.serializeImpl(buffer);
     hostEndpoint.serializeImpl(buffer);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    FragmentInstance instance = (FragmentInstance) o;
+    return Objects.equals(id, instance.id)
+        && type == instance.type
+        && Objects.equals(fragment, instance.fragment)
+        && Objects.equals(regionReplicaSet, instance.regionReplicaSet)
+        && Objects.equals(hostEndpoint, instance.hostEndpoint)
+        && Objects.equals(timeFilter, instance.timeFilter);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, type, fragment, regionReplicaSet, hostEndpoint, timeFilter);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 87c2ff1d05..756bf76f79 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -195,8 +195,7 @@ public class SeriesAggregateScanNode extends SourceNode implements IOutputPlanNo
     Filter timeFilter = FilterFactory.deserialize(byteBuffer);
 
     // TODO serialize groupByTimeParameter
-    RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
-    RegionReplicaSet.deserializeImpl(byteBuffer);
+    RegionReplicaSet regionReplicaSet = RegionReplicaSet.deserializeImpl(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     SeriesAggregateScanNode seriesAggregateScanNode =
         new SeriesAggregateScanNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index 9700720466..913890ffcd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -209,8 +209,7 @@ public class SeriesScanNode extends SourceNode implements IOutputPlanNode {
     if (isNull == 1) valueFilter = FilterFactory.deserialize(byteBuffer);
     int limit = ReadWriteIOUtils.readInt(byteBuffer);
     int offset = ReadWriteIOUtils.readInt(byteBuffer);
-    RegionReplicaSet dataRegionReplicaSet = new RegionReplicaSet();
-    RegionReplicaSet.deserializeImpl(byteBuffer);
+    RegionReplicaSet dataRegionReplicaSet = RegionReplicaSet.deserializeImpl(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     SeriesScanNode seriesScanNode = new SeriesScanNode(planNodeId, partialPath);
     seriesScanNode.allSensors = allSensors;