You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/10 16:46:47 UTC
[iotdb] 01/07: temporary commit for refactor thrift rpc
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5bdf9187b5a6a74904b5d2f64ff1b564188b8ee8
Merge: 4f92cc5 b484f2e
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Tue Aug 3 19:17:27 2021 +0800
temporary commit for refactor thrift rpc
README_ZH.md | 2 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4 | 73 +-
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 159 ++--
.../iotdb/cluster/config/ClusterConstant.java | 52 +
.../iotdb/cluster/coordinator/Coordinator.java | 8 +-
.../iotdb/cluster/log/applier/BaseApplier.java | 17 +-
.../log/manage/PartitionedSnapshotLogManager.java | 8 +-
.../apache/iotdb/cluster/metadata/CMManager.java | 223 ++---
.../apache/iotdb/cluster/metadata/MetaPuller.java | 27 +-
.../cluster/query/ClusterPhysicalGenerator.java | 18 +-
.../iotdb/cluster/query/ClusterPlanExecutor.java | 44 +-
.../apache/iotdb/cluster/query/ClusterPlanner.java | 17 +-
.../iotdb/cluster/query/ClusterQueryRouter.java | 40 +
.../cluster/query/ClusterUDTFQueryExecutor.java | 111 +++
.../iotdb/cluster/query/LocalQueryExecutor.java | 32 +-
.../cluster/query/aggregate/ClusterAggregator.java | 9 +-
.../cluster/query/fill/ClusterPreviousFill.java | 10 +-
.../query/groupby/RemoteGroupByExecutor.java | 27 +-
.../query/last/ClusterLastQueryExecutor.java | 26 +-
.../cluster/query/manage/ClusterQueryManager.java | 8 +-
.../cluster/query/reader/ClusterReaderFactory.java | 9 +-
.../cluster/query/reader/ClusterTimeGenerator.java | 14 +-
.../iotdb/cluster/query/reader/DataSourceInfo.java | 28 +-
.../reader/RemoteSeriesReaderByTimestamp.java | 3 +
.../query/reader/RemoteSimpleSeriesReader.java | 3 +
.../query/reader/mult/MultDataSourceInfo.java | 8 +-
.../query/reader/mult/RemoteMultSeriesReader.java | 10 +-
.../iotdb/cluster/server/ClusterRPCService.java | 17 +-
.../iotdb/cluster/server/ClusterTSServiceImpl.java | 112 +--
.../iotdb/cluster/server/MetaClusterServer.java | 369 -------
.../iotdb/cluster/server/MetaClusterServer2.java | 372 ++++++++
.../apache/iotdb/cluster/server/RaftServer.java | 263 -----
.../cluster/server/heartbeat/HeartbeatThread.java | 6 +
.../server/heartbeat/MetaHeartbeatServer.java | 13 +-
.../cluster/server/member/MetaGroupMember.java | 72 +-
.../apache/iotdb/cluster/utils/PartitionUtils.java | 4 +-
.../cluster/utils/nodetool/ClusterMonitor.java | 11 +-
.../org/apache/iotdb/cluster/common/IoTDBTest.java | 7 +-
.../org/apache/iotdb/cluster/common/TestUtils.java | 7 +-
.../cluster/integration/BaseSingleNodeTest.java | 9 +-
.../cluster/log/applier/DataLogApplierTest.java | 40 +-
.../cluster/log/logtypes/SerializeLogTest.java | 4 +-
.../cluster/log/snapshot/FileSnapshotTest.java | 2 +-
.../iotdb/cluster/partition/MManagerWhiteBox.java | 2 +-
.../query/ClusterAggregateExecutorTest.java | 4 +-
.../query/ClusterDataQueryExecutorTest.java | 10 +-
.../cluster/query/ClusterFillExecutorTest.java | 4 +-
.../query/ClusterPhysicalGeneratorTest.java | 3 +-
.../cluster/query/ClusterPlanExecutorTest.java | 8 +-
.../cluster/query/ClusterQueryRouterTest.java | 44 +-
.../query/ClusterUDTFQueryExecutorTest.java | 116 +++
.../iotdb/cluster/query/LoadConfigurationTest.java | 2 +-
.../ClusterGroupByNoVFilterDataSetTest.java | 2 +-
.../groupby/ClusterGroupByVFilterDataSetTest.java | 2 +-
.../query/groupby/MergeGroupByExecutorTest.java | 4 +-
.../query/groupby/RemoteGroupByExecutorTest.java | 4 +-
.../query/manage/ClusterQueryManagerTest.java | 12 +-
.../query/reader/ClusterReaderFactoryTest.java | 2 +-
.../query/reader/ClusterTimeGeneratorTest.java | 4 +-
.../clusterinfo/ClusterInfoServiceImplTest.java | 13 +-
.../cluster/server/member/DataGroupMemberTest.java | 14 +-
.../cluster/server/member/MetaGroupMemberTest.java | 12 +-
docs/Development/ContributeGuide.md | 37 +-
docs/SystemDesign/SchemaManager/SchemaManager.md | 39 +-
docs/SystemDesign/TsFile/Format.md | 477 +++++-----
docs/UserGuide/Advanced-Features/Select-Into.md | 235 +++++
.../Programming-MQTT.md | 79 +-
.../UserGuide/Ecosystem Integration/Flink IoTDB.md | 3 +-
.../DDL-Data-Definition-Language.md | 6 +-
.../UserGuide/System-Tools/Load-External-Tsfile.md | 32 +-
docs/zh/Community/Community-Powered By.md | 41 +-
docs/zh/Community/Feedback.md | 18 +-
docs/zh/Development/Committer.md | 38 +-
docs/zh/Development/ContributeGuide.md | 59 +-
docs/zh/Development/HowToCommit.md | 40 +-
docs/zh/Development/VoteRelease.md | 7 +-
docs/zh/Download/README.md | 73 +-
docs/zh/SystemDesign/Architecture/Architecture.md | 1 -
docs/zh/SystemDesign/Client/RPC.md | 3 -
docs/zh/SystemDesign/Connector/Hive-TsFile.md | 11 +-
docs/zh/SystemDesign/Connector/Spark-IOTDB.md | 39 +-
docs/zh/SystemDesign/Connector/Spark-TsFile.md | 25 +-
docs/zh/SystemDesign/DataQuery/AggregationQuery.md | 31 +-
.../SystemDesign/DataQuery/AlignByDeviceQuery.md | 12 +-
docs/zh/SystemDesign/DataQuery/FillFunction.md | 30 +-
docs/zh/SystemDesign/DataQuery/GroupByFillQuery.md | 37 +-
docs/zh/SystemDesign/DataQuery/GroupByQuery.md | 48 +-
docs/zh/SystemDesign/DataQuery/LastQuery.md | 22 +-
docs/zh/SystemDesign/DataQuery/OrderByTimeQuery.md | 75 +-
.../zh/SystemDesign/DataQuery/QueryFundamentals.md | 58 +-
docs/zh/SystemDesign/DataQuery/RawDataQuery.md | 62 +-
docs/zh/SystemDesign/DataQuery/SeriesReader.md | 24 +-
docs/zh/SystemDesign/QueryEngine/Planner.md | 1 -
.../QueryEngine/ResultSetConstruction.md | 2 +-
.../zh/SystemDesign/SchemaManager/SchemaManager.md | 190 ++--
docs/zh/SystemDesign/StorageEngine/Compaction.md | 40 +-
.../SystemDesign/StorageEngine/DataManipulation.md | 41 +-
.../zh/SystemDesign/StorageEngine/DataPartition.md | 24 +-
docs/zh/SystemDesign/StorageEngine/FileLists.md | 39 +-
docs/zh/SystemDesign/StorageEngine/FlushManager.md | 4 +-
docs/zh/SystemDesign/StorageEngine/MergeManager.md | 22 +-
docs/zh/SystemDesign/StorageEngine/Recover.md | 29 +-
.../zh/SystemDesign/StorageEngine/StorageEngine.md | 2 +-
docs/zh/SystemDesign/StorageEngine/WAL.md | 4 +-
docs/zh/SystemDesign/Tools/Sync.md | 105 +-
docs/zh/SystemDesign/TsFile/Format.md | 584 ++++++------
docs/zh/SystemDesign/TsFile/Read.md | 110 +--
docs/zh/SystemDesign/TsFile/TsFile.md | 1 -
docs/zh/SystemDesign/TsFile/Write.md | 3 +-
.../zh/UserGuide/API/Programming-Cpp-Native-API.md | 87 +-
docs/zh/UserGuide/API/Programming-Go-Native-API.md | 18 +-
docs/zh/UserGuide/API/Programming-JDBC.md | 14 +-
.../UserGuide/API/Programming-Java-Native-API.md | 59 +-
.../UserGuide/API/Programming-Python-Native-API.md | 70 +-
docs/zh/UserGuide/API/Programming-TsFile-API.md | 137 ++-
docs/zh/UserGuide/API/Time-zone.md | 1 -
.../Administration-Management/Administration.md | 35 +-
docs/zh/UserGuide/Advanced-Features/Alerting.md | 15 +-
.../Advanced-Features/Continuous-Query.md | 17 +-
docs/zh/UserGuide/Advanced-Features/Select-Into.md | 234 +++++
docs/zh/UserGuide/Advanced-Features/Triggers.md | 170 +---
.../Advanced-Features/UDF-User-Defined-Function.md | 225 ++---
docs/zh/UserGuide/Appendix/Config-Manual.md | 139 ++-
docs/zh/UserGuide/Appendix/SQL-Reference.md | 160 ++--
docs/zh/UserGuide/Appendix/Status-Codes.md | 19 +-
docs/zh/UserGuide/CLI/Command-Line-Interface.md | 123 ++-
docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md | 12 +-
docs/zh/UserGuide/Cluster/Cluster-Setup.md | 67 +-
.../Collaboration-of-Edge-and-Cloud/Sync-Tool.md | 40 +-
.../Programming-MQTT.md | 111 ++-
.../Programming-Thrift.md | 30 +-
docs/zh/UserGuide/Comparison/TSDB-Comparison.md | 201 ++--
docs/zh/UserGuide/Data-Concept/Compression.md | 12 +-
.../Data-Concept/Data-Model-and-Terminology.md | 59 +-
docs/zh/UserGuide/Data-Concept/Data-Type.md | 4 +-
docs/zh/UserGuide/Data-Concept/Encoding.md | 24 +-
docs/zh/UserGuide/Data-Concept/SDT.md | 30 +-
docs/zh/UserGuide/Ecosystem Integration/DBeaver.md | 20 +-
.../UserGuide/Ecosystem Integration/Flink IoTDB.md | 5 +-
.../Ecosystem Integration/Flink TsFile.md | 17 +-
docs/zh/UserGuide/Ecosystem Integration/Grafana.md | 72 +-
.../UserGuide/Ecosystem Integration/Hive TsFile.md | 55 +-
.../Ecosystem Integration/MapReduce TsFile.md | 39 +-
.../UserGuide/Ecosystem Integration/Spark IoTDB.md | 17 +-
.../Ecosystem Integration/Spark TsFile.md | 88 +-
.../Ecosystem Integration/Writing Data on HDFS.md | 46 +-
.../Ecosystem Integration/Zeppelin-IoTDB.md | 51 +-
.../zh/UserGuide/FAQ/Frequently-asked-questions.md | 33 +-
.../UserGuide/IoTDB-Introduction/Architecture.md | 14 +-
docs/zh/UserGuide/IoTDB-Introduction/Features.md | 8 +-
.../zh/UserGuide/IoTDB-Introduction/Publication.md | 2 +-
docs/zh/UserGuide/IoTDB-Introduction/Scenario.md | 20 +-
.../UserGuide/IoTDB-Introduction/What-is-IoTDB.md | 4 +-
.../DDL-Data-Definition-Language.md | 72 +-
.../DML-Data-Manipulation-Language.md | 364 ++++---
.../IoTDB-SQL-Language/Maintenance-Command.md | 10 +-
docs/zh/UserGuide/QuickStart/Files.md | 17 +-
docs/zh/UserGuide/QuickStart/QuickStart.md | 78 +-
docs/zh/UserGuide/QuickStart/ServerFileList.md | 64 +-
docs/zh/UserGuide/QuickStart/WayToGetIoTDB.md | 43 +-
docs/zh/UserGuide/System-Tools/CSV-Tool.md | 24 +-
docs/zh/UserGuide/System-Tools/JMX-Tool.md | 24 +-
.../UserGuide/System-Tools/Load-External-Tsfile.md | 58 +-
docs/zh/UserGuide/System-Tools/MLogParser-Tool.md | 7 +-
.../System-Tools/Monitor-and-Log-Tools.md | 96 +-
docs/zh/UserGuide/System-Tools/NodeTool.md | 117 ++-
.../Query-History-Visualization-Tool.md | 6 +-
docs/zh/UserGuide/System-Tools/Watermark-Tool.md | 34 +-
docs/zh/UserGuide/UserGuideReadme.md | 9 +-
.../org/apache/iotdb/flink/FlinkIoTDBSink.java | 26 +-
.../RPC.md => example/mqtt-customize/README.md | 45 +-
example/mqtt-customize/pom.xml | 42 +
.../server/CustomizedJsonPayloadFormatter.java | 62 ++
.../org.apache.iotdb.db.mqtt.PayloadFormatter | 1 +
example/pom.xml | 1 +
.../iotdb/HybridTimeseriesSessionExample.java | 129 +++
.../org/apache/iotdb/SessionConcurrentExample.java | 199 ++++
.../main/java/org/apache/iotdb/SessionExample.java | 14 +
.../iotdb/tsfile/TsFileWriteVectorWithTablet.java | 89 +-
.../java/org/apache/iotdb/flink/IoTDBSink.java | 26 +-
.../iotdb/flink/options/IoTDBSinkOptions.java | 11 -
...tITCase.java => RowTSRecordOutputFormatIT.java} | 2 +-
...rmatITCase.java => RowTsFileInputFormatIT.java} | 2 +-
pom.xml | 2 +
.../resources/conf/iotdb-engine.properties | 24 +
.../org/apache/iotdb/db/auth/AuthorityChecker.java | 1 +
.../db/concurrent/IoTDBThreadPoolFactory.java | 18 +
.../org/apache/iotdb/db/concurrent/ThreadName.java | 5 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 50 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 43 +-
.../apache/iotdb/db/cq/ContinuousQueryTask.java | 6 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 183 ++--
.../db/engine/cache/TimeSeriesMetadataCache.java | 71 +-
.../level/LevelCompactionTsFileManagement.java | 2 +-
.../iotdb/db/engine/memtable/AbstractMemTable.java | 15 +-
.../apache/iotdb/db/engine/memtable/IMemTable.java | 2 +
.../iotdb/db/engine/merge/task/MergeTask.java | 10 +-
.../selectinto/InsertTabletPlanGenerator.java | 254 +++++
.../selectinto/InsertTabletPlansIterator.java | 139 +++
.../engine/storagegroup/StorageGroupProcessor.java | 196 ++--
.../db/engine/storagegroup/TsFileProcessor.java | 10 +-
.../db/engine/storagegroup/TsFileResource.java | 10 +-
.../storagegroup/timeindex/DeviceTimeIndex.java | 13 +-
.../storagegroup/timeindex/FileTimeIndex.java | 5 +
.../engine/storagegroup/timeindex/ITimeIndex.java | 8 +
.../storagegroup/timeindex/TimeIndexLevel.java | 10 -
.../virtualSg/VirtualStorageGroupManager.java | 110 ++-
.../db/engine/trigger/executor/TriggerEngine.java | 16 +-
.../engine/trigger/executor/TriggerExecutor.java | 8 +-
.../service/TriggerRegistrationService.java | 10 +-
.../exception/query/PathNumOverLimitException.java | 13 +-
.../org/apache/iotdb/db/metadata/MManager.java | 1004 ++++++--------------
.../java/org/apache/iotdb/db/metadata/MTree.java | 657 ++++++++-----
.../org/apache/iotdb/db/metadata/MetaUtils.java | 18 +-
.../iotdb/db/metadata/MetadataOperationType.java | 4 +
.../db/metadata/{ => logfile}/MLogTxtWriter.java | 71 +-
.../iotdb/db/metadata/logfile/MLogWriter.java | 24 +-
.../db/metadata/{ => logfile}/TagLogFile.java | 2 +-
.../iotdb/db/metadata/mnode/EntityMNode.java | 117 +++
.../iotdb/db/metadata/mnode/IEntityMNode.java | 55 ++
.../org/apache/iotdb/db/metadata/mnode/IMNode.java | 82 ++
.../iotdb/db/metadata/mnode/IMeasurementMNode.java | 58 ++
.../mnode/IStorageGroupMNode.java} | 14 +-
.../iotdb/db/metadata/mnode/InternalMNode.java | 271 ++++++
.../org/apache/iotdb/db/metadata/mnode/MNode.java | 339 +------
.../iotdb/db/metadata/mnode/MeasurementMNode.java | 207 ++--
...roupMNode.java => StorageGroupEntityMNode.java} | 23 +-
.../iotdb/db/metadata/mnode/StorageGroupMNode.java | 11 +-
.../apache/iotdb/db/metadata/tag/TagManager.java | 556 +++++++++++
.../iotdb/db/metadata/template/Template.java | 11 +-
.../db/metadata/template/TemplateManager.java | 141 +++
.../org/apache/iotdb/db/monitor/StatMonitor.java | 2 +-
.../main/java/org/apache/iotdb/db/qp/Planner.java | 51 +-
.../apache/iotdb/db/qp/constant/SQLConstant.java | 4 +
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 73 +-
.../org/apache/iotdb/db/qp/logical/Operator.java | 6 +-
.../iotdb/db/qp/logical/crud/QueryOperator.java | 185 ++--
.../db/qp/logical/crud/SelectIntoOperator.java | 110 +++
.../iotdb/db/qp/logical/sys/LoadFilesOperator.java | 25 +-
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 38 +-
.../db/qp/physical/crud/AlignByDevicePlan.java | 57 +-
.../iotdb/db/qp/physical/crud/InsertPlan.java | 12 +-
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 3 +-
.../iotdb/db/qp/physical/crud/MeasurementInfo.java | 75 ++
.../iotdb/db/qp/physical/crud/SelectIntoPlan.java | 113 +++
...emplatePlan.java => SetSchemaTemplatePlan.java} | 14 +-
.../iotdb/db/qp/physical/sys/OperateFilePlan.java | 23 +-
...tePlan.java => SetUsingSchemaTemplatePlan.java} | 16 +-
.../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java | 191 ++--
.../iotdb/db/qp/strategy/LogicalChecker.java | 5 +
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 24 +-
.../qp/strategy/optimizer/ConcatPathOptimizer.java | 8 +-
.../qp/strategy/optimizer/ILogicalOptimizer.java | 3 +-
.../apache/iotdb/db/qp/utils/WildcardsRemover.java | 28 +-
.../db/query/control/QueryResourceManager.java | 100 +-
.../iotdb/db/query/control/QueryTimeManager.java | 14 +
.../iotdb/db/query/control/SessionManager.java | 10 +-
.../apache/iotdb/db/query/control/TracingInfo.java | 85 ++
.../iotdb/db/query/control/TracingManager.java | 141 ++-
.../db/query/dataset/AlignByDeviceDataSet.java | 26 +-
.../iotdb/db/query/executor/LastQueryExecutor.java | 6 +-
.../iotdb/db/query/reader/series/SeriesReader.java | 99 +-
.../reader/universal/DescPriorityMergeReader.java | 5 +-
.../reader/universal/PriorityMergeReader.java | 13 +-
.../apache/iotdb/db/rescon/MemTableManager.java | 4 +
.../apache/iotdb/db/rescon/TVListAllocator.java | 12 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 4 +-
.../org/apache/iotdb/db/service/RPCService.java | 1 +
.../org/apache/iotdb/db/service/ServiceType.java | 7 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 313 +++---
.../org/apache/iotdb/db/service/UpgradeSevice.java | 9 +-
.../iotdb/db/service/thrift/ThriftService.java | 18 +-
.../db/service/thrift/ThriftServiceThread.java | 220 ++++-
.../iotdb/db/sync/receiver/SyncServerManager.java | 7 +-
.../apache/iotdb/db/tools/TsFileSketchTool.java | 583 ++++++++----
.../org/apache/iotdb/db/tools/mlog/MLogParser.java | 19 +-
.../org/apache/iotdb/db/utils/SchemaUtils.java | 21 +-
.../iotdb/db/writelog/recover/LogReplayer.java | 6 +-
.../writelog/recover/TsFileRecoverPerformer.java | 26 +-
.../db/engine/memtable/MemTableFlushTaskTest.java | 22 +-
.../db/engine/memtable/MemTableTestUtils.java | 5 +-
.../db/engine/memtable/PrimitiveMemTableTest.java | 3 +-
.../storagegroup/StorageGroupProcessorTest.java | 84 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 9 +-
.../engine/storagegroup/TsFileProcessorTest.java | 14 +-
.../iotdb/db/integration/IoTDBAddSubDeviceIT.java | 11 +-
.../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 3 +-
.../org/apache/iotdb/db/integration/IoTDBAsIT.java | 2 +-
.../db/integration/IoTDBAutoCreateSchemaIT.java | 6 +-
.../db/integration/IoTDBContinuousQueryIT.java | 147 ++-
.../db/integration/IoTDBCreateStorageGroupIT.java | 128 +++
.../db/integration/IoTDBCreateTimeseriesIT.java | 34 +-
.../db/integration/IoTDBInsertWithoutTimeIT.java | 129 +++
.../apache/iotdb/db/integration/IoTDBLastIT.java | 22 +-
.../db/integration/IoTDBLoadExternalTsfileIT.java | 121 ++-
.../iotdb/db/integration/IoTDBMetadataFetchIT.java | 20 +-
.../db/integration/IoTDBQueryMemoryControlIT.java | 20 +-
.../iotdb/db/integration/IoTDBRestartIT.java | 48 +
.../iotdb/db/integration/IoTDBSelectIntoIT.java | 617 ++++++++++++
.../db/integration/IoTDBSequenceDataQueryIT.java | 12 +-
.../iotdb/db/integration/IoTDBSeriesReaderIT.java | 11 +-
.../iotdb/db/integration/IoTDBSimpleQueryIT.java | 14 +-
.../db/integration/IoTDBTriggerExecutionIT.java | 43 +-
.../iotdb/db/metadata/MManagerAdvancedTest.java | 20 +-
.../iotdb/db/metadata/MManagerBasicTest.java | 170 ++--
.../iotdb/db/metadata/MManagerImproveTest.java | 8 +-
.../org/apache/iotdb/db/metadata/MTreeTest.java | 103 +-
.../apache/iotdb/db/metadata/MetaUtilsTest.java | 14 +-
.../apache/iotdb/db/metadata/mnode/MNodeTest.java | 41 +-
.../iotdb/db/qp/logical/LogicalPlanSmallTest.java | 2 +-
.../iotdb/db/qp/physical/InsertRowPlanTest.java | 8 +-
.../iotdb/db/qp/physical/InsertTabletPlanTest.java | 14 +-
.../iotdb/db/qp/physical/PhysicalPlanTest.java | 83 +-
.../iotdb/db/query/control/TracingManagerTest.java | 7 +-
.../query/reader/series/SeriesReaderTestUtil.java | 2 +-
.../org/apache/iotdb/db/tools/MLogParserTest.java | 71 +-
.../iotdb/db/tools/TsFileSketchToolTest.java | 4 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 11 +-
.../iotdb/db/utils/TsFileRewriteToolTest.java | 4 +-
.../iotdb/db/writelog/recover/LogReplayerTest.java | 3 +-
.../db/writelog/recover/SeqTsFileRecoverTest.java | 2 +-
.../apache/iotdb/session/IoTDBSessionSimpleIT.java | 6 +-
.../apache/iotdb/session/IoTDBSessionVectorIT.java | 213 +++++
.../java/org/apache/iotdb/session/SessionTest.java | 2 +-
site/README.md | 2 +-
site/src/main/.vuepress/config.js | 31 +-
site/src/main/.vuepress/theme/components/Page.vue | 15 +-
.../apache/iotdb/spark/db/EnvironmentUtils.java | 111 ++-
.../test/java/org/apache/iotdb/db/sql/Cases.java | 235 ++---
.../file/metadata/MetadataIndexConstructor.java | 52 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 17 +-
.../org/apache/iotdb/tsfile/read/common/Path.java | 4 +
.../java/org/apache/iotdb/tsfile/utils/BitMap.java | 5 +
.../apache/iotdb/tsfile/write/TsFileWriter.java | 12 +-
.../tsfile/write/chunk/ChunkGroupWriterImpl.java | 77 +-
.../tsfile/write/chunk/VectorChunkWriterImpl.java | 5 +-
.../tsfile/write/schema/IMeasurementSchema.java | 5 +
.../tsfile/write/schema/MeasurementSchema.java | 12 +-
.../apache/iotdb/tsfile/write/schema/Schema.java | 28 +-
.../write/schema/VectorMeasurementSchema.java | 12 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 10 +
...ateTest.java => DefaultSchemaTemplateTest.java} | 8 +-
.../tsfile/write/MetadataIndexConstructorTest.java | 478 ++++++++++
.../write/schema/converter/SchemaBuilderTest.java | 4 +-
.../write/writer/VectorChunkWriterImplTest.java | 34 +-
.../write/writer/VectorMeasurementSchemaStub.java | 12 +-
346 files changed, 12967 insertions(+), 7924 deletions(-)
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index cec70b2,adc4661..4fcb5a5
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@@ -22,33 -22,28 +22,40 @@@ import org.apache.iotdb.cluster.client.
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
++import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
++import org.apache.iotdb.cluster.metadata.CMManager;
++import org.apache.iotdb.cluster.metadata.MetaPuller;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.server.MetaClusterServer;
+import org.apache.iotdb.cluster.server.ClusterRPCService;
- import org.apache.iotdb.cluster.server.MetaClusterServer;
++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
++import org.apache.iotdb.cluster.server.member.MetaGroupMember;
++import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
++import org.apache.iotdb.cluster.server.raft.MetaRaftService;
++import org.apache.iotdb.cluster.server.service.MetaAsyncService;
++import org.apache.iotdb.cluster.server.service.MetaSyncService;
import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
import org.apache.iotdb.db.conf.IoTDBConfigCheck;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.JMXService;
+import org.apache.iotdb.db.service.RegisterManager;
++import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
import org.apache.iotdb.db.utils.TestOnly;
--
import org.apache.thrift.TException;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
--import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -58,12 -53,9 +65,13 @@@ import java.util.Set
import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
- //we do not inherent IoTDB instance, as it may break the singleton mode of IoTDB.
-public class ClusterMain {
++// we do not inherit the IoTDB instance, as it may break the singleton mode of IoTDB.
+public class ClusterIoTDB {
- private static final Logger logger = LoggerFactory.getLogger(ClusterMain.class);
+ private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
+ private final String mbeanName =
- String.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
++ String.format(
++ "%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
// establish the cluster as a seed
private static final String MODE_START = "-s";
@@@ -73,13 -65,8 +81,27 @@@
// metaport-of-removed-node
private static final String MODE_REMOVE = "-r";
- private MetaClusterServer metaServer;
- private static MetaClusterServer metaServer;
++ private MetaGroupMember metaGroupEngine;
++ private Node thisNode;
++ private Coordinator coordinator;
+
+ private IoTDB iotdb = IoTDB.getInstance();
+
+ // Cluster IoTDB uses an individual registerManager, separate from its parent's.
+ private RegisterManager registerManager = new RegisterManager();
+
++ private ClusterIoTDB() {
++ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
++ thisNode = new Node();
++ // set internal rpc ip and ports
++ thisNode.setInternalIp(config.getInternalIp());
++ thisNode.setMetaPort(config.getInternalMetaPort());
++ thisNode.setDataPort(config.getInternalDataPort());
++ // set client rpc ip and ports
++ thisNode.setClientPort(config.getClusterRpcPort());
++ thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
++ }
+
public static void main(String[] args) {
if (args.length < 1) {
logger.error(
@@@ -115,18 -103,50 +137,18 @@@
String mode = args[0];
logger.info("Running mode {}", mode);
+ ClusterIoTDB cluster = ClusterIoTDBHolder.INSTANCE;
- //we start IoTDB kernel first.
- cluster.iotdb.active();
++ // we start IoTDB kernel first.
++ // cluster.iotdb.active();
+
- //then we start the cluster module.
++ // then we start the cluster module.
if (MODE_START.equals(mode)) {
- try {
- metaServer = new MetaClusterServer();
- startServerCheck();
- preStartCustomize();
- metaServer.start();
- metaServer.buildCluster();
- // Currently, we do not register ClusterInfoService as a JMX Bean,
- // so we use startService() rather than start()
- ClusterInfoServer.getInstance().startService();
- } catch (TTransportException
- | StartupException
- | QueryProcessException
- | StartUpCheckFailureException
- | ConfigInconsistentException e) {
- metaServer.stop();
- logger.error("Fail to start meta server", e);
- }
+ cluster.activeStartNodeMode();
} else if (MODE_ADD.equals(mode)) {
- try {
- long startTime = System.currentTimeMillis();
- metaServer = new MetaClusterServer();
- preStartCustomize();
- metaServer.start();
- metaServer.joinCluster();
- // Currently, we do not register ClusterInfoService as a JMX Bean,
- // so we use startService() rather than start()
- ClusterInfoServer.getInstance().startService();
-
- logger.info(
- "Adding this node {} to cluster costs {} ms",
- metaServer.getMember().getThisNode(),
- (System.currentTimeMillis() - startTime));
- } catch (TTransportException
- | StartupException
- | QueryProcessException
- | StartUpCheckFailureException
- | ConfigInconsistentException e) {
- metaServer.stop();
- logger.error("Fail to join cluster", e);
- }
+ cluster.activeAddNodeMode();
} else if (MODE_REMOVE.equals(mode)) {
try {
- doRemoveNode(args);
+ cluster.doRemoveNode(args);
} catch (IOException e) {
logger.error("Fail to remove node in cluster", e);
}
@@@ -135,61 -155,7 +157,94 @@@
}
}
- private static void startServerCheck() throws StartupException {
+ public void activeStartNodeMode() {
+ try {
- metaServer = new MetaClusterServer();
+ startServerCheck();
+ preStartCustomize();
- metaServer.start();
- metaServer.buildCluster();
++
++ coordinator = new Coordinator();
++ // register MetaGroupMember. MetaGroupMember has the same position as "StorageEngine" in the
++ // cluster module.
++ // TODO fixme it is better to remove coordinator out of metaGroupEngine
++
++ // local engine
++ metaGroupEngine =
++ new MetaGroupMember(
++ ThriftServiceThread.getProtocolFactory(
++ IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()),
++ thisNode,
++ coordinator);
++ IoTDB.setMetaManager(CMManager.getInstance());
++ ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
++ ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
++ MetaPuller.getInstance().init(metaGroupEngine);
++ iotdb.active();
++
++ registerManager.register(metaGroupEngine);
++
++ metaGroupEngine.buildCluster();
++
++ // rpc service
++ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
++ MetaAsyncService metaAsyncService = new MetaAsyncService(metaGroupEngine);
++ MetaRaftHeartBeatService.getInstance().initAsyncedServiceImpl(metaAsyncService);
++ MetaRaftService.getInstance().initAsyncedServiceImpl(metaAsyncService);
++ } else {
++ MetaSyncService syncService = new MetaSyncService(metaGroupEngine);
++ MetaRaftHeartBeatService.getInstance().initSyncedServiceImpl(syncService);
++ MetaRaftService.getInstance().initSyncedServiceImpl(syncService);
++ }
++
++ // meta group heart beat rpc
++ registerManager.register(MetaRaftHeartBeatService.getInstance());
++ registerManager.register(MetaRaftService.getInstance());
++
+ // Currently, we do not register ClusterInfoService as a JMX Bean,
+ // so we use startService() rather than start()
+ ClusterInfoServer.getInstance().startService();
+ // JMX based DBA API
+ registerManager.register(ClusterMonitor.INSTANCE);
+ // we must wait until the metaGroup established.
+ // So that the ClusterRPCService can work.
+ registerManager.register(ClusterRPCService.getInstance());
- } catch (TTransportException
- | StartupException
++ } catch (StartupException
+ | QueryProcessException
+ | StartUpCheckFailureException
+ | ConfigInconsistentException e) {
+ stop();
+ logger.error("Fail to start meta server", e);
+ }
+ }
+
+ public void activeAddNodeMode() {
- try {
- long startTime = System.currentTimeMillis();
- metaServer = new MetaClusterServer();
- preStartCustomize();
- metaServer.start();
- metaServer.joinCluster();
- // Currently, we do not register ClusterInfoService as a JMX Bean,
- // so we use startService() rather than start()
- ClusterInfoServer.getInstance().startService();
- // JMX based DBA API
- registerManager.register(ClusterMonitor.INSTANCE);
- //finally, we start the RPC service
- registerManager.register(ClusterRPCService.getInstance());
- logger.info(
- "Adding this node {} to cluster costs {} ms",
- metaServer.getMember().getThisNode(),
- (System.currentTimeMillis() - startTime));
- } catch (TTransportException
- | StartupException
- | QueryProcessException
- | StartUpCheckFailureException
- | ConfigInconsistentException e) {
- stop();
- logger.error("Fail to join cluster", e);
- }
++ // try {
++ // long startTime = System.currentTimeMillis();
++ // metaServer = new RaftTSMetaServiceImpl();
++ // preStartCustomize();
++ // metaServer.start();
++ // metaServer.joinCluster();
++ // // Currently, we do not register ClusterInfoService as a JMX Bean,
++ // // so we use startService() rather than start()
++ // ClusterInfoServer.getInstance().startService();
++ // // JMX based DBA API
++ // registerManager.register(ClusterMonitor.INSTANCE);
++ // // finally, we start the RPC service
++ // registerManager.register(ClusterRPCService.getInstance());
++ // logger.info(
++ // "Adding this node {} to cluster costs {} ms",
++ // metaServer.getMember().getThisNode(),
++ // (System.currentTimeMillis() - startTime));
++ // } catch (TTransportException
++ // | StartupException
++ // | QueryProcessException
++ // | StartUpCheckFailureException
++ // | ConfigInconsistentException e) {
++ // stop();
++ // logger.error("Fail to join cluster", e);
++ // }
+ }
+
-
+ private void startServerCheck() throws StartupException {
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
// check the initial replicateNum and refuse to start when the replicateNum <= 0
if (config.getReplicationNum() <= 0) {
@@@ -236,18 -202,18 +291,12 @@@
}
// assert this node is in seed nodes list
-- Node localNode = new Node();
-- localNode
-- .setInternalIp(config.getInternalIp())
-- .setMetaPort(config.getInternalMetaPort())
-- .setDataPort(config.getInternalDataPort())
-- .setClientPort(config.getClusterRpcPort())
-- .setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
-- if (!seedNodes.contains(localNode)) {
++
++ if (!seedNodes.contains(thisNode)) {
String message =
String.format(
"SeedNodes must contains local node in start-server mode. LocalNode: %s ,SeedNodes: %s",
-- localNode.toString(), config.getSeedNodeUrls());
++ thisNode.toString(), config.getSeedNodeUrls());
throw new StartupException(metaServer.getMember().getName(), message);
}
}
@@@ -307,7 -273,7 +356,7 @@@
}
}
- public MetaClusterServer getMetaServer() {
- public static MetaClusterServer getMetaServer() {
++ public RaftTSMetaServiceImpl getMetaServer() {
return metaServer;
}
@@@ -358,35 -324,8 +407,33 @@@
});
}
-
-
@TestOnly
- public void setMetaClusterServer(MetaClusterServer metaClusterServer) {
- public static void setMetaClusterServer(MetaClusterServer metaClusterServer) {
-- metaServer = metaClusterServer;
++ public void setMetaClusterServer(RaftTSMetaServiceImpl RaftTSMetaServiceImpl) {
++ metaServer = RaftTSMetaServiceImpl;
+ }
+
+ public void stop() {
+ deactivate();
+ }
+
+ private void deactivate() {
+ logger.info("Deactivating Cluster IoTDB...");
+ metaServer.stop();
+ registerManager.deregisterAll();
+ JMXService.deregisterMBean(mbeanName);
+ logger.info("ClusterIoTDB is deactivated.");
- //stop the iotdb kernel
++ // stop the iotdb kernel
+ iotdb.stop();
+ }
+
-
+ public static ClusterIoTDB getInstance() {
+ return ClusterIoTDBHolder.INSTANCE;
+ }
++
+ private static class ClusterIoTDBHolder {
+
+ private static final ClusterIoTDB INSTANCE = new ClusterIoTDB();
+
+ private ClusterIoTDBHolder() {}
}
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index b377b1a,b377b1a..f5b0a49
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@@ -19,6 -19,6 +19,7 @@@
package org.apache.iotdb.cluster.config;
import org.apache.iotdb.cluster.rpc.thrift.Node;
++import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.db.utils.TestOnly;
public class ClusterConstant {
@@@ -67,4 -67,4 +68,55 @@@
public static void setElectionRandomTimeOutMs(long electionRandomTimeOutMs) {
ClusterConstant.electionRandomTimeOutMs = electionRandomTimeOutMs;
}
++
++ private static int connectionTimeoutInMS =
++ ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
++ private static int readOperationTimeoutMS =
++ ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS();
++ private static int writeOperationTimeoutMS =
++ ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS();
++ private static int syncLeaderMaxWaitMs = 20 * 1000;
++ private static long heartBeatIntervalMs = 1000L;
++
++ public static int getConnectionTimeoutInMS() {
++ return connectionTimeoutInMS;
++ }
++
++ public static void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
++ ClusterConstant.connectionTimeoutInMS = connectionTimeoutInMS;
++ }
++
++ public static int getReadOperationTimeoutMS() {
++ return readOperationTimeoutMS;
++ }
++
++ public static int getWriteOperationTimeoutMS() {
++ return writeOperationTimeoutMS;
++ }
++
++ public static int getSyncLeaderMaxWaitMs() {
++ return syncLeaderMaxWaitMs;
++ }
++
++ public static void setSyncLeaderMaxWaitMs(int syncLeaderMaxWaitMs) {
++ ClusterConstant.syncLeaderMaxWaitMs = syncLeaderMaxWaitMs;
++ }
++
++ public static long getHeartBeatIntervalMs() {
++ return heartBeatIntervalMs;
++ }
++
++ public static void setHeartBeatIntervalMs(long heartBeatIntervalMs) {
++ ClusterConstant.heartBeatIntervalMs = heartBeatIntervalMs;
++ }
++
++ @TestOnly
++ public static void setReadOperationTimeoutMS(int readOperationTimeoutMS) {
++ RaftServer.readOperationTimeoutMS = readOperationTimeoutMS;
++ }
++
++ @TestOnly
++ public static void setWriteOperationTimeoutMS(int writeOperationTimeoutMS) {
++ RaftServer.writeOperationTimeoutMS = writeOperationTimeoutMS;
++ }
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
index 8c79ac5,5bbddec..b8a2c00
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
@@@ -33,15 -26,21 +33,15 @@@ import org.apache.iotdb.db.service.thri
import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
- public class ClusterRPCService extends ThriftService implements ClusterRPCServiceMBean {
-/** A service to handle jdbc request from client. */
-public class RPCService extends ThriftService implements RPCServiceMBean {
-
- private TSServiceImpl impl;
++public class ClusterRPCService extends ThriftService implements ClusterRPCServiceMBean {
- private RPCService() {}
+ private ClusterTSServiceImpl impl;
- public static RPCService getInstance() {
- return RPCServiceHolder.INSTANCE;
- }
+ private ClusterRPCService() {}
@Override
- public int getRPCPort() {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- return config.getRpcPort();
+ public ThriftService getImplementation() {
+ return ClusterRPCServiceHolder.INSTANCE;
}
@Override
@@@ -50,9 -49,12 +50,13 @@@
}
@Override
-- public void initTProcessor()
- throws IllegalAccessException, InstantiationException {
- impl = new ClusterTSServiceImpl();
- throws ClassNotFoundException, IllegalAccessException, InstantiationException {
- impl =
- (TSServiceImpl)
- Class.forName(IoTDBDescriptor.getInstance().getConfig().getRpcImplClassName())
- .newInstance();
++ public void initTProcessor() throws IllegalAccessException, InstantiationException {
++ try {
++ impl = new ClusterTSServiceImpl();
++ initSyncedServiceImpl(null);
++ } catch (QueryProcessException e) {
++ throw new InstantiationException(e.getMessage());
++ }
processor = new Processor<>(impl);
}
@@@ -64,9 -66,9 +68,9 @@@
new ThriftServiceThread(
processor,
getID().getName(),
- ThreadName.RPC_CLIENT.getName(),
- config.getRpcAddress(),
- config.getRpcPort(),
+ ThreadName.CLUSTER_RPC_CLIENT.getName(),
- config.getRpcAddress(),
- config.getRpcPort(),
++ getBindIP(),
++ getBindPort(),
config.getRpcMaxConcurrentClientNum(),
config.getThriftServerAwaitTimeForStopService(),
new RPCServiceThriftHandler(impl),
@@@ -88,27 -90,14 +92,26 @@@
}
@Override
- public ServiceType getID() {
- return ServiceType.RPC_SERVICE;
+ public int getRPCPort() {
+ return getBindPort();
+ }
+
+ public static ClusterRPCService getInstance() {
+ return ClusterRPCServiceHolder.INSTANCE;
+ }
+
+ public void assignExecutorToServiceImpl(MetaGroupMember member) throws QueryProcessException {
+ this.impl.setExecutor(member);
+ }
+
+ public void assignCoordinator(Coordinator coordinator) {
+ this.impl.setCoordinator(coordinator);
}
-
- private static class RPCServiceHolder {
+ private static class ClusterRPCServiceHolder {
- private static final RPCService INSTANCE = new RPCService();
+ private static final ClusterRPCService INSTANCE = new ClusterRPCService();
- private RPCServiceHolder() {}
+ private ClusterRPCServiceHolder() {}
}
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
index e43e503,0000000..f1fd75a
mode 100644,000000..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
@@@ -1,237 -1,0 +1,161 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
++import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
- import org.apache.iotdb.cluster.metadata.CMManager;
+import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
- import org.apache.iotdb.cluster.query.ClusterPlanner;
+import org.apache.iotdb.cluster.query.RemoteQueryContext;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.db.exception.StorageEngineException;
- import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
- import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
- import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.service.TSServiceImpl;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
- import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-
+import org.apache.thrift.TException;
- import org.apache.thrift.protocol.TProtocol;
- import org.apache.thrift.server.ServerContext;
- import org.apache.thrift.server.TServerEventHandler;
- import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
- import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
- * ClusterTSServiceImpl is the cluster version of TSServiceImpl, which is responsible for the processing of
- * the user requests (sqls and session api). It inherits the basic procedures from TSServiceImpl,
- * but redirect the queries of data and metadata to a MetaGroupMember of the local node.
++ * ClusterTSServiceImpl is the cluster version of TSServiceImpl, which is responsible for the
++ * processing of the user requests (sqls and session api). It inherits the basic procedures from
++ * TSServiceImpl, but redirect the queries of data and metadata to a MetaGroupMember of the local
++ * node.
+ */
+public class ClusterTSServiceImpl extends TSServiceImpl {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClusterTSServiceImpl.class);
+ /**
- * The Coordinator of the local node. Through this node queries data and meta from
- * the cluster and performs data manipulations to the cluster.
++ * The Coordinator of the local node. Through this node queries data and meta from the cluster and
++ * performs data manipulations to the cluster.
+ */
+ private Coordinator coordinator;
+
-
- // /**
- // * Using the poolServer, ClusterTSServiceImpl will listen to a socket to accept thrift requests like an
- // * HttpServer.
- // */
- // private TServer poolServer;
- //
- // /** The socket poolServer will listen to. Async service requires nonblocking socket */
- // private TServerTransport serverTransport;
-
+ /**
+ * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
+ * used by the query can be found in the context and then released.
+ */
+ private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
+
- public ClusterTSServiceImpl(MetaGroupMember metaGroupMember) throws QueryProcessException {
- super();
- this.processor = new ClusterPlanner();
- }
++ public ClusterTSServiceImpl() throws QueryProcessException {}
+
+ public void setExecutor(MetaGroupMember metaGroupMember) throws QueryProcessException {
- this.executor = new ClusterPlanExecutor(metaGroupMember);
++ executor = new ClusterPlanExecutor(metaGroupMember);
+ }
+
+ public void setCoordinator(Coordinator coordinator) {
+ this.coordinator = coordinator;
+ }
+
-
+ /**
+ * Redirect the plan to the local Coordinator so that it will be processed cluster-wide.
+ *
+ * @param plan
+ * @return
+ */
+ @Override
+ protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+ try {
+ plan.checkIntegrity();
+ } catch (QueryProcessException e) {
+ logger.warn("Illegal plan detected: {}", plan);
+ return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
+ }
+
+ return coordinator.executeNonQueryPlan(plan);
+ }
+
+ /**
- * EventHandler handles the preprocess and postprocess of the thrift requests, but it currently
- * only release resources when a client disconnects.
- */
- class EventHandler implements TServerEventHandler {
-
- @Override
- public void preServe() {
- // do nothing
- }
-
- @Override
- public ServerContext createContext(TProtocol input, TProtocol output) {
- return null;
- }
-
- @Override
- public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
- ClusterTSServiceImpl.this.handleClientExit();
- }
-
- @Override
- public void processContext(
- ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
- // do nothing
- }
- }
-
- /**
- * Get the data types of each path in “paths”. If "aggregations" is not null, then it should be
- * corresponding to "paths" one to one and the data type will be the type of the aggregation over
- * the corresponding path.
- *
- * @param paths full timeseries paths
- * @param aggregations if not null, it should be the same size as "paths"
- * @return the data types of "paths" (using the aggregations)
- * @throws MetadataException
- */
- @Override
- protected List<TSDataType> getSeriesTypesByPaths(
- List<PartialPath> paths, List<String> aggregations) throws MetadataException {
- return ((CMManager) IoTDB.metaManager).getSeriesTypesByPath(paths, aggregations).left;
- }
-
- /**
- * Get the data types of each path in “paths”. If "aggregation" is not null, all "paths" will use
- * this aggregation.
- *
- * @param paths full timeseries paths
- * @param aggregation if not null, it means "paths" all use this aggregation
- * @return the data types of "paths" (using the aggregation)
- * @throws MetadataException
- */
- protected List<TSDataType> getSeriesTypesByString(List<PartialPath> paths, String aggregation)
- throws MetadataException {
- return ((CMManager) IoTDB.metaManager).getSeriesTypesByPaths(paths, aggregation).left;
- }
-
- /**
+ * Generate and cache a QueryContext using "queryId". In the distributed version, the QueryContext
+ * is a RemoteQueryContext.
+ *
+ * @param queryId
+ * @return a RemoteQueryContext using queryId
+ */
+ @Override
+ protected QueryContext genQueryContext(long queryId, boolean debug) {
+ RemoteQueryContext context = new RemoteQueryContext(queryId, debug);
+ queryContextMap.put(queryId, context);
+ return context;
+ }
+
+ /**
+ * Release the local and remote resources used by a query.
+ *
+ * @param queryId
+ * @throws StorageEngineException
+ */
+ @Override
+ protected void releaseQueryResource(long queryId) throws StorageEngineException {
+ // release resources locally
+ super.releaseQueryResource(queryId);
+ // release resources remotely
+ RemoteQueryContext context = queryContextMap.remove(queryId);
+ if (context != null) {
+ // release the resources in every queried node
+ for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
+ RaftNode header = headerEntry.getKey();
+ Set<Node> queriedNodes = headerEntry.getValue();
+
+ for (Node queriedNode : queriedNodes) {
+ GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
+ try {
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ AsyncDataClient client =
+ coordinator.getAsyncDataClient(
- queriedNode, RaftServer.getReadOperationTimeoutMS());
++ queriedNode, ClusterConstant.getReadOperationTimeoutMS());
+ client.endQuery(header, coordinator.getThisNode(), queryId, handler);
+ } else {
+ try (SyncDataClient syncDataClient =
+ coordinator.getSyncDataClient(
- queriedNode, RaftServer.getReadOperationTimeoutMS())) {
- syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
++ queriedNode, ClusterConstant.getReadOperationTimeoutMS())) {
++ try {
++ syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
++ } catch (TException e) {
++ // the connection may be broken, close it to avoid it being reused
++ syncDataClient.getInputProtocol().getTransport().close();
++ throw e;
++ }
+ }
+ }
+ } catch (IOException | TException e) {
+ logger.error("Cannot end query {} in {}", queryId, queriedNode);
+ }
+ }
+ }
+ }
+ }
+}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index b0f8a25,fce7a87..0000000
deleted file mode 100644,100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ /dev/null
@@@ -1,369 -1,378 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied. See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--package org.apache.iotdb.cluster.server;
--
- import org.apache.iotdb.cluster.ClusterIoTDB;
--import org.apache.iotdb.cluster.config.ClusterDescriptor;
--import org.apache.iotdb.cluster.coordinator.Coordinator;
--import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
--import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
--import org.apache.iotdb.cluster.metadata.CMManager;
--import org.apache.iotdb.cluster.metadata.MetaPuller;
--import org.apache.iotdb.cluster.rpc.thrift.*;
--import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
--import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
--import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer;
--import org.apache.iotdb.cluster.server.member.MetaGroupMember;
--import org.apache.iotdb.cluster.server.service.MetaAsyncService;
--import org.apache.iotdb.cluster.server.service.MetaSyncService;
--import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
--import org.apache.iotdb.db.exception.StartupException;
--import org.apache.iotdb.db.exception.query.QueryProcessException;
--import org.apache.iotdb.db.service.IoTDB;
--import org.apache.iotdb.db.service.RegisterManager;
--import org.apache.iotdb.db.utils.TestOnly;
--import org.apache.iotdb.service.rpc.thrift.TSStatus;
--
--import org.apache.thrift.TException;
--import org.apache.thrift.TProcessor;
--import org.apache.thrift.async.AsyncMethodCallback;
--import org.apache.thrift.transport.TNonblockingServerSocket;
--import org.apache.thrift.transport.TServerSocket;
--import org.apache.thrift.transport.TServerTransport;
--import org.apache.thrift.transport.TTransportException;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--import java.net.InetSocketAddress;
--import java.nio.ByteBuffer;
--
--/**
-- * MetaCluster manages the whole cluster's metadata, such as what nodes are in the cluster and the
-- * data partition. Each node has one MetaClusterServer instance, the single-node IoTDB instance is
-- * started-up at the same time.
-- */
--public class MetaClusterServer extends RaftServer
-- implements TSMetaService.AsyncIface, TSMetaService.Iface {
-- private static Logger logger = LoggerFactory.getLogger(MetaClusterServer.class);
--
-- // each node only contains one MetaGroupMember
-- private MetaGroupMember member;
-- private Coordinator coordinator;
-
-
- // the single-node IoTDB instance
- private IoTDB ioTDB;
- // to register the ClusterMonitor that helps monitoring the cluster
- private RegisterManager registerManager = new RegisterManager();
-- private MetaAsyncService asyncService;
-- private MetaSyncService syncService;
-- private MetaHeartbeatServer metaHeartbeatServer;
--
-- public MetaClusterServer() throws QueryProcessException {
-- super();
-- metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this);
-- coordinator = new Coordinator();
-- member = new MetaGroupMember(protocolFactory, thisNode, coordinator);
-- coordinator.setMetaGroupMember(member);
-- asyncService = new MetaAsyncService(member);
-- syncService = new MetaSyncService(member);
-- MetaPuller.getInstance().init(member);
-- }
--
-- /**
-- * Besides the standard RaftServer start-up, the IoTDB instance, a MetaGroupMember and the
-- * ClusterMonitor are also started.
-- *
-- * @throws TTransportException
-- * @throws StartupException
-- */
-- @Override
-- public void start() throws TTransportException, StartupException {
-- super.start();
-- metaHeartbeatServer.start();
-
- ioTDB = new IoTDB();
-- IoTDB.setMetaManager(CMManager.getInstance());
-- ((CMManager) IoTDB.metaManager).setMetaGroupMember(member);
-- ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
- //TODO FIXME move this out of MetaClusterServer
- IoTDB.getInstance().active();
-
- ioTDB.active();
-- member.start();
-
- // JMX based DBA API
- registerManager.register(ClusterMonitor.INSTANCE);
-- }
--
-- /** Also stops the IoTDB instance, the MetaGroupMember and the ClusterMonitor. */
-- @Override
-- public void stop() {
-
- if (ioTDB == null) {
- return;
- }
-- metaHeartbeatServer.stop();
-- super.stop();
- ioTDB.stop();
- ioTDB = null;
-- member.stop();
- registerManager.deregisterAll();
-- }
--
-- /** Build a initial cluster with other nodes on the seed list. */
-- public void buildCluster() throws ConfigInconsistentException, StartUpCheckFailureException {
-- member.buildCluster();
-- }
--
-- /**
-- * Pick up a node from the seed list and send a join request to it.
-- *
-- * @return whether the node has joined the cluster.
-- */
-- public void joinCluster() throws ConfigInconsistentException, StartUpCheckFailureException {
-- member.joinCluster();
-- }
--
-- /**
-- * MetaClusterServer uses the meta port to create the socket.
-- *
-- * @return the TServerTransport
-- * @throws TTransportException if create the socket fails
-- */
-- @Override
-- TServerTransport getServerSocket() throws TTransportException {
-- logger.info(
-- "[{}] Cluster node will listen {}:{}",
-- getServerClientName(),
-- config.getInternalIp(),
-- config.getInternalMetaPort());
-- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-- return new TNonblockingServerSocket(
-- new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort()),
-- getConnectionTimeoutInMS());
-- } else {
-- return new TServerSocket(
-- new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort()));
-- }
-- }
--
-- @Override
-- String getClientThreadPrefix() {
-- return "MetaClientThread-";
-- }
--
-- @Override
-- String getServerClientName() {
-- return "MetaServerThread-";
-- }
--
-- @Override
-- TProcessor getProcessor() {
-- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-- return new AsyncProcessor<>(this);
-- } else {
-- return new Processor<>(this);
-- }
-- }
--
-- // Request forwarding. There is only one MetaGroupMember each node, so all requests will be
-- // directly sent to that member. See the methods in MetaGroupMember for details
--
-- @Override
-- public void addNode(Node node, StartUpStatus startUpStatus, AsyncMethodCallback resultHandler) {
-- asyncService.addNode(node, startUpStatus, resultHandler);
-- }
--
-- @Override
-- public void sendHeartbeat(HeartBeatRequest request, AsyncMethodCallback resultHandler) {
-- asyncService.sendHeartbeat(request, resultHandler);
-- }
--
-- @Override
-- public void startElection(ElectionRequest electionRequest, AsyncMethodCallback resultHandler) {
-- asyncService.startElection(electionRequest, resultHandler);
-- }
--
-- @Override
-- public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback resultHandler) {
-- asyncService.appendEntries(request, resultHandler);
-- }
--
-- @Override
-- public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHandler) {
-- asyncService.appendEntry(request, resultHandler);
-- }
--
-- @Override
-- public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback resultHandler) {
-- asyncService.sendSnapshot(request, resultHandler);
-- }
--
-- @Override
-- public void executeNonQueryPlan(
-- ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) {
-- asyncService.executeNonQueryPlan(request, resultHandler);
-- }
--
-- @Override
-- public void requestCommitIndex(
-- RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
-- asyncService.requestCommitIndex(header, resultHandler);
-- }
--
-- @Override
-- public void checkAlive(AsyncMethodCallback<Node> resultHandler) {
-- asyncService.checkAlive(resultHandler);
-- }
--
-- @Override
-- public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler) {
-- asyncService.collectMigrationStatus(resultHandler);
-- }
--
-- @Override
-- public void readFile(
-- String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) {
-- asyncService.readFile(filePath, offset, length, resultHandler);
-- }
--
-- @Override
-- public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
-- asyncService.queryNodeStatus(resultHandler);
-- }
--
-- public MetaGroupMember getMember() {
-- return member;
-- }
--
-- @Override
-- public void checkStatus(
-- StartUpStatus startUpStatus, AsyncMethodCallback<CheckStatusResponse> resultHandler) {
-- asyncService.checkStatus(startUpStatus, resultHandler);
-- }
--
-- @Override
-- public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
-- asyncService.removeNode(node, resultHandler);
-- }
--
-- @Override
-- public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) {
-- asyncService.exile(removeNodeLog, resultHandler);
-- }
--
-- @Override
-- public void matchTerm(
-- long index, long term, RaftNode header, AsyncMethodCallback<Boolean> resultHandler) {
-- asyncService.matchTerm(index, term, header, resultHandler);
-- }
--
-- @Override
-- public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus) throws TException {
-- return syncService.addNode(node, startUpStatus);
-- }
--
-- @Override
-- public CheckStatusResponse checkStatus(StartUpStatus startUpStatus) {
-- return syncService.checkStatus(startUpStatus);
-- }
--
-- @Override
-- public long removeNode(Node node) throws TException {
-- return syncService.removeNode(node);
-- }
--
-- @Override
-- public void exile(ByteBuffer removeNodeLog) {
-- syncService.exile(removeNodeLog);
-- }
--
-- @Override
-- public TNodeStatus queryNodeStatus() {
-- return syncService.queryNodeStatus();
-- }
--
-- @Override
-- public Node checkAlive() {
-- return syncService.checkAlive();
-- }
--
-- @Override
-- public ByteBuffer collectMigrationStatus() {
-- return syncService.collectMigrationStatus();
-- }
--
-- @Override
-- public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
-- return syncService.sendHeartbeat(request);
-- }
--
-- @Override
-- public long startElection(ElectionRequest request) {
-- return syncService.startElection(request);
-- }
--
-- @Override
-- public long appendEntries(AppendEntriesRequest request) throws TException {
-- return syncService.appendEntries(request);
-- }
--
-- @Override
-- public long appendEntry(AppendEntryRequest request) throws TException {
-- return syncService.appendEntry(request);
-- }
--
-- @Override
-- public void sendSnapshot(SendSnapshotRequest request) throws TException {
-- syncService.sendSnapshot(request);
-- }
--
-- @Override
-- public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
-- return syncService.executeNonQueryPlan(request);
-- }
--
-- @Override
-- public RequestCommitIndexResponse requestCommitIndex(RaftNode header) throws TException {
-- return syncService.requestCommitIndex(header);
-- }
--
-- @Override
-- public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
-- return syncService.readFile(filePath, offset, length);
-- }
--
-- @Override
-- public boolean matchTerm(long index, long term, RaftNode header) {
-- return syncService.matchTerm(index, term, header);
-- }
--
-- @Override
-- public void removeHardLink(String hardLinkPath) throws TException {
-- syncService.removeHardLink(hardLinkPath);
-- }
--
-- @Override
-- public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) {
-- asyncService.removeHardLink(hardLinkPath, resultHandler);
-- }
--
-- @Override
-- public void handshake(Node sender) {
-- syncService.handshake(sender);
-- }
--
-- @Override
-- public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
-- asyncService.handshake(sender, resultHandler);
-- }
--
-- @TestOnly
-- public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
-- this.member = metaGroupMember;
-- }
--
- @TestOnly
- public IoTDB getIoTDB() {
- return ioTDB;
- }
--}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java
index 0000000,0000000..65db372
new file mode 100644
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java
@@@ -1,0 -1,0 +1,372 @@@
++/// *
++// * Licensed to the Apache Software Foundation (ASF) under one
++// * or more contributor license agreements. See the NOTICE file
++// * distributed with this work for additional information
++// * regarding copyright ownership. The ASF licenses this file
++// * to you under the Apache License, Version 2.0 (the
++// * "License"); you may not use this file except in compliance
++// * with the License. You may obtain a copy of the License at
++// *
++// * http://www.apache.org/licenses/LICENSE-2.0
++// *
++// * Unless required by applicable law or agreed to in writing,
++// * software distributed under the License is distributed on an
++// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++// * KIND, either express or implied. See the License for the
++// * specific language governing permissions and limitations
++// * under the License.
++// */
++// package org.apache.iotdb.cluster.server;
++//
++// import org.apache.iotdb.cluster.config.ClusterDescriptor;
++// import org.apache.iotdb.cluster.coordinator.Coordinator;
++// import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
++// import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
++// import org.apache.iotdb.cluster.metadata.CMManager;
++// import org.apache.iotdb.cluster.metadata.MetaPuller;
++// import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
++// import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
++// import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
++// import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
++// import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
++// import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
++// import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
++// import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
++// import org.apache.iotdb.cluster.rpc.thrift.Node;
++// import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
++// import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
++// import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
++// import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
++// import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
++// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
++// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
++// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
++// import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer;
++// import org.apache.iotdb.cluster.server.member.MetaGroupMember;
++// import org.apache.iotdb.cluster.server.service.MetaAsyncService;
++// import org.apache.iotdb.cluster.server.service.MetaSyncService;
++// import org.apache.iotdb.db.exception.StartupException;
++// import org.apache.iotdb.db.exception.query.QueryProcessException;
++// import org.apache.iotdb.db.service.IoTDB;
++// import org.apache.iotdb.db.utils.TestOnly;
++// import org.apache.iotdb.service.rpc.thrift.TSStatus;
++// import org.apache.thrift.TException;
++// import org.apache.thrift.async.AsyncMethodCallback;
++// import org.apache.thrift.transport.TNonblockingServerSocket;
++// import org.slf4j.Logger;
++// import org.slf4j.LoggerFactory;
++//
++// import java.net.InetSocketAddress;
++// import java.nio.ByteBuffer;
++//
++/// **
++// * MetaCluster manages the whole cluster's metadata, such as what nodes are in the cluster and the
++// * data partition. Each node has one MetaClusterServer instance, the single-node IoTDB instance is
++// * started-up at the same time.
++// */
++// public class MetaClusterServer2 extends RaftServer
++// implements TSMetaService.AsyncIface, TSMetaService.Iface {
++// private static Logger logger = LoggerFactory.getLogger(MetaClusterServer2.class);
++//
++// // each node only contains one MetaGroupMember
++// private MetaGroupMember member;
++// private Coordinator coordinator;
++//
++// private MetaAsyncService asyncService;
++// private MetaSyncService syncService;
++// private MetaHeartbeatServer metaHeartbeatServer;
++//
++// public MetaClusterServer2() throws QueryProcessException {
++// super();
++// metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this);
++// coordinator = new Coordinator();
++// member = new MetaGroupMember(protocolFactory, thisNode, coordinator);
++// coordinator.setMetaGroupMember(member);
++// asyncService = new MetaAsyncService(member);
++// syncService = new MetaSyncService(member);
++// MetaPuller.getInstance().init(member);
++// }
++//
++// /**
++// * Besides the standard RaftServer start-up, the IoTDB instance, a MetaGroupMember and the
++// * ClusterMonitor are also started.
++// *
++// * @throws TTransportException
++// * @throws StartupException
++// */
++// @Override
++// public void start() throws TTransportException, StartupException {
++// super.start();
++// metaHeartbeatServer.start();
++//
++// IoTDB.setMetaManager(CMManager.getInstance());
++// ((CMManager) IoTDB.metaManager).setMetaGroupMember(member);
++// ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
++// // TODO FIXME move this out of MetaClusterServer
++// IoTDB.getInstance().active();
++//
++// member.start();
++// }
++//
++// /** Also stops the IoTDB instance, the MetaGroupMember and the ClusterMonitor. */
++// @Override
++// public void stop() {
++//
++// metaHeartbeatServer.stop();
++// super.stop();
++// member.stop();
++// }
++//
++// /** Build a initial cluster with other nodes on the seed list. */
++// public void buildCluster() throws ConfigInconsistentException, StartUpCheckFailureException {
++// member.buildCluster();
++// }
++//
++// /**
++// * Pick up a node from the seed list and send a join request to it.
++// *
++// * @return whether the node has joined the cluster.
++// */
++// public void joinCluster() throws ConfigInconsistentException, StartUpCheckFailureException {
++// member.joinCluster();
++// }
++//
++// /**
++// * MetaClusterServer uses the meta port to create the socket.
++// *
++// * @return the TServerTransport
++// * @throws TTransportException if create the socket fails
++// */
++// @Override
++// TServerTransport getServerSocket() throws TTransportException {
++// logger.info(
++// "[{}] Cluster node will listen {}:{}",
++// getServerClientName(),
++// config.getInternalIp(),
++// config.getInternalMetaPort());
++// if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
++// return new TNonblockingServerSocket(
++// new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort()),
++// getConnectionTimeoutInMS());
++// } else {
++// return new TServerSocket(
++// new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort()));
++// }
++// }
++//
++// @Override
++// String getClientThreadPrefix() {
++// return "MetaClientThread-";
++// }
++//
++// @Override
++// String getServerClientName() {
++// return "MetaServerThread-";
++// }
++//
++// @Override
++// TProcessor getProcessor() {
++// if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
++// return new AsyncProcessor<>(this);
++// } else {
++// return new Processor<>(this);
++// }
++// }
++//
++// // Request forwarding. There is only one MetaGroupMember each node, so all requests will be
++// // directly sent to that member. See the methods in MetaGroupMember for details
++//
++// @Override
++// public void addNode(Node node, StartUpStatus startUpStatus, AsyncMethodCallback resultHandler) {
++// asyncService.addNode(node, startUpStatus, resultHandler);
++// }
++//
++// @Override
++// public void sendHeartbeat(HeartBeatRequest request, AsyncMethodCallback resultHandler) {
++// asyncService.sendHeartbeat(request, resultHandler);
++// }
++//
++// @Override
++// public void startElection(ElectionRequest electionRequest, AsyncMethodCallback resultHandler) {
++// asyncService.startElection(electionRequest, resultHandler);
++// }
++//
++// @Override
++// public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback resultHandler) {
++// asyncService.appendEntries(request, resultHandler);
++// }
++//
++// @Override
++// public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHandler) {
++// asyncService.appendEntry(request, resultHandler);
++// }
++//
++// @Override
++// public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback resultHandler) {
++// asyncService.sendSnapshot(request, resultHandler);
++// }
++//
++// @Override
++// public void executeNonQueryPlan(
++// ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) {
++// asyncService.executeNonQueryPlan(request, resultHandler);
++// }
++//
++// @Override
++// public void requestCommitIndex(
++// RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
++// asyncService.requestCommitIndex(header, resultHandler);
++// }
++//
++// @Override
++// public void checkAlive(AsyncMethodCallback<Node> resultHandler) {
++// asyncService.checkAlive(resultHandler);
++// }
++//
++// @Override
++// public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler) {
++// asyncService.collectMigrationStatus(resultHandler);
++// }
++//
++// @Override
++// public void readFile(
++// String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) {
++// asyncService.readFile(filePath, offset, length, resultHandler);
++// }
++//
++// @Override
++// public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
++// asyncService.queryNodeStatus(resultHandler);
++// }
++//
++// public MetaGroupMember getMember() {
++// return member;
++// }
++//
++// @Override
++// public void checkStatus(
++// StartUpStatus startUpStatus, AsyncMethodCallback<CheckStatusResponse> resultHandler) {
++// asyncService.checkStatus(startUpStatus, resultHandler);
++// }
++//
++// @Override
++// public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
++// asyncService.removeNode(node, resultHandler);
++// }
++//
++// @Override
++// public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) {
++// asyncService.exile(removeNodeLog, resultHandler);
++// }
++//
++// @Override
++// public void matchTerm(
++// long index, long term, RaftNode header, AsyncMethodCallback<Boolean> resultHandler) {
++// asyncService.matchTerm(index, term, header, resultHandler);
++// }
++//
++// @Override
++// public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus) throws TException {
++// return syncService.addNode(node, startUpStatus);
++// }
++//
++// @Override
++// public CheckStatusResponse checkStatus(StartUpStatus startUpStatus) {
++// return syncService.checkStatus(startUpStatus);
++// }
++//
++// @Override
++// public long removeNode(Node node) throws TException {
++// return syncService.removeNode(node);
++// }
++//
++// @Override
++// public void exile(ByteBuffer removeNodeLog) {
++// syncService.exile(removeNodeLog);
++// }
++//
++// @Override
++// public TNodeStatus queryNodeStatus() {
++// return syncService.queryNodeStatus();
++// }
++//
++// @Override
++// public Node checkAlive() {
++// return syncService.checkAlive();
++// }
++//
++// @Override
++// public ByteBuffer collectMigrationStatus() {
++// return syncService.collectMigrationStatus();
++// }
++//
++// @Override
++// public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
++// return syncService.sendHeartbeat(request);
++// }
++//
++// @Override
++// public long startElection(ElectionRequest request) {
++// return syncService.startElection(request);
++// }
++//
++// @Override
++// public long appendEntries(AppendEntriesRequest request) throws TException {
++// return syncService.appendEntries(request);
++// }
++//
++// @Override
++// public long appendEntry(AppendEntryRequest request) throws TException {
++// return syncService.appendEntry(request);
++// }
++//
++// @Override
++// public void sendSnapshot(SendSnapshotRequest request) throws TException {
++// syncService.sendSnapshot(request);
++// }
++//
++// @Override
++// public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
++// return syncService.executeNonQueryPlan(request);
++// }
++//
++// @Override
++// public RequestCommitIndexResponse requestCommitIndex(RaftNode header) throws TException {
++// return syncService.requestCommitIndex(header);
++// }
++//
++// @Override
++// public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
++// return syncService.readFile(filePath, offset, length);
++// }
++//
++// @Override
++// public boolean matchTerm(long index, long term, RaftNode header) {
++// return syncService.matchTerm(index, term, header);
++// }
++//
++// @Override
++// public void removeHardLink(String hardLinkPath) throws TException {
++// syncService.removeHardLink(hardLinkPath);
++// }
++//
++// @Override
++// public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) {
++// asyncService.removeHardLink(hardLinkPath, resultHandler);
++// }
++//
++// @Override
++// public void handshake(Node sender) {
++// syncService.handshake(sender);
++// }
++//
++// @Override
++// public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
++// asyncService.handshake(sender, resultHandler);
++// }
++//
++// @TestOnly
++// public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
++// this.member = metaGroupMember;
++// }
++// }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 09956d2,09956d2..0000000
deleted file mode 100644,100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ /dev/null
@@@ -1,263 -1,263 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied. See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--
--package org.apache.iotdb.cluster.server;
--
--import org.apache.iotdb.cluster.config.ClusterConfig;
--import org.apache.iotdb.cluster.config.ClusterDescriptor;
--import org.apache.iotdb.cluster.rpc.thrift.Node;
--import org.apache.iotdb.cluster.rpc.thrift.RaftService;
--import org.apache.iotdb.cluster.utils.ClusterUtils;
--import org.apache.iotdb.db.conf.IoTDBDescriptor;
--import org.apache.iotdb.db.exception.StartupException;
--import org.apache.iotdb.db.utils.CommonUtils;
--import org.apache.iotdb.db.utils.TestOnly;
--import org.apache.iotdb.rpc.RpcTransportFactory;
--
--import org.apache.thrift.TProcessor;
--import org.apache.thrift.protocol.TBinaryProtocol;
--import org.apache.thrift.protocol.TCompactProtocol;
--import org.apache.thrift.protocol.TProtocolFactory;
--import org.apache.thrift.server.TServer;
--import org.apache.thrift.server.TThreadedSelectorServer;
--import org.apache.thrift.transport.TNonblockingServerTransport;
--import org.apache.thrift.transport.TServerTransport;
--import org.apache.thrift.transport.TTransportException;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--import java.util.ConcurrentModificationException;
--import java.util.concurrent.ExecutorService;
--import java.util.concurrent.Executors;
--import java.util.concurrent.SynchronousQueue;
--import java.util.concurrent.ThreadFactory;
--import java.util.concurrent.ThreadPoolExecutor;
--import java.util.concurrent.atomic.AtomicLong;
--
--/**
-- * RaftServer works as a broker (network and protocol layer) that sends the requests to the proper
-- * RaftMembers to process.
-- */
--public abstract class RaftServer implements RaftService.AsyncIface, RaftService.Iface {
--
-- private static final Logger logger = LoggerFactory.getLogger(RaftServer.class);
-- private static int connectionTimeoutInMS =
-- ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
-- private static int readOperationTimeoutMS =
-- ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS();
-- private static int writeOperationTimeoutMS =
-- ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS();
-- private static int syncLeaderMaxWaitMs = 20 * 1000;
-- private static long heartBeatIntervalMs = 1000L;
--
-- ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
-- // the socket poolServer will listen to
-- private TServerTransport socket;
-- // RPC processing server
-- private TServer poolServer;
-- Node thisNode;
--
-- TProtocolFactory protocolFactory =
-- config.isRpcThriftCompressionEnabled()
-- ? new TCompactProtocol.Factory()
-- : new TBinaryProtocol.Factory();
--
-- // this thread pool is to run the thrift server (poolServer above)
-- private ExecutorService clientService;
--
-- RaftServer() {
-- thisNode = new Node();
-- // set internal rpc ip and ports
-- thisNode.setInternalIp(config.getInternalIp());
-- thisNode.setMetaPort(config.getInternalMetaPort());
-- thisNode.setDataPort(config.getInternalDataPort());
-- // set client rpc ip and ports
-- thisNode.setClientPort(config.getClusterRpcPort());
-- thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
-- }
--
-- RaftServer(Node thisNode) {
-- this.thisNode = thisNode;
-- }
--
-- public static int getConnectionTimeoutInMS() {
-- return connectionTimeoutInMS;
-- }
--
-- public static void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
-- RaftServer.connectionTimeoutInMS = connectionTimeoutInMS;
-- }
--
-- public static int getReadOperationTimeoutMS() {
-- return readOperationTimeoutMS;
-- }
--
-- public static int getWriteOperationTimeoutMS() {
-- return writeOperationTimeoutMS;
-- }
--
-- public static int getSyncLeaderMaxWaitMs() {
-- return syncLeaderMaxWaitMs;
-- }
--
-- public static void setSyncLeaderMaxWaitMs(int syncLeaderMaxWaitMs) {
-- RaftServer.syncLeaderMaxWaitMs = syncLeaderMaxWaitMs;
-- }
--
-- public static long getHeartBeatIntervalMs() {
-- return heartBeatIntervalMs;
-- }
--
-- public static void setHeartBeatIntervalMs(long heartBeatIntervalMs) {
-- RaftServer.heartBeatIntervalMs = heartBeatIntervalMs;
-- }
--
-- /**
-- * Establish a thrift server with the configurations in ClusterConfig to listen to and respond to
-- * thrift RPCs. Calling the method twice does not induce side effects.
-- *
-- * @throws TTransportException
-- */
-- @SuppressWarnings("java:S1130") // thrown in override method
-- public void start() throws TTransportException, StartupException {
-- if (poolServer != null) {
-- return;
-- }
--
-- establishServer();
-- }
--
-- /**
-- * Stop the thrift server, close the socket and interrupt all in progress RPCs. Calling the method
-- * twice does not induce side effects.
-- */
-- public void stop() {
-- if (poolServer == null) {
-- return;
-- }
--
-- try {
-- poolServer.stop();
-- } catch (ConcurrentModificationException e) {
-- // ignore
-- }
-- socket.close();
-- clientService.shutdownNow();
-- socket = null;
-- poolServer = null;
-- }
--
-- /**
-- * @return An AsyncProcessor that contains the extended interfaces of a non-abstract subclass of
-- * RaftService (DataService or MetaService).
-- */
-- abstract TProcessor getProcessor();
--
-- /**
-- * @return A socket that will be used to establish a thrift server to listen to RPC requests.
-- * DataServer and MetaServer use different port, so this is to be determined.
-- * @throws TTransportException
-- */
-- abstract TServerTransport getServerSocket() throws TTransportException;
--
-- /**
-- * Each thrift RPC request will be processed in a separate thread and this will return the name
-- * prefix of such threads. This is used to fast distinguish DataServer and MetaServer in the logs
-- * for the sake of debug.
-- *
-- * @return name prefix of RPC processing threads.
-- */
-- abstract String getClientThreadPrefix();
--
-- /**
-- * The thrift server will be run in a separate thread, and this will be its name. It help you
-- * locate the desired logs quickly when debugging.
-- *
-- * @return The name of the thread running the thrift server.
-- */
-- abstract String getServerClientName();
--
-- private TServer createAsyncServer() throws TTransportException {
-- socket = getServerSocket();
-- TThreadedSelectorServer.Args poolArgs =
-- new TThreadedSelectorServer.Args((TNonblockingServerTransport) socket);
-- poolArgs.maxReadBufferBytes = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize();
-- poolArgs.selectorThreads(CommonUtils.getCpuCores());
-- int maxConcurrentClientNum =
-- Math.max(CommonUtils.getCpuCores(), config.getMaxConcurrentClientNum());
-- poolArgs.executorService(
-- new ThreadPoolExecutor(
-- CommonUtils.getCpuCores(),
-- maxConcurrentClientNum,
-- poolArgs.getStopTimeoutVal(),
-- poolArgs.getStopTimeoutUnit(),
-- new SynchronousQueue<>(),
-- new ThreadFactory() {
-- private AtomicLong threadIndex = new AtomicLong(0);
--
-- @Override
-- public Thread newThread(Runnable r) {
-- return new Thread(r, getClientThreadPrefix() + threadIndex.incrementAndGet());
-- }
-- }));
-- poolArgs.processor(getProcessor());
-- poolArgs.protocolFactory(protocolFactory);
-- // async service requires FramedTransport
-- poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
--
-- // run the thrift server in a separate thread so that the main thread is not blocked
-- return new TThreadedSelectorServer(poolArgs);
-- }
--
-- private TServer createSyncServer() throws TTransportException {
-- socket = getServerSocket();
-- return ClusterUtils.createTThreadPoolServer(
-- socket, getClientThreadPrefix(), getProcessor(), protocolFactory);
-- }
--
-- private void establishServer() throws TTransportException {
-- logger.info(
-- "[{}] Cluster node {} begins to set up with {} mode",
-- getServerClientName(),
-- thisNode,
-- ClusterDescriptor.getInstance().getConfig().isUseAsyncServer() ? "Async" : "Sync");
--
-- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-- poolServer = createAsyncServer();
-- } else {
-- poolServer = createSyncServer();
-- }
--
-- clientService = Executors.newSingleThreadExecutor(r -> new Thread(r, getServerClientName()));
--
-- clientService.submit(() -> poolServer.serve());
--
-- logger.info("[{}] Cluster node {} is up", getServerClientName(), thisNode);
-- }
--
-- @TestOnly
-- public static void setReadOperationTimeoutMS(int readOperationTimeoutMS) {
-- RaftServer.readOperationTimeoutMS = readOperationTimeoutMS;
-- }
--
-- @TestOnly
-- public static void setWriteOperationTimeoutMS(int writeOperationTimeoutMS) {
-- RaftServer.writeOperationTimeoutMS = writeOperationTimeoutMS;
-- }
--}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
index ed99c3d,ed99c3d..348cda0
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
@@@ -23,9 -23,9 +23,8 @@@ import org.apache.iotdb.cluster.config.
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
--import org.apache.iotdb.cluster.server.MetaClusterServer;
++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl;
import org.apache.iotdb.cluster.utils.ClusterUtils;
--
import org.apache.thrift.TProcessor;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TServerSocket;
@@@ -39,22 -39,22 +38,22 @@@ import java.net.InetSocketAddress
public class MetaHeartbeatServer extends HeartbeatServer {
private static Logger logger = LoggerFactory.getLogger(MetaHeartbeatServer.class);
-- private MetaClusterServer metaClusterServer;
++ private RaftTSMetaServiceImpl RaftTSMetaServiceImpl;
/** Do not use this method for initialization */
private MetaHeartbeatServer() {}
-- public MetaHeartbeatServer(Node thisNode, MetaClusterServer metaClusterServer) {
++ public MetaHeartbeatServer(Node thisNode, RaftTSMetaServiceImpl RaftTSMetaServiceImpl) {
super(thisNode);
-- this.metaClusterServer = metaClusterServer;
++ this.RaftTSMetaServiceImpl = RaftTSMetaServiceImpl;
}
@Override
TProcessor getProcessor() {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-- return new AsyncProcessor<>(metaClusterServer);
++ return new AsyncProcessor<>(RaftTSMetaServiceImpl);
} else {
-- return new Processor<>(metaClusterServer);
++ return new Processor<>(RaftTSMetaServiceImpl);
}
}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 39ebf3c,f917916..62ce37e
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@@ -65,11 -65,11 +65,9 @@@ import org.apache.iotdb.cluster.rpc.thr
import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient;
- import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
-import org.apache.iotdb.cluster.server.ClientServer;
import org.apache.iotdb.cluster.server.DataClusterServer;
import org.apache.iotdb.cluster.server.HardLinkCleaner;
import org.apache.iotdb.cluster.server.NodeCharacter;
--import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler;
@@@ -87,20 -87,20 +85,22 @@@ import org.apache.iotdb.cluster.utils.S
import org.apache.iotdb.cluster.utils.nodetool.function.Status;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
++import org.apache.iotdb.db.exception.ShutdownException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
++import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.IoTDB;
++import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
--
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
@@@ -145,7 -145,7 +145,7 @@@ import static org.apache.iotdb.cluster.
import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
@SuppressWarnings("java:S1135")
--public class MetaGroupMember extends RaftMember {
++public class MetaGroupMember extends RaftMember implements IService {
/** the file that contains the identifier of this node */
static final String NODE_IDENTIFIER_FILE_NAME =
@@@ -209,12 -209,12 +209,6 @@@
/** each node starts a data heartbeat server to transfer heartbeat requests */
private DataHeartbeatServer dataHeartbeatServer;
-- /**
-- * an override of TSServiceImpl, which redirect JDBC and Session requests to the MetaGroupMember
-- * so they can be processed cluster-wide
-- */
- private ClusterTSServiceImpl clusterTSServiceImpl;
- private ClientServer clientServer;
--
private DataClientProvider dataClientProvider;
/**
@@@ -276,7 -276,7 +270,6 @@@
Factory dataMemberFactory = new Factory(factory, this);
dataClusterServer = new DataClusterServer(thisNode, dataMemberFactory, this);
dataHeartbeatServer = new DataHeartbeatServer(thisNode, dataClusterServer);
- clusterTSServiceImpl = new ClusterTSServiceImpl(this);
- clientServer = new ClientServer(this);
startUpStatus = getNewStartUpStatus();
// try loading the partition table if there was a previous cluster
@@@ -333,8 -333,8 +326,8 @@@
}
/**
- * Stop the heartbeat and catch-up thread pool, DataClusterServer, ClusterTSServiceImpl and reportThread.
- * Stop the heartbeat and catch-up thread pool, DataClusterServer, ClientServer and reportThread.
-- * Calling the method twice does not induce side effects.
++ * Stop the heartbeat and catch-up thread pool, DataClusterServer, ClusterTSServiceImpl and
++ * reportThread. Calling the method twice does not induce side effects.
*/
@Override
public void stop() {
@@@ -345,9 -345,9 +338,6 @@@
if (getDataHeartbeatServer() != null) {
getDataHeartbeatServer().stop();
}
- if (clusterTSServiceImpl != null) {
- clusterTSServiceImpl.stop();
- if (clientServer != null) {
- clientServer.stop();
-- }
if (reportThread != null) {
reportThread.shutdownNow();
try {
@@@ -370,15 -370,15 +360,29 @@@
logger.info("{}: stopped", name);
}
++ @Override
++ public void waitAndStop(long milliseconds) {
++ IService.super.waitAndStop(milliseconds);
++ }
++
++ @Override
++ public void shutdown(long milliseconds) throws ShutdownException {
++ IService.super.shutdown(milliseconds);
++ }
++
++ @Override
++ public ServiceType getID() {
++ return ServiceType.CLUSTER_META_ENGINE;
++ }
++
/**
- * Start DataClusterServer and ClusterTSServiceImpl so this node will be able to respond to other nodes
- * Start DataClusterServer and ClientServer so this node will be able to respond to other nodes
-- * and clients.
++ * Start DataClusterServer and ClusterTSServiceImpl so this node will be able to respond to other
++ * nodes and clients.
*/
protected void initSubServers() throws TTransportException, StartupException {
getDataClusterServer().start();
getDataHeartbeatServer().start();
- clusterTSServiceImpl.setCoordinator(this.coordinator);
- clusterTSServiceImpl.start();
- clientServer.setCoordinator(this.coordinator);
- clientServer.start();
++ // TODO FIXME
}
/**
@@@ -715,9 -714,9 +718,9 @@@
/**
* Process a HeartBeatResponse from a follower. If the follower has provided its identifier, try
* registering for it and if all nodes have registered and there is no available partition table,
- * initialize a new one and start the ClusterTSServiceImpl and DataClusterServer. If the follower requires
- * initialize a new one and start the ClientServer and DataClusterServer. If the follower requires
-- * a partition table, add it to the blind node list so that at the next heartbeat this node will
-- * send it a partition table
++ * initialize a new one and start the ClusterTSServiceImpl and DataClusterServer. If the follower
++ * requires a partition table, add it to the blind node list so that at the next heartbeat this
++ * node will send it a partition table
*/
@Override
public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
@@@ -800,8 -799,8 +803,8 @@@
}
/**
- * Start the DataClusterServer and ClusterTSServiceImpl` so this node can serve other nodes and clients.
- * Start the DataClusterServer and ClientServer so this node can serve other nodes and clients.
-- * Also build DataGroupMembers using the partition table.
++ * Start the DataClusterServer and ClusterTSServiceImpl` so this node can serve other nodes and
++ * clients. Also build DataGroupMembers using the partition table.
*/
protected synchronized void startSubServers() {
logger.info("Starting sub-servers...");
@@@ -1452,7 -1451,7 +1455,8 @@@
private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, RaftNode header)
throws IOException {
RaftService.AsyncClient client =
-- getClientProvider().getAsyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
++ getClientProvider()
++ .getAsyncDataClient(receiver, ClusterConstant.getWriteOperationTimeoutMS());
return forwardPlanAsync(plan, receiver, header, client);
}
@@@ -1461,7 -1460,7 +1465,8 @@@
Client client;
try {
client =
-- getClientProvider().getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
++ getClientProvider()
++ .getSyncDataClient(receiver, ClusterConstant.getWriteOperationTimeoutMS());
} catch (TException e) {
throw new IOException(e);
}
@@@ -1630,7 -1629,7 +1635,7 @@@
client.collectMigrationStatus(migrationStatusHandler);
synchronized (resultRef) {
if (resultRef.get() == null) {
-- resultRef.wait(RaftServer.getConnectionTimeoutInMS());
++ resultRef.wait(ClusterConstant.getConnectionTimeoutInMS());
}
}
return ClusterUtils.deserializeMigrationStatus(resultRef.get());
@@@ -1814,9 -1813,9 +1819,10 @@@
// ignore
}
super.stop();
- if (clusterTSServiceImpl != null) {
- clusterTSServiceImpl.stop();
- if (clientServer != null) {
- clientServer.stop();
-- }
++ // TODO FIXME
++ // if (clusterTSServiceImpl != null) {
++ // clusterTSServiceImpl.stop();
++ // }
logger.info("{} has been removed from the cluster", name);
})
.start();
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 9250785,ce941f5..560c0fe
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@@ -18,7 -18,7 +18,8 @@@
*/
package org.apache.iotdb.cluster.utils.nodetool;
-import org.apache.iotdb.cluster.ClusterMain;
++import org.apache.commons.collections4.map.MultiKeyMap;
+import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.partition.PartitionGroup;
@@@ -26,8 -26,8 +27,8 @@@ import org.apache.iotdb.cluster.partiti
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
--import org.apache.iotdb.cluster.server.MetaClusterServer;
import org.apache.iotdb.cluster.server.NodeCharacter;
++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.monitor.Timer;
@@@ -40,8 -40,8 +41,6 @@@ import org.apache.iotdb.db.service.ISer
import org.apache.iotdb.db.service.JMXService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.tsfile.utils.Pair;
--
--import org.apache.commons.collections4.map.MultiKeyMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -201,11 -201,11 +200,11 @@@ public class ClusterMonitor implements
}
private MetaGroupMember getMetaGroupMember() {
- MetaClusterServer metaClusterServer = ClusterIoTDB.getInstance().getMetaServer();
- MetaClusterServer metaClusterServer = ClusterMain.getMetaServer();
-- if (metaClusterServer == null) {
++ RaftTSMetaServiceImpl metaServiceImpl = ClusterIoTDB.getInstance().getMetaServer();
++ if (metaServiceImpl == null) {
return null;
}
-- return metaClusterServer.getMember();
++ return metaServiceImpl.getMember();
}
private PartitionTable getPartitionTable() {
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
index 7565e9d,70fbb66..c4f1dd7
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
@@@ -20,12 -20,12 +20,11 @@@
package org.apache.iotdb.cluster.integration;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
--import org.apache.iotdb.cluster.server.MetaClusterServer;
++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl;
import org.apache.iotdb.cluster.utils.Constants;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.Session;
--
import org.junit.After;
import org.junit.Before;
@@@ -34,7 -34,7 +33,7 @@@ import java.util.List
public abstract class BaseSingleNodeTest {
-- private MetaClusterServer metaServer;
++ private RaftTSMetaServiceImpl metaServer;
private boolean useAsyncServer;
private List<String> seedNodeUrls;
@@@ -44,14 -44,13 +43,14 @@@
@Before
public void setUp() throws Exception {
initConfigs();
-- metaServer = new MetaClusterServer();
++ metaServer = new RaftTSMetaServiceImpl();
metaServer.start();
metaServer.buildCluster();
}
@After
public void tearDown() throws Exception {
- //TODO fixme
++ // TODO fixme
metaServer.stop();
recoverConfigs();
EnvironmentUtils.cleanEnv();
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
index 499efce,5dde20f..60da68d8
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
@@@ -19,15 -19,15 +19,14 @@@
package org.apache.iotdb.cluster.server.clusterinfo;
-import org.apache.iotdb.cluster.ClusterMain;
+import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.rpc.thrift.DataPartitionEntry;
import org.apache.iotdb.cluster.rpc.thrift.Node;
--import org.apache.iotdb.cluster.server.MetaClusterServer;
++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMemberTest;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.PartialPath;
--
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
@@@ -48,13 -48,13 +47,13 @@@ public class ClusterInfoServiceImplTes
metaGroupMemberTest.setUp();
MetaGroupMember metaGroupMember = metaGroupMemberTest.getTestMetaGroupMember();
-- MetaClusterServer metaClusterServer = new MetaClusterServer();
-- metaClusterServer.getMember().stop();
-- metaClusterServer.setMetaGroupMember(metaGroupMember);
++ RaftTSMetaServiceImpl metaServiceImpl = new RaftTSMetaServiceImpl();
++ metaServiceImpl.getMember().stop();
++ metaServiceImpl.setMetaGroupMember(metaGroupMember);
- ClusterIoTDB.setMetaClusterServer(metaClusterServer);
- ClusterMain.setMetaClusterServer(metaClusterServer);
++ ClusterIoTDB.setMetaClusterServer(metaServiceImpl);
-- metaClusterServer.getIoTDB().metaManager.setStorageGroup(new PartialPath("root", "sg"));
++ metaServiceImpl.getIoTDB().metaManager.setStorageGroup(new PartialPath("root", "sg"));
// metaClusterServer.getMember()
impl = new ClusterInfoServiceImpl();
}
diff --cc server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
index 6f425ae,6f425ae..7c75e89
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
@@@ -27,6 -27,6 +27,7 @@@ import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
++import java.util.concurrent.TimeUnit;
/** This class is used to create thread pool which must contain the pool name. */
public class IoTDBThreadPoolFactory {
@@@ -132,6 -132,6 +133,23 @@@
/** function for creating thrift rpc client thread pool. */
public static ExecutorService createThriftRpcClientThreadPool(
++ int minWorkerThreads,
++ int maxWorkerThreads,
++ int stopTimeoutVal,
++ TimeUnit stopTimeoutUnit,
++ String poolName) {
++ SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
++ return new ThreadPoolExecutor(
++ minWorkerThreads,
++ maxWorkerThreads,
++ stopTimeoutVal,
++ stopTimeoutUnit,
++ executorQueue,
++ new IoTThreadFactory(poolName));
++ }
++
++ /** function for creating thrift rpc client thread pool. */
++ public static ExecutorService createThriftRpcClientThreadPool(
TThreadPoolServer.Args args, String poolName, Thread.UncaughtExceptionHandler handler) {
SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
return new ThreadPoolExecutor(
diff --cc server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index bbbd6f1,0850b75..d37f57e
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@@ -46,9 -46,7 +46,12 @@@ public enum ThreadName
QUERY_SERVICE("Query"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"),
- CLUSTER_INFO_SERVICE("ClusterInfoClient");
+ CLUSTER_INFO_SERVICE("ClusterInfoClient"),
+ CLUSTER_RPC_SERVICE("ClusterRPC"),
- CLUSTER_RPC_CLIENT("Cluster-RPC-Client");
++ CLUSTER_RPC_CLIENT("Cluster-RPC-Client"),
++ CLUSTER_META_SERVICE("ClusterMetaService"),
++ CLUSTER_META_HEARTBEAT_SERVICE("ClusterMetaHeartbeatService"),
++ CLUSTER_DATA_SERVICE("ClusterDataService");
private final String name;
diff --cc server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
index 6b61323,b2dd13a..83db4bc
--- a/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
@@@ -43,19 -43,13 +43,20 @@@ public class TracingManager
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
private BufferedWriter writer;
private Map<Long, Long> queryStartTime = new ConcurrentHashMap<>();
+ private Map<Long, TracingInfo> tracingInfoMap = new ConcurrentHashMap<>();
- public TracingManager(String dirName, String logFileName) {
- initTracingManager(dirName, logFileName);
+ private TracingManager() {
+ initTracingManager();
}
- public void initTracingManager(String dirName, String logFileName) {
+ public void initTracingManager() {
+ if (this.writer != null) {
- //the tracing manager has been initialized.
++ // the tracing manager has been initialized.
+ return;
+ }
+ String dirName = IoTDBDescriptor.getInstance().getConfig().getTracingDir();
+ String logFileName = IoTDBConstant.TRACING_LOG;
+
File tracingDir = SystemFileFactory.INSTANCE.getFile(dirName);
if (!tracingDir.exists()) {
if (tracingDir.mkdirs()) {
diff --cc server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 9cd89e6,1154e3d..f15d384
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@@ -152,14 -152,6 +152,14 @@@ public class IoTDB implements IoTDBMBea
private void deactivate() {
logger.info("Deactivating IoTDB...");
- //some user may call Tracing on but do not close tracing.
- //so, when remove the system, we have to close the tracing
++ // some user may call Tracing on but do not close tracing.
++ // so, when remove the system, we have to close the tracing
+ if (IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceTracing()) {
+ TracingManager.getInstance().close();
+ }
+ PrimitiveArrayManager.close();
+ SystemInfo.getInstance().close();
+
registerManager.deregisterAll();
JMXService.deregisterMBean(mbeanName);
logger.info("IoTDB is deactivated.");
diff --cc server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index be5f2f7,5bbddec..6f9292c
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@@ -47,6 -55,6 +47,7 @@@ public class RPCService extends ThriftS
(TSServiceImpl)
Class.forName(IoTDBDescriptor.getInstance().getConfig().getRpcImplClassName())
.newInstance();
++ initSyncedServiceImpl(null);
processor = new Processor<>(impl);
}
diff --cc server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index febacac,2f61d90..90b0136
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@@ -55,9 -55,6 +55,12 @@@ public enum ServiceType
SYSTEMINFO_SERVICE("MemTable Monitor Service", "MemTable, Monitor"),
CONTINUOUS_QUERY_SERVICE("Continuous Query Service", "Continuous Query Service"),
CLUSTER_INFO_SERVICE("Cluster Monitor Service (thrift-based)", "Cluster Monitor-Thrift"),
+
- CLUSTER_RPC_SERVICE("Cluster RPC ServerService", "ClusterRPCService"),
-
++ CLUSTER_RPC_SERVICE("Cluster RPC Service", "ClusterRPCService"),
++ CLUSTER_META_RPC_SERVICE("Cluster Meta RPC Service", "ClusterMetaRPCService"),
++ CLUSTER_DATA_RPC_SERVICE("Cluster Data RPC Service", "ClusterDataRPCService"),
++ CLUSTER_META_ENGINE("Cluster Meta Engine", "ClusterMetaEngine"),
++ CLUSTER_META_HEART_BEAT("Cluster Meta Heartbeat Service", "ClusterMetaHeartbeat"),
;
private final String name;
diff --cc server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
index dfb2526,d975743..45fb0ec
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
@@@ -19,13 -19,13 +19,10 @@@
package org.apache.iotdb.db.service.thrift;
--import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
--import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.JMXService;
--
import org.apache.thrift.TProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -78,6 -83,6 +75,17 @@@ public abstract class ThriftService imp
JMXService.deregisterMBean(mbeanName);
}
++ boolean setSyncedImpl = false;
++ boolean setAsyncedImpl = false;
++
++ public void initSyncedServiceImpl(Object serviceImpl) {
++ setSyncedImpl = true;
++ }
++
++ public void initAsyncedServiceImpl(Object serviceImpl) {
++ setAsyncedImpl = true;
++ }
++
public abstract void initTProcessor()
throws ClassNotFoundException, IllegalAccessException, InstantiationException;
@@@ -101,6 -106,6 +109,10 @@@
try {
reset();
initTProcessor();
++ // the guard must fire when NO implementation was registered, not when one was;
++ // StartupException does not format SLF4J-style {} placeholders, so use the
++ // (serviceName, errorMessage) constructor already used elsewhere in this class
++ if (!setSyncedImpl && !setAsyncedImpl) {
++ throw new StartupException(
++ this.getID().getName(), "At least one service implementation should be set.");
++ }
initThriftServiceThread();
thriftServiceThread.setThreadStopLatch(stopLatch);
thriftServiceThread.start();
diff --cc server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
index 2564d05,612d187..ed2237d
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@@ -24,14 -24,14 +24,18 @@@ import org.apache.iotdb.db.conf.IoTDBCo
import org.apache.iotdb.db.exception.runtime.RPCServiceException;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
--
++import org.apache.thrift.TBaseAsyncProcessor;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
++import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TThreadPoolServer;
++import org.apache.thrift.server.TThreadedSelectorServer;
++import org.apache.thrift.transport.TNonblockingServerSocket;
++import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
@@@ -40,6 -40,6 +44,7 @@@ import org.slf4j.LoggerFactory
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.TimeUnit;
public class ThriftServiceThread extends Thread {
@@@ -51,8 -51,8 +56,95 @@@
private String serviceName;
private TProtocolFactory protocolFactory;
-- private TThreadPoolServer.Args poolArgs;
++ // currently, we can reuse the ProtocolFactory instance.
++ private static TCompactProtocol.Factory compactProtocolFactory = new TCompactProtocol.Factory();
++ private static TBinaryProtocol.Factory binaryProtocolFactory = new TBinaryProtocol.Factory();
++
++ private void initProtocolFactory(boolean compress) {
++ protocolFactory = getProtocolFactory(compress);
++ }
++
++ public static TProtocolFactory getProtocolFactory(boolean compress) {
++ if (compress) {
++ return compactProtocolFactory;
++ } else {
++ return binaryProtocolFactory;
++ }
++ }
++
++ private void catchFailedInitialization(TTransportException e) throws RPCServiceException {
++ close();
++ if (threadStopLatch == null) {
++ logger.debug("Stop Count Down latch is null");
++ } else {
++ logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
++ }
++ if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
++ threadStopLatch.countDown();
++ }
++ logger.debug(
++ "{}: close TThreadPoolServer and TServerSocket for {}",
++ IoTDBConstant.GLOBAL_DB_NAME,
++ serviceName);
++ throw new RPCServiceException(
++ String.format(
++ "%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME, serviceName),
++ e);
++ }
++
++ /** Constructor for an asynchronous (non-blocking) ThriftServiceThread. */
++ @SuppressWarnings("squid:S107")
++ public ThriftServiceThread(
++ TBaseAsyncProcessor processor,
++ String serviceName,
++ String threadsName,
++ String bindAddress,
++ int port,
++ int maxWorkerThreads,
++ int timeoutSecond,
++ TServerEventHandler serverEventHandler,
++ boolean compress,
++ int connectionTimeoutInMS,
++ int maxReadBufferBytes,
++ ServerType serverType) {
++ initProtocolFactory(compress);
++ this.serviceName = serviceName;
++ try {
++ serverTransport = openNonblockingTransport(bindAddress, port, connectionTimeoutInMS);
++ switch (serverType) {
++ case SELECTOR:
++ TThreadedSelectorServer.Args poolArgs =
++ initAsyncedSelectorPoolArgs(
++ processor, threadsName, maxWorkerThreads, timeoutSecond, maxReadBufferBytes);
++ poolServer = new TThreadedSelectorServer(poolArgs);
++ break;
++ case HSHA:
++ THsHaServer.Args poolArgs1 =
++ initAsyncedHshaPoolArgs(
++ processor, threadsName, maxWorkerThreads, timeoutSecond, maxReadBufferBytes);
++ poolServer = new THsHaServer(poolArgs1);
++ break;
++ }
++ poolServer.setServerEventHandler(serverEventHandler);
++ } catch (TTransportException e) {
++ catchFailedInitialization(e);
++ }
++ }
++
++ /**
++ * Constructor for a synchronous (thread-pool based) ThriftServiceThread.
++ *
++ * @param processor thrift processor that handles incoming requests
++ * @param serviceName human-readable service name used in logs
++ * @param threadsName name prefix for the worker threads
++ * @param bindAddress address the server socket binds to
++ * @param port port the server socket listens on
++ * @param maxWorkerThreads maximum number of worker threads in the pool
++ * @param timeoutSecond stop timeout for the worker thread pool, in seconds
++ * @param serverEventHandler handler notified of server lifecycle events
++ * @param compress whether to use the compact thrift protocol
++ */
@SuppressWarnings("squid:S107")
public ThriftServiceThread(
TProcessor processor,
@@@ -61,53 -61,53 +153,84 @@@
String bindAddress,
int port,
int maxWorkerThreads,
- int timeoutMs,
+ int timeoutSecond,
TServerEventHandler serverEventHandler,
boolean compress) {
-- if (compress) {
-- protocolFactory = new TCompactProtocol.Factory();
-- } else {
-- protocolFactory = new TBinaryProtocol.Factory();
-- }
++ initProtocolFactory(compress);
this.serviceName = serviceName;
try {
serverTransport = openTransport(bindAddress, port);
-- poolArgs =
-- new TThreadPoolServer.Args(serverTransport)
-- .maxWorkerThreads(maxWorkerThreads)
-- .minWorkerThreads(CommonUtils.getCpuCores())
- .stopTimeoutVal(timeoutSecond);
- .stopTimeoutVal(timeoutMs);
-- poolArgs.executorService =
-- IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, threadsName);
-- poolArgs.processor(processor);
-- poolArgs.protocolFactory(protocolFactory);
-- poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
++ TThreadPoolServer.Args poolArgs =
++ initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond);
poolServer = new TThreadPoolServer(poolArgs);
poolServer.setServerEventHandler(serverEventHandler);
} catch (TTransportException e) {
-- close();
-- if (threadStopLatch == null) {
-- logger.debug("Stop Count Down latch is null");
-- } else {
-- logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
-- }
-- if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
-- threadStopLatch.countDown();
-- }
-- logger.debug(
-- "{}: close TThreadPoolServer and TServerSocket for {}",
-- IoTDBConstant.GLOBAL_DB_NAME,
-- serviceName);
-- throw new RPCServiceException(
-- String.format(
-- "%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME, serviceName),
-- e);
++ catchFailedInitialization(e);
}
}
++ private TThreadPoolServer.Args initSyncedPoolArgs(
++ TProcessor processor, String threadsName, int maxWorkerThreads, int timeoutSecond) {
++ TThreadPoolServer.Args poolArgs = new TThreadPoolServer.Args(serverTransport);
++ poolArgs
++ .maxWorkerThreads(maxWorkerThreads)
++ .minWorkerThreads(CommonUtils.getCpuCores())
++ .stopTimeoutVal(timeoutSecond);
++ poolArgs.executorService =
++ IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, threadsName);
++ poolArgs.processor(processor);
++ poolArgs.protocolFactory(protocolFactory);
++ poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
++ return poolArgs;
++ }
++
++ private TThreadedSelectorServer.Args initAsyncedSelectorPoolArgs(
++ TBaseAsyncProcessor processor,
++ String threadsName,
++ int maxWorkerThreads,
++ int timeoutSecond,
++ int maxReadBufferBytes) {
++ TThreadedSelectorServer.Args poolArgs =
++ new TThreadedSelectorServer.Args((TNonblockingServerTransport) serverTransport);
++ poolArgs.maxReadBufferBytes = maxReadBufferBytes;
++ poolArgs.selectorThreads(CommonUtils.getCpuCores());
++ poolArgs.executorService(
++ IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
++ CommonUtils.getCpuCores(),
++ maxWorkerThreads,
++ timeoutSecond,
++ TimeUnit.SECONDS,
++ threadsName));
++ poolArgs.processor(processor);
++ poolArgs.protocolFactory(protocolFactory);
++ poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
++ return poolArgs;
++ }
++
++ private THsHaServer.Args initAsyncedHshaPoolArgs(
++ TBaseAsyncProcessor processor,
++ String threadsName,
++ int maxWorkerThreads,
++ int timeoutSecond,
++ int maxReadBufferBytes) {
++ THsHaServer.Args poolArgs = new THsHaServer.Args((TNonblockingServerTransport) serverTransport);
++ poolArgs.maxReadBufferBytes = maxReadBufferBytes;
++ poolArgs.executorService(
++ IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
++ CommonUtils.getCpuCores(),
++ maxWorkerThreads,
++ timeoutSecond,
++ TimeUnit.SECONDS,
++ threadsName));
++ poolArgs.processor(processor);
++ poolArgs.protocolFactory(protocolFactory);
++ poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
++ return poolArgs;
++ }
++
@SuppressWarnings("java:S2259")
-- public TServerTransport openTransport(String bindAddress, int port) throws TTransportException {
++ private TServerTransport openTransport(String bindAddress, int port) throws TTransportException {
int maxRetry = 5;
long retryIntervalMS = 5000;
TTransportException lastExp = null;
@@@ -127,6 -127,6 +250,28 @@@
throw lastExp;
}
++ private TServerTransport openNonblockingTransport(
++ String bindAddress, int port, int connectionTimeoutInMS) throws TTransportException {
++ int maxRetry = 5;
++ long retryIntervalMS = 5000;
++ TTransportException lastExp = null;
++ for (int i = 0; i < maxRetry; i++) {
++ try {
++ return new TNonblockingServerSocket(
++ new InetSocketAddress(bindAddress, port), connectionTimeoutInMS);
++ } catch (TTransportException e) {
++ lastExp = e;
++ try {
++ Thread.sleep(retryIntervalMS);
++ } catch (InterruptedException interruptedException) {
++ Thread.currentThread().interrupt();
++ break;
++ }
++ }
++ }
++ throw lastExp;
++ }
++
public void setThreadStopLatch(CountDownLatch threadStopLatch) {
this.threadStopLatch = threadStopLatch;
}
@@@ -177,4 -177,4 +322,9 @@@
}
return false;
}
++
++ public enum ServerType {
++ SELECTOR,
++ HSHA
++ }
}
diff --cc server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index 96ec8ef,96ec8ef..c7a1b9e
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@@ -29,7 -29,7 +29,6 @@@ import org.apache.iotdb.db.sync.receive
import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
import org.apache.iotdb.service.sync.thrift.SyncService;
--
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -61,6 -61,6 +60,7 @@@ public class SyncServerManager extends
@Override
public void initTProcessor() {
++ initSyncedServiceImpl(null);
serviceImpl = new SyncServiceImpl();
processor = new SyncService.Processor<>(serviceImpl);
}
@@@ -93,6 -93,6 +93,11 @@@
}
@Override
++ public int getRPCPort() {
++ return getBindPort();
++ }
++
++ @Override
public void startService() throws StartupException {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
if (!config.isSyncEnable()) {