You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/13 14:43:49 UTC

[iotdb] 01/03: Merge branch 'xingtanzjr/modification_pr' into xingtanzjr/write_instance_parallel

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d32fc0713cd568c35cf7e43edcaf1eb8537a94f0
Merge: 850198002e c5192db130
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 13 21:00:22 2022 +0800

    Merge branch 'xingtanzjr/modification_pr' into xingtanzjr/write_instance_parallel

 .github/workflows/influxdb-protocol.yml            |   2 +-
 .github/workflows/main-unix.yml                    |   3 +-
 .github/workflows/main-win.yml                     |   3 +-
 .../apache/iotdb/db/qp/sql/InfluxDBSqlParser.g4    |   2 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  67 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |  24 +
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |   1 -
 .../apache/iotdb/cluster/query/ClusterPlanner.java |   1 +
 .../apache/iotdb/cluster/utils/StatusUtils.java    |   6 +-
 confignode/pom.xml                                 |   5 +
 .../resources/conf/iotdb-confignode.properties     |  13 +-
 .../iotdb/confignode/conf/ConfigNodeConf.java      |  14 +
 .../confignode/conf/ConfigNodeDescriptor.java      |   4 +
 .../consensus/response/DataPartitionDataSet.java   |  22 +-
 .../consensus/response/SchemaPartitionDataSet.java |  77 +-
 .../response/StorageGroupSchemaDataSet.java        |  12 +-
 .../iotdb/confignode/manager/ConfigManager.java    | 114 ++-
 .../iotdb/confignode/manager/ConsensusManager.java |  36 +-
 .../apache/iotdb/confignode/manager/Manager.java   |  24 +-
 .../iotdb/confignode/manager/PartitionManager.java | 130 ++--
 .../iotdb/confignode/manager/RegionManager.java    |   6 +-
 .../confignode/partition/StorageGroupSchema.java   |   9 +
 .../persistence/PartitionInfoPersistence.java      |  60 +-
 .../persistence/RegionInfoPersistence.java         |  29 +-
 .../iotdb/confignode/physical/PhysicalPlan.java    |   2 +-
 .../physical/crud/CreateDataPartitionPlan.java     |   5 +-
 .../physical/crud/CreateRegionsPlan.java           |   5 +-
 .../physical/crud/CreateSchemaPartitionPlan.java   |  70 +-
 .../crud/GetOrCreateDataPartitionPlan.java         |   6 +-
 .../crud/GetOrCreateSchemaPartitionPlan.java       |  86 ++-
 .../confignode/service/executor/PlanExecutor.java  |   4 +-
 .../server/ConfigNodeRPCServerProcessor.java       |  64 +-
 .../confignode/consensus/RatisConsensusDemo.java   |  10 +-
 .../hash/DeviceGroupHashExecutorManualTest.java    |   5 +-
 .../physical/SerializeDeserializeUT.java           |  45 +-
 .../server/ConfigNodeRPCServerProcessorTest.java   | 284 +++++---
 .../iotdb/consensus/ratis/RequestMessage.java      |   1 +
 distribution/pom.xml                               |   6 +
 docs/UserGuide/Maintenance-Tools/Sync-Tool.md      | 396 +++++++---
 docs/zh/UserGuide/Maintenance-Tools/Sync-Tool.md   | 493 ++++++++++---
 influxdb-protocol/pom.xml                          |  38 +-
 .../iotdb/influxdb/protocol/dto/SessionPoint.java  |   6 +-
 .../protocol/impl/IoTDBInfluxDBService.java        |  12 +-
 .../iotdb/influxdb/session/InfluxDBSession.java    |  42 +-
 .../sync/IoTDBSyncReceiverCollectorIT.java         | 513 +++++++++++++
 .../db/integration/sync/IoTDBSyncReceiverIT.java   | 357 +++++++++
 .../sync/IoTDBSyncReceiverLoaderIT.java            | 210 ++++++
 .../db/integration/sync/IoTDBSyncSenderIT.java     | 304 ++++++++
 .../iotdb/db/integration/sync/SyncTestUtil.java    | 203 ++++++
 .../db/integration/sync/TransportClientMock.java   |  69 ++
 .../src/test/resources/iotdb-engine.properties     |   3 +-
 integration/src/test/resources/logback-test.xml    |   2 +-
 .../iotdb/commons/cluster/DataNodeLocation.java    |   6 +-
 .../org/apache/iotdb/commons/cluster/Endpoint.java |   6 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   6 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |  15 +
 .../iotdb/commons/consensus/ConsensusGroupId.java  |   5 +-
 .../iotdb/commons/partition/DataPartition.java     |   4 +-
 .../iotdb/commons/partition/RegionReplicaSet.java  |  42 +-
 .../iotdb/commons/partition/SchemaPartition.java   | 120 ++-
 .../apache/iotdb/commons/service/ServiceType.java  |   2 +
 .../apache/iotdb/commons/utils/CommonUtils.java    |   4 +-
 .../apache/iotdb/commons}/utils/StatusUtils.java   |  49 +-
 pom.xml                                            |   1 +
 server/pom.xml                                     |   5 +
 .../resources/conf/iotdb-engine.properties         |  19 +-
 .../resources/conf/iotdb-sync-client.properties    |  35 -
 .../assembly/resources/tools/start-sync-client.bat |  71 --
 .../assembly/resources/tools/start-sync-client.sh  |  54 --
 .../assembly/resources/tools/stop-sync-client.bat  |  23 -
 .../assembly/resources/tools/stop-sync-client.sh   |  30 -
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 309 ++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  81 ++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  43 +-
 .../consensus/statemachine/BaseStateMachine.java   |   2 +
 .../statemachine/DataRegionStateMachine.java       |  37 +-
 .../statemachine/SchemaRegionStateMachine.java     |  17 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  16 +-
 .../apache/iotdb/db/engine/StorageEngineV2.java    |  62 +-
 .../iotdb/db/engine/modification/Deletion.java     |  23 +
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 183 +++--
 .../db/engine/storagegroup/TsFileManager.java      |  38 +
 .../db/engine/storagegroup/TsFileProcessor.java    |  23 +
 .../dataregion/StorageGroupManager.java            |  10 +
 .../sync/PipeDataLoadBearableException.java}       |  10 +-
 .../sync/PipeDataLoadException.java}               |  10 +-
 .../sync/PipeDataLoadUnbearableException.java}     |  10 +-
 .../sync/PipeException.java}                       |  20 +-
 .../sync/PipeServerException.java}                 |  22 +-
 .../PipeSinkException.java}                        |  24 +-
 .../iotdb/db/metadata/Executor/SchemaVisitor.java  |  96 +++
 ...ocalConfigManager.java => LocalConfigNode.java} | 170 ++---
 .../db/metadata/LocalSchemaPartitionTable.java     |   4 +-
 .../iotdb/db/metadata/LocalSchemaProcessor.java    |  28 +-
 .../iotdb/db/metadata/mtree/MTreeBelowSG.java      |   1 +
 .../traverser/collector/MeasurementCollector.java  |   4 +
 .../db/metadata/schemaregion/SchemaEngine.java     |  27 +-
 .../db/metadata/schemaregion/SchemaRegion.java     |  20 +
 .../storagegroup/IStorageGroupSchemaManager.java   |   8 +
 .../storagegroup/StorageGroupSchemaManager.java    |  28 +
 .../iotdb/db/mpp/buffer/DataBlockManager.java      |   7 +-
 .../iotdb/db/mpp/buffer/IDataBlockManager.java     |   5 +
 .../apache/iotdb/db/mpp/common/PlanFragmentId.java |  10 +-
 .../org/apache/iotdb/db/mpp/common/QueryId.java    |   8 +-
 .../apache/iotdb/db/mpp/execution/DataDriver.java  |   3 +-
 .../db/mpp/execution/FragmentInstanceManager.java  |   4 +-
 .../iotdb/db/mpp/execution/QueryExecution.java     |  22 +-
 .../db/mpp/execution/SchemaDriverContext.java      |   8 +-
 .../scheduler/SimpleFragInstanceDispatcher.java    |   4 +-
 .../operator/schema/DevicesSchemaScanOperator.java |  97 +++
 .../mpp/operator/schema/SchemaMergeOperator.java   |  79 ++
 .../db/mpp/operator/schema/SchemaScanOperator.java | 107 +++
 .../schema/TimeSeriesSchemaScanOperator.java       | 142 ++++
 .../db/mpp/operator/source/SeriesScanOperator.java |   2 +-
 .../apache/iotdb/db/mpp/sql/analyze/Analyzer.java  |  42 +-
 .../db/mpp/sql/analyze/ClusterSchemaFetcher.java   |   7 +-
 .../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java  |   7 +-
 .../iotdb/db/mpp/sql/analyze/ISchemaFetcher.java   |   7 +-
 .../mpp/sql/analyze/StandaloneSchemaFetcher.java   |   7 +-
 .../apache/iotdb/db/mpp/sql/parser/ASTVisitor.java |   5 +
 .../db/mpp/sql/planner/DistributionPlanner.java    | 133 +++-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |  65 +-
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   |  47 ++
 .../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java |  42 ++
 .../db/mpp/sql/planner/plan/FragmentInstance.java  |  66 +-
 .../db/mpp/sql/planner/plan/PlanFragment.java      |  10 +-
 .../plan/SimpleFragmentParallelPlanner.java        |   6 +-
 .../planner/plan/WriteFragmentParallelPlanner.java |  17 +-
 .../db/mpp/sql/planner/plan/node/PlanNode.java     |   1 +
 .../db/mpp/sql/planner/plan/node/PlanNodeType.java |  15 +-
 .../db/mpp/sql/planner/plan/node/PlanNodeUtil.java |   3 +
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |  30 +
 .../node/metedata/read/DevicesSchemaScanNode.java  | 108 +++
 .../read/SchemaMergeNode.java}                     |  44 +-
 .../plan/node/metedata/read/SchemaScanNode.java    | 119 +++
 .../metedata/read/TimeSeriesSchemaScanNode.java    | 145 ++++
 .../node/metedata/write/AlterTimeSeriesNode.java   |  37 +
 .../write/CreateAlignedTimeSeriesNode.java         |  13 +
 .../node/metedata/write/CreateTimeSeriesNode.java  | 127 +++-
 .../planner/plan/node/process/ExchangeNode.java    |   1 +
 .../planner/plan/node/sink/FragmentSinkNode.java   |   1 +
 .../plan/node/source/SeriesAggregateScanNode.java  |  10 +-
 .../planner/plan/node/source/SeriesScanNode.java   |  15 +-
 .../sql/planner/plan/node/source/SourceNode.java   |   4 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |   7 +-
 .../sql/planner/plan/node/write/InsertNode.java    |   1 +
 .../planner/plan/node/write/InsertRowsNode.java    |   5 +
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   5 +
 .../db/mpp/sql/statement/StatementVisitor.java     |  10 +
 .../crud/InsertMultiTabletsStatement.java          |   8 +
 .../crud/InsertRowsOfOneDeviceStatement.java       |   1 +
 .../sql/statement/crud/InsertRowsStatement.java    |   8 +
 .../db/mpp/sql/statement/crud/InsertStatement.java |   1 +
 .../db/mpp/sql/statement/crud/QueryStatement.java  |   1 +
 .../metadata/AlterTimeSeriesStatement.java         |   1 +
 .../metadata/CreateAlignedTimeSeriesStatement.java |   1 +
 .../metadata/CreateTimeSeriesStatement.java        |   1 +
 .../statement/metadata/ShowDevicesStatement.java   |   6 +
 .../mpp/sql/statement/metadata/ShowStatement.java  |  10 +
 .../metadata/ShowTimeSeriesStatement.java          |   6 +
 .../db/protocol/influxdb/handler/QueryHandler.java |   6 +-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |  26 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 189 ++++-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |  14 +-
 .../db/qp/logical/sys/CreatePipeOperator.java      |  69 ++
 .../db/qp/logical/sys/CreatePipeSinkOperator.java  |  61 ++
 .../iotdb/db/qp/logical/sys/DropPipeOperator.java  |  24 +-
 .../db/qp/logical/sys/DropPipeSinkOperator.java    |  23 +-
 .../iotdb/db/qp/logical/sys/ShowPipeOperator.java  |  27 +-
 .../db/qp/logical/sys/ShowPipeServerOperator.java  |  20 +-
 .../db/qp/logical/sys/ShowPipeSinkOperator.java    |  27 +-
 .../qp/logical/sys/ShowPipeSinkTypeOperator.java   |  20 +-
 .../iotdb/db/qp/logical/sys/StartPipeOperator.java |  24 +-
 .../db/qp/logical/sys/StartPipeServerOperator.java |  18 +-
 .../iotdb/db/qp/logical/sys/StopPipeOperator.java  |  24 +-
 .../db/qp/logical/sys/StopPipeServerOperator.java  |  18 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  10 +
 .../qp/physical/crud/InsertMultiTabletsPlan.java   |   2 +-
 .../iotdb/db/qp/physical/crud/InsertRowsPlan.java  |   2 +-
 .../physical/sys/CreateAlignedTimeSeriesPlan.java  |  15 +-
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |   2 +-
 .../iotdb/db/qp/physical/sys/CreatePipePlan.java   | 110 +++
 .../db/qp/physical/sys/CreatePipeSinkPlan.java     |  98 +++
 .../db/qp/physical/sys/CreateTimeSeriesPlan.java   |  10 +
 .../db/qp/physical/sys/DeleteTimeSeriesPlan.java   |   2 +-
 .../iotdb/db/qp/physical/sys/DropPipeSinkPlan.java |  29 +-
 .../iotdb/db/qp/physical/sys/OperatePipePlan.java  |  29 +-
 .../physical/sys/ShowPipePlan.java}                |  22 +-
 .../physical/sys/ShowPipeServerPlan.java}          |  11 +-
 .../physical/sys/ShowPipeSinkPlan.java}            |  23 +-
 .../physical/sys/ShowPipeSinkTypePlan.java}        |  11 +-
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |   6 +-
 .../db/qp/physical/sys/StartPipeServerPlan.java    |  56 ++
 .../db/qp/physical/sys/StopPipeServerPlan.java     |  56 ++
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    | 190 ++++-
 .../apache/iotdb/db/qp/utils/DatetimeUtils.java    |  15 +
 .../java/org/apache/iotdb/db/service/DataNode.java |  67 +-
 .../iotdb/db/service/InternalServiceImpl.java      |  32 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |  11 +-
 .../thrift/impl/DataNodeManagementServiceImpl.java |  68 +-
 .../service/thrift/impl/InfluxDBServiceImpl.java   |  42 +-
 .../apache/iotdb/db/sync/conf/SyncConstant.java    |  85 +--
 .../apache/iotdb/db/sync/conf/SyncPathUtil.java    | 100 +++
 .../iotdb/db/sync/conf/SyncSenderConfig.java       | 127 ----
 .../iotdb/db/sync/conf/SyncSenderDescriptor.java   | 124 ----
 .../org/apache/iotdb/db/sync/package-info.java     |  38 -
 .../iotdb/db/sync/pipedata/DeletionPipeData.java   |  85 +++
 .../apache/iotdb/db/sync/pipedata/PipeData.java    |  96 +++
 .../iotdb/db/sync/pipedata/SchemaPipeData.java     |  98 +++
 .../iotdb/db/sync/pipedata/TsFilePipeData.java     | 177 +++++
 .../sync/pipedata/queue/BufferedPipeDataQueue.java | 438 +++++++++++
 .../queue/PipeDataQueue.java}                      |  21 +-
 .../sync/pipedata/queue/PipeDataQueueFactory.java  |  57 ++
 .../iotdb/db/sync/receiver/ReceiverService.java    | 278 +++++++
 .../db/sync/receiver/collector/Collector.java      | 170 +++++
 .../db/sync/receiver/load/DeletionLoader.java      |  48 ++
 .../iotdb/db/sync/receiver/load/FileLoader.java    | 203 ------
 .../db/sync/receiver/load/FileLoaderManager.java   | 213 ------
 .../iotdb/db/sync/receiver/load/IFileLoader.java   |  50 --
 .../iotdb/db/sync/receiver/load/ILoadLogger.java   |  57 --
 .../receiver/load/{LoadType.java => ILoader.java}  |  12 +-
 .../iotdb/db/sync/receiver/load/LoadLogger.java    |  72 --
 .../iotdb/db/sync/receiver/load/SchemaLoader.java  |  60 ++
 .../iotdb/db/sync/receiver/load/TsFileLoader.java  |  67 ++
 .../iotdb/db/sync/receiver/manager/PipeInfo.java   |  85 +++
 .../db/sync/receiver/manager/PipeMessage.java      |  76 ++
 .../db/sync/receiver/manager/ReceiverManager.java  | 216 ++++++
 .../sync/receiver/recover/ISyncReceiverLogger.java |  50 --
 .../receiver/recover/SyncReceiverLogAnalyzer.java  | 154 ----
 .../sync/receiver/recover/SyncReceiverLogger.java  |  72 --
 .../db/sync/receiver/recovery/ReceiverLog.java     | 127 ++++
 .../receiver/recovery/ReceiverLogAnalyzer.java     | 157 ++++
 .../db/sync/receiver/transfer/SyncServiceImpl.java | 370 ----------
 .../db/sync/sender/manage/ISyncFileManager.java    |  72 --
 .../db/sync/sender/manage/SyncFileManager.java     | 291 --------
 .../db/sync/sender/manager/SchemaSyncManager.java  | 163 +++++
 .../db/sync/sender/manager/TsFileSyncManager.java  | 118 +++
 .../iotdb/db/sync/sender/pipe/IoTDBPipeSink.java   |  97 +++
 .../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 106 +++
 .../sender/pipe/PipeSink.java}                     |  41 +-
 .../iotdb/db/sync/sender/pipe/TsFilePipe.java      | 334 +++++++++
 .../sender/recover/ISyncSenderLogAnalyzer.java     |  47 --
 .../db/sync/sender/recover/ISyncSenderLogger.java  |  67 --
 .../sync/sender/recover/SyncSenderLogAnalyzer.java | 128 ----
 .../db/sync/sender/recover/SyncSenderLogger.java   |  72 --
 .../db/sync/sender/recovery/SenderLogAnalyzer.java | 172 +++++
 .../db/sync/sender/recovery/SenderLogger.java      | 141 ++++
 .../db/sync/sender/recovery/TsFilePipeLogger.java  | 150 ++++
 .../db/sync/sender/service/SenderService.java      | 417 +++++++++++
 .../db/sync/sender/service/TransportHandler.java   | 127 ++++
 .../iotdb/db/sync/sender/transfer/ISyncClient.java |  95 ---
 .../iotdb/db/sync/sender/transfer/SyncClient.java  | 810 ---------------------
 .../client/ITransportClient.java}                  |  19 +-
 .../db/sync/transport/client/TransportClient.java  | 527 ++++++++++++++
 .../conf/TransportConfig.java}                     |  26 +-
 .../transport/conf/TransportConstant.java}         |  24 +-
 .../server/TransportServerManager.java}            |  80 +-
 .../server/TransportServerManagerMBean.java}       |   6 +-
 .../server/TransportServerThriftHandler.java}      |  30 +-
 .../transport/server/TransportServiceImpl.java     | 385 ++++++++++
 .../org/apache/iotdb/db/utils/DataTypeUtils.java   |   8 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   8 +
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |   2 +-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 181 ++++-
 .../org/apache/iotdb/db/utils/StatusUtils.java     |  46 --
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  | 109 ---
 .../iotdb/db/wal/recover/WALNodeRecoverTask.java   |   4 +-
 .../db/mpp/operator/SchemaScanOperatorTest.java    | 242 ++++++
 .../db/mpp/sql/plan/DistributionPlannerTest.java   | 108 ++-
 .../db/mpp/sql/plan/FragmentInstanceSerdeTest.java |  16 +-
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  |  85 +++
 .../sql/plan/node/PlanNodeDeserializeHelper.java   |   3 +-
 .../metadata/read/ShowDevicesNodeSerdeTest.java    |   3 +-
 .../sql/plan/node/process/OffsetNodeSerdeTest.java |   6 +-
 .../plan/node/sink/FragmentSinkNodeSerdeTest.java  |   3 +-
 .../source/SeriesAggregateScanNodeSerdeTest.java   |   2 +-
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     |  28 +
 .../iotdb/db/service/InternalServiceImplTest.java  | 167 +++++
 .../sync/pipedata/BufferedPipeDataQueueTest.java   | 542 ++++++++++++++
 .../iotdb/db/sync/pipedata/PipeDataTest.java       |  86 +++
 .../db/sync/receiver/load/FileLoaderTest.java      | 405 -----------
 .../sync/receiver/manager/ReceiverManagerTest.java |  98 +++
 .../recover/SyncReceiverLogAnalyzerTest.java       | 229 ------
 .../receiver/recover/SyncReceiverLoggerTest.java   | 115 ---
 .../receiver/recovery/ReceiverLogAnalyzerTest.java | 124 ++++
 .../db/sync/sender/manage/SyncFileManagerTest.java | 350 ---------
 .../sender/recover/SyncSenderLogAnalyzerTest.java  | 201 -----
 .../sync/sender/recover/SyncSenderLoggerTest.java  | 112 ---
 .../db/sync/sender/transfer/SyncClientTest.java    | 161 ----
 .../db/sync/transport/TransportServiceTest.java    | 205 ++++++
 server/src/test/resources/iotdb-engine.properties  |   4 +-
 server/src/test/resources/logback-test.xml         |   2 +-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  15 +-
 .../iotdb/rpc/StatementExecutionException.java     |   4 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   4 +-
 .../src/main/thrift/confignode.thrift              |  67 +-
 thrift-influxdb/src/main/thrift/influxdb.thrift    |  36 +-
 thrift-sync/src/main/thrift/sync.thrift            |  51 --
 thrift-sync/src/main/thrift/transport.thrift       |  90 +++
 thrift/src/main/thrift/common.thrift               |  43 +-
 300 files changed, 14532 insertions(+), 6902 deletions(-)

