You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/13 14:43:49 UTC
[iotdb] 01/03: Merge branch 'xingtanzjr/modification_pr' into xingtanzjr/write_instance_parallel
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/write_instance_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d32fc0713cd568c35cf7e43edcaf1eb8537a94f0
Merge: 850198002e c5192db130
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 13 21:00:22 2022 +0800
Merge branch 'xingtanzjr/modification_pr' into xingtanzjr/write_instance_parallel
.github/workflows/influxdb-protocol.yml | 2 +-
.github/workflows/main-unix.yml | 3 +-
.github/workflows/main-win.yml | 3 +-
.../apache/iotdb/db/qp/sql/InfluxDBSqlParser.g4 | 2 +-
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 67 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 24 +
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 1 -
.../apache/iotdb/cluster/query/ClusterPlanner.java | 1 +
.../apache/iotdb/cluster/utils/StatusUtils.java | 6 +-
confignode/pom.xml | 5 +
.../resources/conf/iotdb-confignode.properties | 13 +-
.../iotdb/confignode/conf/ConfigNodeConf.java | 14 +
.../confignode/conf/ConfigNodeDescriptor.java | 4 +
.../consensus/response/DataPartitionDataSet.java | 22 +-
.../consensus/response/SchemaPartitionDataSet.java | 77 +-
.../response/StorageGroupSchemaDataSet.java | 12 +-
.../iotdb/confignode/manager/ConfigManager.java | 114 ++-
.../iotdb/confignode/manager/ConsensusManager.java | 36 +-
.../apache/iotdb/confignode/manager/Manager.java | 24 +-
.../iotdb/confignode/manager/PartitionManager.java | 130 ++--
.../iotdb/confignode/manager/RegionManager.java | 6 +-
.../confignode/partition/StorageGroupSchema.java | 9 +
.../persistence/PartitionInfoPersistence.java | 60 +-
.../persistence/RegionInfoPersistence.java | 29 +-
.../iotdb/confignode/physical/PhysicalPlan.java | 2 +-
.../physical/crud/CreateDataPartitionPlan.java | 5 +-
.../physical/crud/CreateRegionsPlan.java | 5 +-
.../physical/crud/CreateSchemaPartitionPlan.java | 70 +-
.../crud/GetOrCreateDataPartitionPlan.java | 6 +-
.../crud/GetOrCreateSchemaPartitionPlan.java | 86 ++-
.../confignode/service/executor/PlanExecutor.java | 4 +-
.../server/ConfigNodeRPCServerProcessor.java | 64 +-
.../confignode/consensus/RatisConsensusDemo.java | 10 +-
.../hash/DeviceGroupHashExecutorManualTest.java | 5 +-
.../physical/SerializeDeserializeUT.java | 45 +-
.../server/ConfigNodeRPCServerProcessorTest.java | 284 +++++---
.../iotdb/consensus/ratis/RequestMessage.java | 1 +
distribution/pom.xml | 6 +
docs/UserGuide/Maintenance-Tools/Sync-Tool.md | 396 +++++++---
docs/zh/UserGuide/Maintenance-Tools/Sync-Tool.md | 493 ++++++++++---
influxdb-protocol/pom.xml | 38 +-
.../iotdb/influxdb/protocol/dto/SessionPoint.java | 6 +-
.../protocol/impl/IoTDBInfluxDBService.java | 12 +-
.../iotdb/influxdb/session/InfluxDBSession.java | 42 +-
.../sync/IoTDBSyncReceiverCollectorIT.java | 513 +++++++++++++
.../db/integration/sync/IoTDBSyncReceiverIT.java | 357 +++++++++
.../sync/IoTDBSyncReceiverLoaderIT.java | 210 ++++++
.../db/integration/sync/IoTDBSyncSenderIT.java | 304 ++++++++
.../iotdb/db/integration/sync/SyncTestUtil.java | 203 ++++++
.../db/integration/sync/TransportClientMock.java | 69 ++
.../src/test/resources/iotdb-engine.properties | 3 +-
integration/src/test/resources/logback-test.xml | 2 +-
.../iotdb/commons/cluster/DataNodeLocation.java | 6 +-
.../org/apache/iotdb/commons/cluster/Endpoint.java | 6 +-
.../iotdb/commons/concurrent/ThreadName.java | 6 +-
.../apache/iotdb/commons/conf/IoTDBConstant.java | 15 +
.../iotdb/commons/consensus/ConsensusGroupId.java | 5 +-
.../iotdb/commons/partition/DataPartition.java | 4 +-
.../iotdb/commons/partition/RegionReplicaSet.java | 42 +-
.../iotdb/commons/partition/SchemaPartition.java | 120 ++-
.../apache/iotdb/commons/service/ServiceType.java | 2 +
.../apache/iotdb/commons/utils/CommonUtils.java | 4 +-
.../apache/iotdb/commons}/utils/StatusUtils.java | 49 +-
pom.xml | 1 +
server/pom.xml | 5 +
.../resources/conf/iotdb-engine.properties | 19 +-
.../resources/conf/iotdb-sync-client.properties | 35 -
.../assembly/resources/tools/start-sync-client.bat | 71 --
.../assembly/resources/tools/start-sync-client.sh | 54 --
.../assembly/resources/tools/stop-sync-client.bat | 23 -
.../assembly/resources/tools/stop-sync-client.sh | 30 -
.../apache/iotdb/db/client/ConfigNodeClient.java | 309 ++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 81 ++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 43 +-
.../consensus/statemachine/BaseStateMachine.java | 2 +
.../statemachine/DataRegionStateMachine.java | 37 +-
.../statemachine/SchemaRegionStateMachine.java | 17 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 16 +-
.../apache/iotdb/db/engine/StorageEngineV2.java | 62 +-
.../iotdb/db/engine/modification/Deletion.java | 23 +
.../iotdb/db/engine/storagegroup/DataRegion.java | 183 +++--
.../db/engine/storagegroup/TsFileManager.java | 38 +
.../db/engine/storagegroup/TsFileProcessor.java | 23 +
.../dataregion/StorageGroupManager.java | 10 +
.../sync/PipeDataLoadBearableException.java} | 10 +-
.../sync/PipeDataLoadException.java} | 10 +-
.../sync/PipeDataLoadUnbearableException.java} | 10 +-
.../sync/PipeException.java} | 20 +-
.../sync/PipeServerException.java} | 22 +-
.../PipeSinkException.java} | 24 +-
.../iotdb/db/metadata/Executor/SchemaVisitor.java | 96 +++
...ocalConfigManager.java => LocalConfigNode.java} | 170 ++---
.../db/metadata/LocalSchemaPartitionTable.java | 4 +-
.../iotdb/db/metadata/LocalSchemaProcessor.java | 28 +-
.../iotdb/db/metadata/mtree/MTreeBelowSG.java | 1 +
.../traverser/collector/MeasurementCollector.java | 4 +
.../db/metadata/schemaregion/SchemaEngine.java | 27 +-
.../db/metadata/schemaregion/SchemaRegion.java | 20 +
.../storagegroup/IStorageGroupSchemaManager.java | 8 +
.../storagegroup/StorageGroupSchemaManager.java | 28 +
.../iotdb/db/mpp/buffer/DataBlockManager.java | 7 +-
.../iotdb/db/mpp/buffer/IDataBlockManager.java | 5 +
.../apache/iotdb/db/mpp/common/PlanFragmentId.java | 10 +-
.../org/apache/iotdb/db/mpp/common/QueryId.java | 8 +-
.../apache/iotdb/db/mpp/execution/DataDriver.java | 3 +-
.../db/mpp/execution/FragmentInstanceManager.java | 4 +-
.../iotdb/db/mpp/execution/QueryExecution.java | 22 +-
.../db/mpp/execution/SchemaDriverContext.java | 8 +-
.../scheduler/SimpleFragInstanceDispatcher.java | 4 +-
.../operator/schema/DevicesSchemaScanOperator.java | 97 +++
.../mpp/operator/schema/SchemaMergeOperator.java | 79 ++
.../db/mpp/operator/schema/SchemaScanOperator.java | 107 +++
.../schema/TimeSeriesSchemaScanOperator.java | 142 ++++
.../db/mpp/operator/source/SeriesScanOperator.java | 2 +-
.../apache/iotdb/db/mpp/sql/analyze/Analyzer.java | 42 +-
.../db/mpp/sql/analyze/ClusterSchemaFetcher.java | 7 +-
.../db/mpp/sql/analyze/FakeSchemaFetcherImpl.java | 7 +-
.../iotdb/db/mpp/sql/analyze/ISchemaFetcher.java | 7 +-
.../mpp/sql/analyze/StandaloneSchemaFetcher.java | 7 +-
.../apache/iotdb/db/mpp/sql/parser/ASTVisitor.java | 5 +
.../db/mpp/sql/planner/DistributionPlanner.java | 133 +++-
.../db/mpp/sql/planner/LocalExecutionPlanner.java | 65 +-
.../iotdb/db/mpp/sql/planner/LogicalPlanner.java | 47 ++
.../iotdb/db/mpp/sql/planner/QueryPlanBuilder.java | 42 ++
.../db/mpp/sql/planner/plan/FragmentInstance.java | 66 +-
.../db/mpp/sql/planner/plan/PlanFragment.java | 10 +-
.../plan/SimpleFragmentParallelPlanner.java | 6 +-
.../planner/plan/WriteFragmentParallelPlanner.java | 17 +-
.../db/mpp/sql/planner/plan/node/PlanNode.java | 1 +
.../db/mpp/sql/planner/plan/node/PlanNodeType.java | 15 +-
.../db/mpp/sql/planner/plan/node/PlanNodeUtil.java | 3 +
.../db/mpp/sql/planner/plan/node/PlanVisitor.java | 30 +
.../node/metedata/read/DevicesSchemaScanNode.java | 108 +++
.../read/SchemaMergeNode.java} | 44 +-
.../plan/node/metedata/read/SchemaScanNode.java | 119 +++
.../metedata/read/TimeSeriesSchemaScanNode.java | 145 ++++
.../node/metedata/write/AlterTimeSeriesNode.java | 37 +
.../write/CreateAlignedTimeSeriesNode.java | 13 +
.../node/metedata/write/CreateTimeSeriesNode.java | 127 +++-
.../planner/plan/node/process/ExchangeNode.java | 1 +
.../planner/plan/node/sink/FragmentSinkNode.java | 1 +
.../plan/node/source/SeriesAggregateScanNode.java | 10 +-
.../planner/plan/node/source/SeriesScanNode.java | 15 +-
.../sql/planner/plan/node/source/SourceNode.java | 4 +-
.../plan/node/write/InsertMultiTabletsNode.java | 7 +-
.../sql/planner/plan/node/write/InsertNode.java | 1 +
.../planner/plan/node/write/InsertRowsNode.java | 5 +
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 5 +
.../db/mpp/sql/statement/StatementVisitor.java | 10 +
.../crud/InsertMultiTabletsStatement.java | 8 +
.../crud/InsertRowsOfOneDeviceStatement.java | 1 +
.../sql/statement/crud/InsertRowsStatement.java | 8 +
.../db/mpp/sql/statement/crud/InsertStatement.java | 1 +
.../db/mpp/sql/statement/crud/QueryStatement.java | 1 +
.../metadata/AlterTimeSeriesStatement.java | 1 +
.../metadata/CreateAlignedTimeSeriesStatement.java | 1 +
.../metadata/CreateTimeSeriesStatement.java | 1 +
.../statement/metadata/ShowDevicesStatement.java | 6 +
.../mpp/sql/statement/metadata/ShowStatement.java | 10 +
.../metadata/ShowTimeSeriesStatement.java | 6 +
.../db/protocol/influxdb/handler/QueryHandler.java | 6 +-
.../apache/iotdb/db/qp/constant/SQLConstant.java | 26 +
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 189 ++++-
.../org/apache/iotdb/db/qp/logical/Operator.java | 14 +-
.../db/qp/logical/sys/CreatePipeOperator.java | 69 ++
.../db/qp/logical/sys/CreatePipeSinkOperator.java | 61 ++
.../iotdb/db/qp/logical/sys/DropPipeOperator.java | 24 +-
.../db/qp/logical/sys/DropPipeSinkOperator.java | 23 +-
.../iotdb/db/qp/logical/sys/ShowPipeOperator.java | 27 +-
.../db/qp/logical/sys/ShowPipeServerOperator.java | 20 +-
.../db/qp/logical/sys/ShowPipeSinkOperator.java | 27 +-
.../qp/logical/sys/ShowPipeSinkTypeOperator.java | 20 +-
.../iotdb/db/qp/logical/sys/StartPipeOperator.java | 24 +-
.../db/qp/logical/sys/StartPipeServerOperator.java | 18 +-
.../iotdb/db/qp/logical/sys/StopPipeOperator.java | 24 +-
.../db/qp/logical/sys/StopPipeServerOperator.java | 18 +-
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 10 +
.../qp/physical/crud/InsertMultiTabletsPlan.java | 2 +-
.../iotdb/db/qp/physical/crud/InsertRowsPlan.java | 2 +-
.../physical/sys/CreateAlignedTimeSeriesPlan.java | 15 +-
.../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 2 +-
.../iotdb/db/qp/physical/sys/CreatePipePlan.java | 110 +++
.../db/qp/physical/sys/CreatePipeSinkPlan.java | 98 +++
.../db/qp/physical/sys/CreateTimeSeriesPlan.java | 10 +
.../db/qp/physical/sys/DeleteTimeSeriesPlan.java | 2 +-
.../iotdb/db/qp/physical/sys/DropPipeSinkPlan.java | 29 +-
.../iotdb/db/qp/physical/sys/OperatePipePlan.java | 29 +-
.../physical/sys/ShowPipePlan.java} | 22 +-
.../physical/sys/ShowPipeServerPlan.java} | 11 +-
.../physical/sys/ShowPipeSinkPlan.java} | 23 +-
.../physical/sys/ShowPipeSinkTypePlan.java} | 11 +-
.../apache/iotdb/db/qp/physical/sys/ShowPlan.java | 6 +-
.../db/qp/physical/sys/StartPipeServerPlan.java | 56 ++
.../db/qp/physical/sys/StopPipeServerPlan.java | 56 ++
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 190 ++++-
.../apache/iotdb/db/qp/utils/DatetimeUtils.java | 15 +
.../java/org/apache/iotdb/db/service/DataNode.java | 67 +-
.../iotdb/db/service/InternalServiceImpl.java | 32 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 11 +-
.../thrift/impl/DataNodeManagementServiceImpl.java | 68 +-
.../service/thrift/impl/InfluxDBServiceImpl.java | 42 +-
.../apache/iotdb/db/sync/conf/SyncConstant.java | 85 +--
.../apache/iotdb/db/sync/conf/SyncPathUtil.java | 100 +++
.../iotdb/db/sync/conf/SyncSenderConfig.java | 127 ----
.../iotdb/db/sync/conf/SyncSenderDescriptor.java | 124 ----
.../org/apache/iotdb/db/sync/package-info.java | 38 -
.../iotdb/db/sync/pipedata/DeletionPipeData.java | 85 +++
.../apache/iotdb/db/sync/pipedata/PipeData.java | 96 +++
.../iotdb/db/sync/pipedata/SchemaPipeData.java | 98 +++
.../iotdb/db/sync/pipedata/TsFilePipeData.java | 177 +++++
.../sync/pipedata/queue/BufferedPipeDataQueue.java | 438 +++++++++++
.../queue/PipeDataQueue.java} | 21 +-
.../sync/pipedata/queue/PipeDataQueueFactory.java | 57 ++
.../iotdb/db/sync/receiver/ReceiverService.java | 278 +++++++
.../db/sync/receiver/collector/Collector.java | 170 +++++
.../db/sync/receiver/load/DeletionLoader.java | 48 ++
.../iotdb/db/sync/receiver/load/FileLoader.java | 203 ------
.../db/sync/receiver/load/FileLoaderManager.java | 213 ------
.../iotdb/db/sync/receiver/load/IFileLoader.java | 50 --
.../iotdb/db/sync/receiver/load/ILoadLogger.java | 57 --
.../receiver/load/{LoadType.java => ILoader.java} | 12 +-
.../iotdb/db/sync/receiver/load/LoadLogger.java | 72 --
.../iotdb/db/sync/receiver/load/SchemaLoader.java | 60 ++
.../iotdb/db/sync/receiver/load/TsFileLoader.java | 67 ++
.../iotdb/db/sync/receiver/manager/PipeInfo.java | 85 +++
.../db/sync/receiver/manager/PipeMessage.java | 76 ++
.../db/sync/receiver/manager/ReceiverManager.java | 216 ++++++
.../sync/receiver/recover/ISyncReceiverLogger.java | 50 --
.../receiver/recover/SyncReceiverLogAnalyzer.java | 154 ----
.../sync/receiver/recover/SyncReceiverLogger.java | 72 --
.../db/sync/receiver/recovery/ReceiverLog.java | 127 ++++
.../receiver/recovery/ReceiverLogAnalyzer.java | 157 ++++
.../db/sync/receiver/transfer/SyncServiceImpl.java | 370 ----------
.../db/sync/sender/manage/ISyncFileManager.java | 72 --
.../db/sync/sender/manage/SyncFileManager.java | 291 --------
.../db/sync/sender/manager/SchemaSyncManager.java | 163 +++++
.../db/sync/sender/manager/TsFileSyncManager.java | 118 +++
.../iotdb/db/sync/sender/pipe/IoTDBPipeSink.java | 97 +++
.../org/apache/iotdb/db/sync/sender/pipe/Pipe.java | 106 +++
.../sender/pipe/PipeSink.java} | 41 +-
.../iotdb/db/sync/sender/pipe/TsFilePipe.java | 334 +++++++++
.../sender/recover/ISyncSenderLogAnalyzer.java | 47 --
.../db/sync/sender/recover/ISyncSenderLogger.java | 67 --
.../sync/sender/recover/SyncSenderLogAnalyzer.java | 128 ----
.../db/sync/sender/recover/SyncSenderLogger.java | 72 --
.../db/sync/sender/recovery/SenderLogAnalyzer.java | 172 +++++
.../db/sync/sender/recovery/SenderLogger.java | 141 ++++
.../db/sync/sender/recovery/TsFilePipeLogger.java | 150 ++++
.../db/sync/sender/service/SenderService.java | 417 +++++++++++
.../db/sync/sender/service/TransportHandler.java | 127 ++++
.../iotdb/db/sync/sender/transfer/ISyncClient.java | 95 ---
.../iotdb/db/sync/sender/transfer/SyncClient.java | 810 ---------------------
.../client/ITransportClient.java} | 19 +-
.../db/sync/transport/client/TransportClient.java | 527 ++++++++++++++
.../conf/TransportConfig.java} | 26 +-
.../transport/conf/TransportConstant.java} | 24 +-
.../server/TransportServerManager.java} | 80 +-
.../server/TransportServerManagerMBean.java} | 6 +-
.../server/TransportServerThriftHandler.java} | 30 +-
.../transport/server/TransportServiceImpl.java | 385 ++++++++++
.../org/apache/iotdb/db/utils/DataTypeUtils.java | 8 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 8 +
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 2 +-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 181 ++++-
.../org/apache/iotdb/db/utils/StatusUtils.java | 46 --
.../java/org/apache/iotdb/db/utils/SyncUtils.java | 109 ---
.../iotdb/db/wal/recover/WALNodeRecoverTask.java | 4 +-
.../db/mpp/operator/SchemaScanOperatorTest.java | 242 ++++++
.../db/mpp/sql/plan/DistributionPlannerTest.java | 108 ++-
.../db/mpp/sql/plan/FragmentInstanceSerdeTest.java | 16 +-
.../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java | 85 +++
.../sql/plan/node/PlanNodeDeserializeHelper.java | 3 +-
.../metadata/read/ShowDevicesNodeSerdeTest.java | 3 +-
.../sql/plan/node/process/OffsetNodeSerdeTest.java | 6 +-
.../plan/node/sink/FragmentSinkNodeSerdeTest.java | 3 +-
.../source/SeriesAggregateScanNodeSerdeTest.java | 2 +-
.../iotdb/db/qp/physical/PhysicalPlanTest.java | 28 +
.../iotdb/db/service/InternalServiceImplTest.java | 167 +++++
.../sync/pipedata/BufferedPipeDataQueueTest.java | 542 ++++++++++++++
.../iotdb/db/sync/pipedata/PipeDataTest.java | 86 +++
.../db/sync/receiver/load/FileLoaderTest.java | 405 -----------
.../sync/receiver/manager/ReceiverManagerTest.java | 98 +++
.../recover/SyncReceiverLogAnalyzerTest.java | 229 ------
.../receiver/recover/SyncReceiverLoggerTest.java | 115 ---
.../receiver/recovery/ReceiverLogAnalyzerTest.java | 124 ++++
.../db/sync/sender/manage/SyncFileManagerTest.java | 350 ---------
.../sender/recover/SyncSenderLogAnalyzerTest.java | 201 -----
.../sync/sender/recover/SyncSenderLoggerTest.java | 112 ---
.../db/sync/sender/transfer/SyncClientTest.java | 161 ----
.../db/sync/transport/TransportServiceTest.java | 205 ++++++
server/src/test/resources/iotdb-engine.properties | 4 +-
server/src/test/resources/logback-test.xml | 2 +-
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 15 +-
.../iotdb/rpc/StatementExecutionException.java | 4 +-
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 4 +-
.../src/main/thrift/confignode.thrift | 67 +-
thrift-influxdb/src/main/thrift/influxdb.thrift | 36 +-
thrift-sync/src/main/thrift/sync.thrift | 51 --
thrift-sync/src/main/thrift/transport.thrift | 90 +++
thrift/src/main/thrift/common.thrift | 43 +-
300 files changed, 14532 insertions(+), 6902 deletions(-)
diff --cc node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
index 115c4406c4,d98695824d..fa3e61ef10
--- a/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/partition/RegionReplicaSet.java
@@@ -46,16 -45,19 +45,18 @@@ public class RegionReplicaSet
this.dataNodeList = dataNodeList;
}
- public ConsensusGroupId getId() {
- return id;
+ public ConsensusGroupId getConsensusGroupId() {
+ return consensusGroupId;
}
- public void setId(ConsensusGroupId id) {
- this.id = id;
+ public void setConsensusGroupId(ConsensusGroupId consensusGroupId) {
+ this.consensusGroupId = consensusGroupId;
}
+ @Override
public String toString() {
- return String.format("RegionReplicaSet[%s-%s]: %s", id.getType(), id, dataNodeList);
+ return String.format(
- "RegionReplicaSet[%s-%d]: %s",
- consensusGroupId.getType(), consensusGroupId.getId(), dataNodeList);
++ "RegionReplicaSet[%s-%s]: %s", consensusGroupId.getType(), consensusGroupId, dataNodeList);
}
public void serializeImpl(ByteBuffer buffer) {
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
index f87979a2a2,4ef647dc7e..7836d3460f
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/DistributionPlanner.java
@@@ -22,9 -22,19 +22,22 @@@ import org.apache.iotdb.commons.partiti
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
- import org.apache.iotdb.db.mpp.sql.planner.plan.*;
- import org.apache.iotdb.db.mpp.sql.planner.plan.node.*;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.DistributedQueryPlan;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.IFragmentParallelPlaner;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.SimpleFragmentParallelPlanner;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.SubPlan;
++import org.apache.iotdb.db.mpp.sql.planner.plan.WriteFragmentParallelPlanner;
++import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.SimplePlanNodeRewriter;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.sink.FragmentSinkNode;
@@@ -83,9 -92,8 +98,10 @@@ public class DistributionPlanner
// Convert fragment to detailed instance
// And for parallel-able fragment, clone it into several instances with different params.
public List<FragmentInstance> planFragmentInstances(SubPlan subPlan) {
- IFragmentParallelPlaner parallelPlaner = context.getQueryType() == QueryType.READ ?
- new SimpleFragmentParallelPlanner(subPlan, analysis, context) :
- new WriteFragmentParallelPlanner(subPlan, analysis, context);
+ IFragmentParallelPlaner parallelPlaner =
- new SimpleFragmentParallelPlanner(subPlan, analysis, context);
++ context.getQueryType() == QueryType.READ
++ ? new SimpleFragmentParallelPlanner(subPlan, analysis, context)
++ : new WriteFragmentParallelPlanner(subPlan, analysis, context);
return parallelPlaner.parallelPlan();
}
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
index c263732b00,3e55a5d32c..72177f06c8
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/FragmentInstance.java
@@@ -30,9 -31,8 +30,7 @@@ import org.apache.iotdb.tsfile.read.fil
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
- import java.io.IOException;
import java.nio.ByteBuffer;
--import java.util.Objects;
public class FragmentInstance implements IConsensusRequest {
private final FragmentInstanceId id;
@@@ -48,21 -50,23 +48,31 @@@
// We can add some more params for a specific FragmentInstance
// So that we can make different FragmentInstance owns different data range.
- public FragmentInstance(PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
- public FragmentInstance(PlanFragment fragment, int index, Filter timeFilter, QueryType type) {
++ public FragmentInstance(
++ PlanFragment fragment, FragmentInstanceId id, Filter timeFilter, QueryType type) {
this.fragment = fragment;
this.timeFilter = timeFilter;
- this.id = generateId(fragment.getId(), index);
+ this.id = id;
this.type = type;
}
- public static FragmentInstanceId generateId(PlanFragmentId id, int index) {
- return new FragmentInstanceId(id, String.valueOf(index));
+ public RegionReplicaSet getDataRegionId() {
- return dataRegion;
++ return regionReplicaSet;
++ }
++
++ public void setDataRegionAndHost(RegionReplicaSet regionReplicaSet) {
++ this.regionReplicaSet = regionReplicaSet;
++ // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
++ // instance
++ this.hostEndpoint = regionReplicaSet.getDataNodeList().get(0).getEndPoint();
}
- public void setDataRegionAndHost(RegionReplicaSet dataRegion) {
- this.dataRegion = dataRegion;
- // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current instance
- this.hostEndpoint = dataRegion.getDataNodeList().get(0).getEndPoint();
+ public RegionReplicaSet getRegionReplicaSet() {
+ return regionReplicaSet;
+ }
+
+ public void setRegionReplicaSet(RegionReplicaSet regionReplicaSet) {
+ this.regionReplicaSet = regionReplicaSet;
}
public Endpoint getHostEndpoint() {
@@@ -120,17 -133,10 +135,9 @@@
Filter timeFilter = hasTimeFilter ? FilterFactory.deserialize(buffer) : null;
QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
FragmentInstance fragmentInstance =
- new FragmentInstance(
- planFragment, Integer.parseInt(id.getInstanceId()), timeFilter, queryType);
+ new FragmentInstance(planFragment, id, timeFilter, queryType);
- RegionReplicaSet regionReplicaSet = new RegionReplicaSet();
- try {
- regionReplicaSet.deserializeImpl(buffer);
- } catch (IOException e) {
- e.printStackTrace();
- }
- Endpoint endpoint = new Endpoint();
- endpoint.deserializeImpl(buffer);
- fragmentInstance.dataRegion = regionReplicaSet;
- fragmentInstance.hostEndpoint = endpoint;
+ fragmentInstance.regionReplicaSet = RegionReplicaSet.deserializeImpl(buffer);
+ fragmentInstance.hostEndpoint = Endpoint.deserializeImpl(buffer);
return fragmentInstance;
}
@@@ -144,20 -150,25 +151,7 @@@
timeFilter.serialize(buffer);
}
ReadWriteIOUtils.write(type.ordinal(), buffer);
- dataRegion.serializeImpl(buffer);
+ regionReplicaSet.serializeImpl(buffer);
hostEndpoint.serializeImpl(buffer);
}
--
-- @Override
-- public boolean equals(Object o) {
-- if (this == o) return true;
-- if (o == null || getClass() != o.getClass()) return false;
-- FragmentInstance instance = (FragmentInstance) o;
- return Objects.equals(id, instance.id) && type == instance.type && Objects.equals(fragment, instance.fragment) && Objects.equals(dataRegion, instance.dataRegion) && Objects.equals(hostEndpoint, instance.hostEndpoint) && Objects.equals(timeFilter, instance.timeFilter);
- return Objects.equals(id, instance.id)
- && type == instance.type
- && Objects.equals(fragment, instance.fragment)
- && Objects.equals(regionReplicaSet, instance.regionReplicaSet)
- && Objects.equals(hostEndpoint, instance.hostEndpoint)
- && Objects.equals(timeFilter, instance.timeFilter);
-- }
--
-- @Override
-- public int hashCode() {
- return Objects.hash(id, type, fragment, dataRegion, hostEndpoint, timeFilter);
- return Objects.hash(id, type, fragment, regionReplicaSet, hostEndpoint, timeFilter);
-- }
}
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
index 280d9891af,d3357ec275..631b217695
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/SimpleFragmentParallelPlanner.java
@@@ -99,7 -100,11 +99,7 @@@ public class SimpleFragmentParallelPlan
// We need to store all the replica host in case of the scenario that the instance need to be
// redirected
// to another host when scheduling
- fragmentInstance.setDataRegionAndHost(dataRegion);
- fragmentInstance.setRegionReplicaSet(regionReplicaSet);
-
- // TODO: (xingtanzjr) We select the first Endpoint as the default target host for current
- // instance
- fragmentInstance.setHostEndpoint(regionReplicaSet.getDataNodeList().get(0).getEndPoint());
++ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
fragmentInstanceList.add(fragmentInstance);
}
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
index eebf705cd3,0000000000..d8c9435214
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/WriteFragmentParallelPlanner.java
@@@ -1,61 -1,0 +1,70 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.planner.plan;
+
+import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.util.ArrayList;
+import java.util.List;
+
- public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner{
++public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner {
+
+ private SubPlan subPlan;
+ private Analysis analysis;
+ private MPPQueryContext queryContext;
+
- public WriteFragmentParallelPlanner(SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) {
++ public WriteFragmentParallelPlanner(
++ SubPlan subPlan, Analysis analysis, MPPQueryContext queryContext) {
+ this.subPlan = subPlan;
+ this.analysis = analysis;
+ this.queryContext = queryContext;
+ }
+
+ @Override
+ public List<FragmentInstance> parallelPlan() {
+ PlanFragment fragment = subPlan.getPlanFragment();
- Filter timeFilter = analysis.getQueryFilter() != null ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter() : null;
++ Filter timeFilter =
++ analysis.getQueryFilter() != null
++ ? ((GlobalTimeExpression) analysis.getQueryFilter()).getFilter()
++ : null;
+ PlanNode node = fragment.getRoot();
+ if (!(node instanceof IWritePlanNode)) {
+ throw new IllegalArgumentException("PlanNode should be IWritePlanNode in WRITE operation");
+ }
+ List<IWritePlanNode> splits = ((IWritePlanNode) node).splitByPartition(analysis);
+ List<FragmentInstance> ret = new ArrayList<>();
+ for (IWritePlanNode split : splits) {
- FragmentInstance instance = new FragmentInstance(new PlanFragment(fragment.getId(), split), fragment.getId().genFragmentInstanceId(), timeFilter, queryContext.getQueryType());
++ FragmentInstance instance =
++ new FragmentInstance(
++ new PlanFragment(fragment.getId(), split),
++ fragment.getId().genFragmentInstanceId(),
++ timeFilter,
++ queryContext.getQueryType());
+ instance.setDataRegionAndHost(((IWritePlanNode) node).getRegionReplicaSet());
+ ret.add(instance);
+ }
+ return ret;
+ }
+}
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
index 49b1065a05,47fbed6dff..3fbbf98f0f
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertMultiTabletsNode.java
@@@ -20,8 -20,8 +20,9 @@@ package org.apache.iotdb.db.mpp.sql.pla
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
@@@ -112,9 -112,9 +113,9 @@@ public class InsertMultiTabletsNode ext
Map<RegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
for (int i = 0; i < insertTabletNodeList.size(); i++) {
InsertTabletNode insertTabletNode = insertTabletNodeList.get(i);
- List<InsertNode> tmpResult = insertTabletNode.splitByPartition(analysis);
- for (InsertNode subNode : tmpResult) {
- RegionReplicaSet dataRegionReplicaSet = subNode.getDataRegionReplicaSet();
+ List<IWritePlanNode> tmpResult = insertTabletNode.splitByPartition(analysis);
+ for (IWritePlanNode subNode : tmpResult) {
- RegionReplicaSet dataRegionReplicaSet = ((InsertNode)subNode).getDataRegionReplicaSet();
++ RegionReplicaSet dataRegionReplicaSet = ((InsertNode) subNode).getDataRegionReplicaSet();
if (splitMap.containsKey(dataRegionReplicaSet)) {
InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet);
tmpNode.addInsertTabletNode((InsertTabletNode) subNode, i);
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
index d7eef6b50b,30b091d83b..2aead26a69
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertNode.java
@@@ -26,9 -27,11 +26,10 @@@ import org.apache.iotdb.db.mpp.sql.plan
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
import java.nio.ByteBuffer;
-import java.util.List;
-public abstract class InsertNode extends PlanNode {
+public abstract class InsertNode extends IWritePlanNode {
/**
* if use id table, this filed is id form of device path <br>
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
index 3cba509506,b3012c9986..4a8583d999
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsNode.java
@@@ -20,9 -20,9 +20,10 @@@ package org.apache.iotdb.db.mpp.sql.pla
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
diff --cc server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 8dc7fd7671,3c22c0fd57..3b5079e625
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@@ -20,9 -20,9 +20,10 @@@ package org.apache.iotdb.db.mpp.sql.pla
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.IWritePlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index ed192c357b,51a2313f08..39d9c6a71d
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@@ -41,13 -46,10 +46,12 @@@ import org.apache.iotdb.db.mpp.sql.plan
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
- import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
++import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import com.google.common.collect.Sets;
- import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.Test;
import java.util.ArrayList;
@@@ -205,35 -254,6 +256,32 @@@ public class DistributionPlannerTest
assertEquals(3, plan.getInstances().size());
}
+ @Test
+ public void TestWriteParallelPlan() throws IllegalPathException {
+ QueryId queryId = new QueryId("test_write");
- InsertRowNode insertRowNode = new InsertRowNode(
- queryId.genPlanNodeId(),
- new PartialPath("root.sg.d1"),
- false,
- new MeasurementSchema[]{
- new MeasurementSchema("s1", TSDataType.INT32),
- },
- new TSDataType[]{
- TSDataType.INT32
- },
- 1L,
- new Object[]{
- 10
- });
++ InsertRowNode insertRowNode =
++ new InsertRowNode(
++ queryId.genPlanNodeId(),
++ new PartialPath("root.sg.d1"),
++ false,
++ new MeasurementSchema[] {
++ new MeasurementSchema("s1", TSDataType.INT32),
++ },
++ new TSDataType[] {TSDataType.INT32},
++ 1L,
++ new Object[] {10});
+
+ Analysis analysis = constructAnalysis();
+
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, QueryType.WRITE, new Endpoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context, insertRowNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ plan.getInstances().forEach(System.out::println);
+ assertEquals(1, plan.getInstances().size());
+ }
+
private Analysis constructAnalysis() {
Analysis analysis = new Analysis();
diff --cc server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
index f57f60dda6,6871ff6560..4bf6a477e2
--- a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/FragmentInstanceSerdeTest.java
@@@ -18,8 -18,6 +18,7 @@@
*/
package org.apache.iotdb.db.mpp.sql.plan;
- import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.commons.cluster.DataNodeLocation;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
@@@ -45,6 -43,6 +44,7 @@@ import org.apache.iotdb.tsfile.read.exp
import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.iotdb.tsfile.read.filter.operator.Gt;
++import com.google.common.collect.ImmutableList;
import org.junit.Test;
import java.nio.ByteBuffer;
@@@ -64,8 -61,9 +64,10 @@@ public class FragmentInstanceSerdeTest
new GroupByFilter(1, 2, 3, 4),
QueryType.READ);
RegionReplicaSet regionReplicaSet =
- new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666))));
- new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
- fragmentInstance.setRegionReplicaSet(regionReplicaSet);
- fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
++ new RegionReplicaSet(
++ new DataRegionId(1),
++ ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.1", 6666))));
+ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
fragmentInstance.serializeRequest(byteBuffer);
@@@ -84,8 -81,9 +86,10 @@@
null,
QueryType.READ);
RegionReplicaSet regionReplicaSet =
- new RegionReplicaSet(new DataRegionId(1), ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667))));
- new RegionReplicaSet(new DataRegionId(1), new ArrayList<>());
- fragmentInstance.setRegionReplicaSet(regionReplicaSet);
- fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.2", 6667));
++ new RegionReplicaSet(
++ new DataRegionId(1),
++ ImmutableList.of(new DataNodeLocation(0, new Endpoint("127.0.0.2", 6667))));
+ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
fragmentInstance.serializeRequest(byteBuffer);
diff --cc server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 0000000000,7ed08d65b6..d5ba4d1f07
mode 000000,100644..100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@@ -1,0 -1,164 +1,167 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ package org.apache.iotdb.db.service;
+
+ import org.apache.iotdb.commons.cluster.DataNodeLocation;
+ import org.apache.iotdb.commons.cluster.Endpoint;
+ import org.apache.iotdb.commons.consensus.SchemaRegionId;
+ import org.apache.iotdb.commons.partition.RegionReplicaSet;
+ import org.apache.iotdb.consensus.common.Peer;
+ import org.apache.iotdb.db.conf.IoTDBConfig;
+ import org.apache.iotdb.db.conf.IoTDBDescriptor;
+ import org.apache.iotdb.db.consensus.ConsensusImpl;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
+ import org.apache.iotdb.db.metadata.LocalConfigNode;
+ import org.apache.iotdb.db.metadata.path.PartialPath;
+ import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+ import org.apache.iotdb.db.utils.EnvironmentUtils;
+ import org.apache.iotdb.mpp.rpc.thrift.TConsensusGroupId;
+ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
+ import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
+ import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
+ import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+ import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+
+ import org.apache.ratis.util.FileUtils;
+ import org.junit.After;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Test;
+
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+
+ public class InternalServiceImplTest {
+ private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
+ InternalServiceImpl internalServiceImpl;
+ LocalConfigNode configNode;
+
+ @Before
+ public void setUp() throws Exception {
+ IoTDB.configManager.init();
+ configNode = LocalConfigNode.getInstance();
+ configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
+ ConsensusImpl.getInstance().start();
+ RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+ ConsensusImpl.getInstance()
+ .addConsensusGroup(regionReplicaSet.getConsensusGroupId(), genPeerList(regionReplicaSet));
+ internalServiceImpl = new InternalServiceImpl();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ IoTDB.configManager.clear();
+ RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+ ConsensusImpl.getInstance().removeConsensusGroup(regionReplicaSet.getConsensusGroupId());
+ ConsensusImpl.getInstance().stop();
+ EnvironmentUtils.cleanEnv();
+ FileUtils.deleteFully(new File("data" + File.separator + "consensus"));
+ }
+
+ @Test
+ public void createTimeseriesTest() throws MetadataException {
+ configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
+ CreateTimeSeriesNode createTimeSeriesNode =
+ new CreateTimeSeriesNode(
+ new PlanNodeId("0"),
+ new PartialPath("root.ln.wf01.wt01.status"),
+ TSDataType.BOOLEAN,
+ TSEncoding.PLAIN,
+ CompressionType.SNAPPY,
+ new HashMap<String, String>() {
+ {
+ put("MAX_POINT_NUMBER", "3");
+ }
+ },
+ new HashMap<String, String>() {
+ {
+ put("tag1", "v1");
+ put("tag2", "v2");
+ }
+ },
+ new HashMap<String, String>() {
+ {
+ put("attr1", "a1");
+ put("attr2", "a2");
+ }
+ },
+ "meter1");
+
+ RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+ PlanFragment planFragment = new PlanFragment(new PlanFragmentId("2", 3), createTimeSeriesNode);
+ FragmentInstance fragmentInstance =
- new FragmentInstance(planFragment, 4, new GroupByFilter(1, 2, 3, 4), QueryType.WRITE);
- fragmentInstance.setRegionReplicaSet(regionReplicaSet);
- fragmentInstance.setHostEndpoint(new Endpoint("127.0.0.1", 6666));
++ new FragmentInstance(
++ planFragment,
++ planFragment.getId().genFragmentInstanceId(),
++ new GroupByFilter(1, 2, 3, 4),
++ QueryType.WRITE);
++ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+
+ // serialize fragmentInstance
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ fragmentInstance.serializeRequest(byteBuffer);
+ byteBuffer.flip();
+
+ // put serialized fragmentInstance to TSendFragmentInstanceReq
+ TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
+ TFragmentInstance tFragmentInstance = new TFragmentInstance();
+ tFragmentInstance.setBody(byteBuffer);
+ request.setFragmentInstance(tFragmentInstance);
+ request.setConsensusGroupId(
+ new TConsensusGroupId(
+ regionReplicaSet.getConsensusGroupId().getId(),
+ regionReplicaSet.getConsensusGroupId().getType().toString()));
+ request.setQueryType(QueryType.WRITE.toString());
+
+ // Use consensus layer to execute request
+ TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request);
+
+ Assert.assertTrue(response.accepted);
+ }
+
+ private RegionReplicaSet genRegionReplicaSet() {
+ List<DataNodeLocation> dataNodeList = new ArrayList<>();
+ dataNodeList.add(
+ new DataNodeLocation(
+ conf.getConsensusPort(), new Endpoint(conf.getInternalIp(), conf.getConsensusPort())));
+
+ // construct fragmentInstance
+ SchemaRegionId schemaRegionId = new SchemaRegionId(0);
+ return new RegionReplicaSet(schemaRegionId, dataNodeList);
+ }
+
+ private List<Peer> genPeerList(RegionReplicaSet regionReplicaSet) {
+ List<Peer> peerList = new ArrayList<>();
+ for (DataNodeLocation node : regionReplicaSet.getDataNodeList()) {
+ peerList.add(new Peer(regionReplicaSet.getConsensusGroupId(), node.getEndPoint()));
+ }
+ return peerList;
+ }
+ }