diff --cc node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index 115c4406c4,d98695824d..fa3e61ef10
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@@ -46,16 -45,19 +45,18 @@@ public class RegionReplicaSet 
      this.dataNodeList = dataNodeList;
    }
  
-   public ConsensusGroupId getId() {
-     return id;
+   public ConsensusGroupId getConsensusGroupId() {
+     return consensusGroupId;
    }
  
-   public void setId(ConsensusGroupId id) {
-     this.id = id;
+   public void setConsensusGroupId(ConsensusGroupId consensusGroupId) {
+     this.consensusGroupId = consensusGroupId;
    }
  
+   @Override
    public String toString() {
-     return String.format("RegionReplicaSet[%s-%s]: %s", id.getType(), id, dataNodeList);
+     return String.format(
 -        "RegionReplicaSet[%s-%d]: %s",
 -        consensusGroupId.getType(), consensusGroupId.getId(), dataNodeList);
++        "RegionReplicaSet[%s-%s]: %s", consensusGroupId.getType(), consensusGroupId, dataNodeList);
    }
  
    public void serializeImpl(ByteBuffer buffer) {
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index f87979a2a2,4ef647dc7e..7836d3460f
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@@ -22,9 -22,19 +22,22 @@@ import org.apache.iotdb.commons.partiti
  import org.apache.iotdb.db.mpp.common.MPPQueryContext;
  import org.apache.iotdb.db.mpp.common.PlanFragmentId;
  import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 +import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
- import org.apache.iotdb.db.mpp.sql.planner.plan.*;
- import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.IFragmentParallelPlaner;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
++import org.apache.iotdb.db.mpp.sql.planner.plan.WriteFragmentParallelPlanner;
++import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.SimplePlanNodeRewriter;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
@@@ -83,9 -92,8 +98,10 @@@ public class DistributionPlanner 
    // Convert fragment to detailed instance
    // And for parallel-able fragment, clone it into several instances with different params.
    public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
-     IFragmentParallelPlaner parallelPlaner = context.getQueryType() == QueryType.READ ?
-         new SimpleFragmentParallelPlanner(subPlan, analysis, context) :
-         new WriteFragmentParallelPlanner(subPlan, analysis, context);
+     IFragmentParallelPlaner parallelPlaner =
 -        new SimpleFragmentParallelPlanner(subPlan, analysis, context);
++        context.getQueryType() == QueryType.READ
++            ? new SimpleFragmentParallelPlanner(subPlan, analysis, context)
++            : new WriteFragmentParallelPlanner(subPlan, analysis, context);
      return parallelPlaner.parallelPlan();
    }
  
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index c263732b00,3e55a5d32c..72177f06c8
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@@ -30,9 -31,8 +30,7 @@@ import org.apache.iotdb.tsfile.read.fil
  import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
  import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
  
- import java.io.IOException;
  import java.nio.ByteBuffer;
--import java.util.Objects;
  
  public class FragmentInstance implements IConsensusRequest {
    private final FragmentInstanceId id;
@@@ -48,21 -50,23 +48,31 @@@
    // We can add some more params for a specific FragmentInstance
    // So that we can make different FragmentInstance owns different data range.
  
-   public FragmentInstance(PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
 -  public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
++  public FragmentInstance(
++      PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
      this.fragment = fragment;
      this.timeFilter = timeFilter;
 -    this.id = generateId(fragment.getId(), index);
 +    this.id = id;
      this.type = type;
    }
  
 -  public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
 -    return new FragmentInstanceId(id, String.valueOf(index));
 +  public RegionReplicaSet getDataRegionId() {
-     return dataRegion;
++    return regionReplicaSet;
++  }
++
++  public void setDataRegionAndHost(RegionReplicaSet regionReplicaSet) {
++    this.regionReplicaSet = regionReplicaSet;
++    // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
++    // instance
++    this.hostEndpoint = regionReplicaSet.getDataNodeList().get(0).getEndPoint();
    }
  
-   public void setDataRegionAndHost(RegionReplicaSet dataRegion) {
-     this.dataRegion = dataRegion;
-     // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current instance
-     this.hostEndpoint = dataRegion.getDataNodeList().get(0).getEndPoint();
+   public RegionReplicaSet getRegionReplicaSet() {
+     return regionReplicaSet;
+   }
+ 
+   public void setRegionReplicaSet(RegionReplicaSet regionReplicaSet) {
+     this.regionReplicaSet = regionReplicaSet;
    }
  
    public Endpoint getHostEndpoint() {
@@@ -120,17 -133,10 +135,9 @@@
      Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
      QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
      FragmentInstance fragmentInstance =
 -        new FragmentInstance(
 -            planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType);
 +        new FragmentInstance(planFragment, id, timeFilter, queryType);
-     RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
-     try {
-       regionReplicaSet.deserializeImpl(buffer);
-     } catch (IOException e) {
-       e.printStackTrace();
-     }
-     Endpoint endpoint = new Endpoint();
-     endpoint.deserializeImpl(buffer);
-     fragmentInstance.dataRegion = regionReplicaSet;
-     fragmentInstance.hostEndpoint = endpoint;
+     fragmentInstance.regionReplicaSet = RegionReplicaSet.deserializeImpl(buffer);
+     fragmentInstance.hostEndpoint = Endpoint.deserializeImpl(buffer);
  
      return fragmentInstance;
    }
@@@ -144,20 -150,25 +151,7 @@@
        timeFilter.serialize(buffer);
      }
      ReadWriteIOUtils.write(type.ordinal(), buffer);
-     dataRegion.serializeImpl(buffer);
+     regionReplicaSet.serializeImpl(buffer);
      hostEndpoint.serializeImpl(buffer);
    }
--
--  @Override
--  public boolean equals(Object o) {
--    if (this == o) return true;
--    if (o == null || getClass() != o.getClass()) return false;
--    FragmentInstance instance = (FragmentInstance) o;
-     return Objects.equals(id, instance.id) && type == instance.type && Objects.equals(fragment, instance.fragment) && Objects.equals(dataRegion, instance.dataRegion) && Objects.equals(hostEndpoint, instance.hostEndpoint) && Objects.equals(timeFilter, instance.timeFilter);
 -    return Objects.equals(id, instance.id)
 -        && type == instance.type
 -        && Objects.equals(fragment, instance.fragment)
 -        && Objects.equals(regionReplicaSet, instance.regionReplicaSet)
 -        && Objects.equals(hostEndpoint, instance.hostEndpoint)
 -        && Objects.equals(timeFilter, instance.timeFilter);
--  }
--
--  @Override
--  public int hashCode() {
-     return Objects.hash(id, type, fragment, dataRegion, hostEndpoint, timeFilter);
 -    return Objects.hash(id, type, fragment, regionReplicaSet, hostEndpoint, timeFilter);
--  }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 280d9891af,d3357ec275..631b217695
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@@ -99,7 -100,11 +99,7 @@@ public class SimpleFragmentParallelPlan
      // We need to store all the replica host in case of the scenario that the instance need to be
      // redirected
      // to another host when scheduling
-     fragmentInstance.setDataRegionAndHost(dataRegion);
 -    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
 -
 -    // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
 -    // instance
 -    fragmentInstance.setHostEndpoint(regionReplicaSet.getDataNodeList().get(0).getEndPoint());
++    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
      instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
      fragmentInstanceList.add(fragmentInstance);
    }
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
index eebf705cd3,0000000000..d8c9435214
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
@@@ -1,61 -1,0 +1,70 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.mpp.sql.planner.plan;
 +
 +import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 +import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 +import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 +import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
- public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner{
++public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
 +
 +  private SubPlan subPlan;
 +  private Analysis analysis;
 +  private MPPQueryContext queryContext;
 +
-   public WriteFragmentParallelPlanner(SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) {
++  public WriteFragmentParallelPlanner(
++      SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) {
 +    this.subPlan = subPlan;
 +    this.analysis = analysis;
 +    this.queryContext = queryContext;
 +  }
 +
 +  @Override
 +  public List<FragmentInstance> parallelPlan() {
 +    PlanFragment fragment = subPlan.getPlanFragment();
-     Filter timeFilter = analysis.getQueryFilter() != null ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter() : null;
++    Filter timeFilter =
++        analysis.getQueryFilter() != null
++            ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter()
++            : null;
 +    PlanNode node = fragment.getRoot();
 +    if (!(node instanceof IWritePlanNode)) {
 +      throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation");
 +    }
 +    List<IWritePlanNode> splits = ((IWritePlanNode) node).splitByPartition(analysis);
 +    List<FragmentInstance> ret = new ArrayList<>();
 +    for (IWritePlanNode split : splits) {
-       FragmentInstance instance = new FragmentInstance(new PlanFragment(fragment.getId(), split), fragment.getId().genFragmentInstanceId(), timeFilter, queryContext.getQueryType());
++      FragmentInstance instance =
++          new FragmentInstance(
++              new PlanFragment(fragment.getId(), split),
++              fragment.getId().genFragmentInstanceId(),
++              timeFilter,
++              queryContext.getQueryType());
 +      instance.setDataRegionAndHost(((IWritePlanNode) node).getRegionReplicaSet());
 +      ret.add(instance);
 +    }
 +    return ret;
 +  }
 +}
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 49b1065a05,47fbed6dff..3fbbf98f0f
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@@ -20,8 -20,8 +20,9 @@@ package org.apache.iotdb.db.mpp.sql.pla
  
  import org.apache.iotdb.common.rpc.thrift.TSStatus;
  import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.commons.utils.StatusUtils;
  import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
  import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@@ -112,9 -112,9 +113,9 @@@ public class InsertMultiTabletsNode ext
      Map<RegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
      for (int i = 0; i < insertTabletNodeList.size(); i++) {
        InsertTabletNode insertTabletNode = insertTabletNodeList.get(i);
 -      List<InsertNode> tmpResult = insertTabletNode.splitByPartition(analysis);
 -      for (InsertNode subNode : tmpResult) {
 -        RegionReplicaSet dataRegionReplicaSet = subNode.getDataRegionReplicaSet();
 +      List<IWritePlanNode> tmpResult = insertTabletNode.splitByPartition(analysis);
 +      for (IWritePlanNode subNode : tmpResult) {
-         RegionReplicaSet dataRegionReplicaSet = ((InsertNode)subNode).getDataRegionReplicaSet();
++        RegionReplicaSet dataRegionReplicaSet = ((InsertNode) subNode).getDataRegionReplicaSet();
          if (splitMap.containsKey(dataRegionReplicaSet)) {
            InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet);
            tmpNode.addInsertTabletNode((InsertTabletNode) subNode, i);
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index d7eef6b50b,30b091d83b..2aead26a69
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@@ -26,9 -27,11 +26,10 @@@ import org.apache.iotdb.db.mpp.sql.plan
  import org.apache.iotdb.tsfile.exception.NotImplementedException;
  import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+ 
  import java.nio.ByteBuffer;
 -import java.util.List;
  
 -public abstract class InsertNode extends PlanNode {
 +public abstract class InsertNode extends IWritePlanNode {
  
    /**
     * if use id table, this filed is id form of device path <br>
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index 3cba509506,b3012c9986..4a8583d999
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@@ -20,9 -20,9 +20,10 @@@ package org.apache.iotdb.db.mpp.sql.pla
  
  import org.apache.iotdb.common.rpc.thrift.TSStatus;
  import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.commons.utils.StatusUtils;
  import org.apache.iotdb.db.engine.StorageEngine;
  import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
  import org.apache.iotdb.tsfile.exception.NotImplementedException;
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 8dc7fd7671,3c22c0fd57..3b5079e625
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@@ -20,9 -20,9 +20,10 @@@ package org.apache.iotdb.db.mpp.sql.pla
  
  import org.apache.iotdb.common.rpc.thrift.TSStatus;
  import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.commons.utils.StatusUtils;
  import org.apache.iotdb.db.engine.StorageEngine;
  import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
  import org.apache.iotdb.tsfile.exception.NotImplementedException;
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index ed192c357b,51a2313f08..39d9c6a71d
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@@ -41,13 -46,10 +46,12 @@@ import org.apache.iotdb.db.mpp.sql.plan
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
  import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
- import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
 +import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
  import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
  import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
++import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  
  import com.google.common.collect.Sets;
- import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  import org.junit.Test;
  
  import java.util.ArrayList;
@@@ -205,35 -254,6 +256,32 @@@ public class DistributionPlannerTest 
      assertEquals(3, plan.getInstances().size());
    }
  
 +  @Test
 +  public void TestWriteParallelPlan() throws IllegalPathException {
 +    QueryId queryId = new QueryId("test_write");
-     InsertRowNode insertRowNode = new InsertRowNode(
-         queryId.genPlanNodeId(),
-         new PartialPath("root.sg.d1"),
-         false,
-         new MeasurementSchema[]{
-             new MeasurementSchema("s1", TSDataType.INT32),
-         },
-         new TSDataType[]{
-             TSDataType.INT32
-         },
-         1L,
-         new Object[]{
-             10
-         });
++    InsertRowNode insertRowNode =
++        new InsertRowNode(
++            queryId.genPlanNodeId(),
++            new PartialPath("root.sg.d1"),
++            false,
++            new MeasurementSchema[] {
++              new MeasurementSchema("s1", TSDataType.INT32),
++            },
++            new TSDataType[] {TSDataType.INT32},
++            1L,
++            new Object[] {10});
 +
 +    Analysis analysis = constructAnalysis();
 +
 +    MPPQueryContext context =
 +        new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
 +    DistributionPlanner planner =
 +        new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
 +    DistributedQueryPlan plan = planner.planFragments();
 +    plan.getInstances().forEach(System.out::println);
 +    assertEquals(1, plan.getInstances().size());
 +  }
 +
    private Analysis constructAnalysis() {
      Analysis analysis = new Analysis();
  
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index f57f60dda6,6871ff6560..4bf6a477e2
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@@ -18,8 -18,6 +18,7 @@@
   */
  package org.apache.iotdb.db.mpp.sql.plan;
  
- import com.google.common.collect.ImmutableList;
 +import org.apache.iotdb.commons.cluster.DataNodeLocation;
  import org.apache.iotdb.commons.cluster.Endpoint;
  import org.apache.iotdb.commons.consensus.DataRegionId;
  import org.apache.iotdb.commons.partition.RegionReplicaSet;
@@@ -45,6 -43,6 +44,7 @@@ import org.apache.iotdb.tsfile.read.exp
  import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
  import org.apache.iotdb.tsfile.read.filter.operator.Gt;
  
++import com.google.common.collect.ImmutableList;
  import org.junit.Test;
  
  import java.nio.ByteBuffer;
@@@ -64,8 -61,9 +64,10 @@@ public class FragmentInstanceSerdeTest 
              new GroupByFilter(1, 2, 3, 4),
              QueryType.READ);
      RegionReplicaSet regionReplicaSet =
-         new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666))));
 -        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
 -    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
 -    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
++        new RegionReplicaSet(
++            new DataRegionId(1),
++            ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666))));
 +    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
  
      ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
      fragmentInstance.serializeRequest(byteBuffer);
@@@ -84,8 -81,9 +86,10 @@@
              null,
              QueryType.READ);
      RegionReplicaSet regionReplicaSet =
-         new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667))));
 -        new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
 -    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
 -    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667));
++        new RegionReplicaSet(
++            new DataRegionId(1),
++            ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667))));
 +    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
  
      ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
      fragmentInstance.serializeRequest(byteBuffer);
diff --cc server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 0000000000,7ed08d65b6..d5ba4d1f07
mode 000000,100644..100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@@ -1,0 -1,164 +1,167 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ package org.apache.iotdb.db.service;
+ 
+ import org.apache.iotdb.commons.cluster.DataNodeLocation;
+ import org.apache.iotdb.commons.cluster.Endpoint;
+ import org.apache.iotdb.commons.consensus.SchemaRegionId;
+ import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.consensus.common.Peer;
+ import org.apache.iotdb.db.conf.IoTDBConfig;
+ import org.apache.iotdb.db.conf.IoTDBDescriptor;
+ import org.apache.iotdb.db.consensus.ConsensusImpl;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
+ import org.apache.iotdb.db.metadata.LocalConfigNode;
+ import org.apache.iotdb.db.metadata.path.PartialPath;
+ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+ import org.apache.iotdb.db.utils.EnvironmentUtils;
+ import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
+ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+ import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
+ import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+ 
+ import org.apache.ratis.util.FileUtils;
+ import org.junit.After;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ 
+ public class InternalServiceImplTest {
+   private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+   InternalServiceImpl internalServiceImpl;
+   LocalConfigNode configNode;
+ 
+   @Before
+   public void setUp() throws Exception {
+     IoTDB.configManager.init();
+     configNode = LocalConfigNode.getInstance();
+     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
+     ConsensusImpl.getInstance().start();
+     RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+     ConsensusImpl.getInstance()
+         .addConsensusGroup(regionReplicaSet.getConsensusGroupId(), genPeerList(regionReplicaSet));
+     internalServiceImpl = new InternalServiceImpl();
+   }
+ 
+   @After
+   public void tearDown() throws Exception {
+     IoTDB.configManager.clear();
+     RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+     ConsensusImpl.getInstance().removeConsensusGroup(regionReplicaSet.getConsensusGroupId());
+     ConsensusImpl.getInstance().stop();
+     EnvironmentUtils.cleanEnv();
+     FileUtils.deleteFully(new File("data" + File.separator + "consensus"));
+   }
+ 
+   @Test
+   public void createTimeseriesTest() throws MetadataException {
+     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
+     CreateTimeSeriesNode createTimeSeriesNode =
+         new CreateTimeSeriesNode(
+             new PlanNodeId("0"),
+             new PartialPath("root.ln.wf01.wt01.status"),
+             TSDataType.BOOLEAN,
+             TSEncoding.PLAIN,
+             CompressionType.SNAPPY,
+             new HashMap<String, String>() {
+               {
+                 put("MAX_POINT_NUMBER", "3");
+               }
+             },
+             new HashMap<String, String>() {
+               {
+                 put("tag1", "v1");
+                 put("tag2", "v2");
+               }
+             },
+             new HashMap<String, String>() {
+               {
+                 put("attr1", "a1");
+                 put("attr2", "a2");
+               }
+             },
+             "meter1");
+ 
+     RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+     PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), createTimeSeriesNode);
+     FragmentInstance fragmentInstance =
 -        new FragmentInstance(planFragment, 4, new GroupByFilter(1, 2, 3, 4), QueryType.WRITE);
 -    fragmentInstance.setRegionReplicaSet(regionReplicaSet);
 -    fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
++        new FragmentInstance(
++            planFragment,
++            planFragment.getId().genFragmentInstanceId(),
++            new GroupByFilter(1, 2, 3, 4),
++            QueryType.WRITE);
++    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+ 
+     // serialize fragmentInstance
+     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+     fragmentInstance.serializeRequest(byteBuffer);
+     byteBuffer.flip();
+ 
+     // put serialized fragmentInstance to TSendFragmentInstanceReq
+     TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
+     TFragmentInstance tFragmentInstance = new TFragmentInstance();
+     tFragmentInstance.setBody(byteBuffer);
+     request.setFragmentInstance(tFragmentInstance);
+     request.setConsensusGroupId(
+         new TConsensusGroupId(
+             regionReplicaSet.getConsensusGroupId().getId(),
+             regionReplicaSet.getConsensusGroupId().getType().toString()));
+     request.setQueryType(QueryType.WRITE.toString());
+ 
+     // Use consensus layer to execute request
+     TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request);
+ 
+     Assert.assertTrue(response.accepted);
+   }
+ 
+   private RegionReplicaSet genRegionReplicaSet() {
+     List<DataNodeLocation> dataNodeList = new ArrayList<>();
+     dataNodeList.add(
+         new DataNodeLocation(
+             conf.getConsensusPort(), new Endpoint(conf.getInternalIp(), conf.getConsensusPort())));
+ 
+     // construct fragmentInstance
+     SchemaRegionId schemaRegionId = new SchemaRegionId(0);
+     return new RegionReplicaSet(schemaRegionId, dataNodeList);
+   }
+ 
+   private List<Peer> genPeerList(RegionReplicaSet regionReplicaSet) {
+     List<Peer> peerList = new ArrayList<>();
+     for (DataNodeLocation node : regionReplicaSet.getDataNodeList()) {
+       peerList.add(new Peer(regionReplicaSet.getConsensusGroupId(), node.getEndPoint()));
+     }
+     return peerList;
+   }
+ }