You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/10 16:46:47 UTC

[iotdb] 01/07: temporary commit for refactor thrift rpc

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5bdf9187b5a6a74904b5d2f64ff1b564188b8ee8
Merge: 4f92cc5 b484f2e
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Tue Aug 3 19:17:27 2021 +0800

    temporary commit for refactor thrift rpc

 README_ZH.md                                       |    2 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |   73 +-
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |  159 ++--
 .../iotdb/cluster/config/ClusterConstant.java      |   52 +
 .../iotdb/cluster/coordinator/Coordinator.java     |    8 +-
 .../iotdb/cluster/log/applier/BaseApplier.java     |   17 +-
 .../log/manage/PartitionedSnapshotLogManager.java  |    8 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  223 ++---
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |   27 +-
 .../cluster/query/ClusterPhysicalGenerator.java    |   18 +-
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |   44 +-
 .../apache/iotdb/cluster/query/ClusterPlanner.java |   17 +-
 .../iotdb/cluster/query/ClusterQueryRouter.java    |   40 +
 .../cluster/query/ClusterUDTFQueryExecutor.java    |  111 +++
 .../iotdb/cluster/query/LocalQueryExecutor.java    |   32 +-
 .../cluster/query/aggregate/ClusterAggregator.java |    9 +-
 .../cluster/query/fill/ClusterPreviousFill.java    |   10 +-
 .../query/groupby/RemoteGroupByExecutor.java       |   27 +-
 .../query/last/ClusterLastQueryExecutor.java       |   26 +-
 .../cluster/query/manage/ClusterQueryManager.java  |    8 +-
 .../cluster/query/reader/ClusterReaderFactory.java |    9 +-
 .../cluster/query/reader/ClusterTimeGenerator.java |   14 +-
 .../iotdb/cluster/query/reader/DataSourceInfo.java |   28 +-
 .../reader/RemoteSeriesReaderByTimestamp.java      |    3 +
 .../query/reader/RemoteSimpleSeriesReader.java     |    3 +
 .../query/reader/mult/MultDataSourceInfo.java      |    8 +-
 .../query/reader/mult/RemoteMultSeriesReader.java  |   10 +-
 .../iotdb/cluster/server/ClusterRPCService.java    |   17 +-
 .../iotdb/cluster/server/ClusterTSServiceImpl.java |  112 +--
 .../iotdb/cluster/server/MetaClusterServer.java    |  369 -------
 .../iotdb/cluster/server/MetaClusterServer2.java   |  372 ++++++++
 .../apache/iotdb/cluster/server/RaftServer.java    |  263 -----
 .../cluster/server/heartbeat/HeartbeatThread.java  |    6 +
 .../server/heartbeat/MetaHeartbeatServer.java      |   13 +-
 .../cluster/server/member/MetaGroupMember.java     |   72 +-
 .../apache/iotdb/cluster/utils/PartitionUtils.java |    4 +-
 .../cluster/utils/nodetool/ClusterMonitor.java     |   11 +-
 .../org/apache/iotdb/cluster/common/IoTDBTest.java |    7 +-
 .../org/apache/iotdb/cluster/common/TestUtils.java |    7 +-
 .../cluster/integration/BaseSingleNodeTest.java    |    9 +-
 .../cluster/log/applier/DataLogApplierTest.java    |   40 +-
 .../cluster/log/logtypes/SerializeLogTest.java     |    4 +-
 .../cluster/log/snapshot/FileSnapshotTest.java     |    2 +-
 .../iotdb/cluster/partition/MManagerWhiteBox.java  |    2 +-
 .../query/ClusterAggregateExecutorTest.java        |    4 +-
 .../query/ClusterDataQueryExecutorTest.java        |   10 +-
 .../cluster/query/ClusterFillExecutorTest.java     |    4 +-
 .../query/ClusterPhysicalGeneratorTest.java        |    3 +-
 .../cluster/query/ClusterPlanExecutorTest.java     |    8 +-
 .../cluster/query/ClusterQueryRouterTest.java      |   44 +-
 .../query/ClusterUDTFQueryExecutorTest.java        |  116 +++
 .../iotdb/cluster/query/LoadConfigurationTest.java |    2 +-
 .../ClusterGroupByNoVFilterDataSetTest.java        |    2 +-
 .../groupby/ClusterGroupByVFilterDataSetTest.java  |    2 +-
 .../query/groupby/MergeGroupByExecutorTest.java    |    4 +-
 .../query/groupby/RemoteGroupByExecutorTest.java   |    4 +-
 .../query/manage/ClusterQueryManagerTest.java      |   12 +-
 .../query/reader/ClusterReaderFactoryTest.java     |    2 +-
 .../query/reader/ClusterTimeGeneratorTest.java     |    4 +-
 .../clusterinfo/ClusterInfoServiceImplTest.java    |   13 +-
 .../cluster/server/member/DataGroupMemberTest.java |   14 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   12 +-
 docs/Development/ContributeGuide.md                |   37 +-
 docs/SystemDesign/SchemaManager/SchemaManager.md   |   39 +-
 docs/SystemDesign/TsFile/Format.md                 |  477 +++++-----
 docs/UserGuide/Advanced-Features/Select-Into.md    |  235 +++++
 .../Programming-MQTT.md                            |   79 +-
 .../UserGuide/Ecosystem Integration/Flink IoTDB.md |    3 +-
 .../DDL-Data-Definition-Language.md                |    6 +-
 .../UserGuide/System-Tools/Load-External-Tsfile.md |   32 +-
 docs/zh/Community/Community-Powered By.md          |   41 +-
 docs/zh/Community/Feedback.md                      |   18 +-
 docs/zh/Development/Committer.md                   |   38 +-
 docs/zh/Development/ContributeGuide.md             |   59 +-
 docs/zh/Development/HowToCommit.md                 |   40 +-
 docs/zh/Development/VoteRelease.md                 |    7 +-
 docs/zh/Download/README.md                         |   73 +-
 docs/zh/SystemDesign/Architecture/Architecture.md  |    1 -
 docs/zh/SystemDesign/Client/RPC.md                 |    3 -
 docs/zh/SystemDesign/Connector/Hive-TsFile.md      |   11 +-
 docs/zh/SystemDesign/Connector/Spark-IOTDB.md      |   39 +-
 docs/zh/SystemDesign/Connector/Spark-TsFile.md     |   25 +-
 docs/zh/SystemDesign/DataQuery/AggregationQuery.md |   31 +-
 .../SystemDesign/DataQuery/AlignByDeviceQuery.md   |   12 +-
 docs/zh/SystemDesign/DataQuery/FillFunction.md     |   30 +-
 docs/zh/SystemDesign/DataQuery/GroupByFillQuery.md |   37 +-
 docs/zh/SystemDesign/DataQuery/GroupByQuery.md     |   48 +-
 docs/zh/SystemDesign/DataQuery/LastQuery.md        |   22 +-
 docs/zh/SystemDesign/DataQuery/OrderByTimeQuery.md |   75 +-
 .../zh/SystemDesign/DataQuery/QueryFundamentals.md |   58 +-
 docs/zh/SystemDesign/DataQuery/RawDataQuery.md     |   62 +-
 docs/zh/SystemDesign/DataQuery/SeriesReader.md     |   24 +-
 docs/zh/SystemDesign/QueryEngine/Planner.md        |    1 -
 .../QueryEngine/ResultSetConstruction.md           |    2 +-
 .../zh/SystemDesign/SchemaManager/SchemaManager.md |  190 ++--
 docs/zh/SystemDesign/StorageEngine/Compaction.md   |   40 +-
 .../SystemDesign/StorageEngine/DataManipulation.md |   41 +-
 .../zh/SystemDesign/StorageEngine/DataPartition.md |   24 +-
 docs/zh/SystemDesign/StorageEngine/FileLists.md    |   39 +-
 docs/zh/SystemDesign/StorageEngine/FlushManager.md |    4 +-
 docs/zh/SystemDesign/StorageEngine/MergeManager.md |   22 +-
 docs/zh/SystemDesign/StorageEngine/Recover.md      |   29 +-
 .../zh/SystemDesign/StorageEngine/StorageEngine.md |    2 +-
 docs/zh/SystemDesign/StorageEngine/WAL.md          |    4 +-
 docs/zh/SystemDesign/Tools/Sync.md                 |  105 +-
 docs/zh/SystemDesign/TsFile/Format.md              |  584 ++++++------
 docs/zh/SystemDesign/TsFile/Read.md                |  110 +--
 docs/zh/SystemDesign/TsFile/TsFile.md              |    1 -
 docs/zh/SystemDesign/TsFile/Write.md               |    3 +-
 .../zh/UserGuide/API/Programming-Cpp-Native-API.md |   87 +-
 docs/zh/UserGuide/API/Programming-Go-Native-API.md |   18 +-
 docs/zh/UserGuide/API/Programming-JDBC.md          |   14 +-
 .../UserGuide/API/Programming-Java-Native-API.md   |   59 +-
 .../UserGuide/API/Programming-Python-Native-API.md |   70 +-
 docs/zh/UserGuide/API/Programming-TsFile-API.md    |  137 ++-
 docs/zh/UserGuide/API/Time-zone.md                 |    1 -
 .../Administration-Management/Administration.md    |   35 +-
 docs/zh/UserGuide/Advanced-Features/Alerting.md    |   15 +-
 .../Advanced-Features/Continuous-Query.md          |   17 +-
 docs/zh/UserGuide/Advanced-Features/Select-Into.md |  234 +++++
 docs/zh/UserGuide/Advanced-Features/Triggers.md    |  170 +---
 .../Advanced-Features/UDF-User-Defined-Function.md |  225 ++---
 docs/zh/UserGuide/Appendix/Config-Manual.md        |  139 ++-
 docs/zh/UserGuide/Appendix/SQL-Reference.md        |  160 ++--
 docs/zh/UserGuide/Appendix/Status-Codes.md         |   19 +-
 docs/zh/UserGuide/CLI/Command-Line-Interface.md    |  123 ++-
 docs/zh/UserGuide/Cluster/Cluster-Setup-Example.md |   12 +-
 docs/zh/UserGuide/Cluster/Cluster-Setup.md         |   67 +-
 .../Collaboration-of-Edge-and-Cloud/Sync-Tool.md   |   40 +-
 .../Programming-MQTT.md                            |  111 ++-
 .../Programming-Thrift.md                          |   30 +-
 docs/zh/UserGuide/Comparison/TSDB-Comparison.md    |  201 ++--
 docs/zh/UserGuide/Data-Concept/Compression.md      |   12 +-
 .../Data-Concept/Data-Model-and-Terminology.md     |   59 +-
 docs/zh/UserGuide/Data-Concept/Data-Type.md        |    4 +-
 docs/zh/UserGuide/Data-Concept/Encoding.md         |   24 +-
 docs/zh/UserGuide/Data-Concept/SDT.md              |   30 +-
 docs/zh/UserGuide/Ecosystem Integration/DBeaver.md |   20 +-
 .../UserGuide/Ecosystem Integration/Flink IoTDB.md |    5 +-
 .../Ecosystem Integration/Flink TsFile.md          |   17 +-
 docs/zh/UserGuide/Ecosystem Integration/Grafana.md |   72 +-
 .../UserGuide/Ecosystem Integration/Hive TsFile.md |   55 +-
 .../Ecosystem Integration/MapReduce TsFile.md      |   39 +-
 .../UserGuide/Ecosystem Integration/Spark IoTDB.md |   17 +-
 .../Ecosystem Integration/Spark TsFile.md          |   88 +-
 .../Ecosystem Integration/Writing Data on HDFS.md  |   46 +-
 .../Ecosystem Integration/Zeppelin-IoTDB.md        |   51 +-
 .../zh/UserGuide/FAQ/Frequently-asked-questions.md |   33 +-
 .../UserGuide/IoTDB-Introduction/Architecture.md   |   14 +-
 docs/zh/UserGuide/IoTDB-Introduction/Features.md   |    8 +-
 .../zh/UserGuide/IoTDB-Introduction/Publication.md |    2 +-
 docs/zh/UserGuide/IoTDB-Introduction/Scenario.md   |   20 +-
 .../UserGuide/IoTDB-Introduction/What-is-IoTDB.md  |    4 +-
 .../DDL-Data-Definition-Language.md                |   72 +-
 .../DML-Data-Manipulation-Language.md              |  364 ++++---
 .../IoTDB-SQL-Language/Maintenance-Command.md      |   10 +-
 docs/zh/UserGuide/QuickStart/Files.md              |   17 +-
 docs/zh/UserGuide/QuickStart/QuickStart.md         |   78 +-
 docs/zh/UserGuide/QuickStart/ServerFileList.md     |   64 +-
 docs/zh/UserGuide/QuickStart/WayToGetIoTDB.md      |   43 +-
 docs/zh/UserGuide/System-Tools/CSV-Tool.md         |   24 +-
 docs/zh/UserGuide/System-Tools/JMX-Tool.md         |   24 +-
 .../UserGuide/System-Tools/Load-External-Tsfile.md |   58 +-
 docs/zh/UserGuide/System-Tools/MLogParser-Tool.md  |    7 +-
 .../System-Tools/Monitor-and-Log-Tools.md          |   96 +-
 docs/zh/UserGuide/System-Tools/NodeTool.md         |  117 ++-
 .../Query-History-Visualization-Tool.md            |    6 +-
 docs/zh/UserGuide/System-Tools/Watermark-Tool.md   |   34 +-
 docs/zh/UserGuide/UserGuideReadme.md               |    9 +-
 .../org/apache/iotdb/flink/FlinkIoTDBSink.java     |   26 +-
 .../RPC.md => example/mqtt-customize/README.md     |   45 +-
 example/mqtt-customize/pom.xml                     |   42 +
 .../server/CustomizedJsonPayloadFormatter.java     |   62 ++
 .../org.apache.iotdb.db.mqtt.PayloadFormatter      |    1 +
 example/pom.xml                                    |    1 +
 .../iotdb/HybridTimeseriesSessionExample.java      |  129 +++
 .../org/apache/iotdb/SessionConcurrentExample.java |  199 ++++
 .../main/java/org/apache/iotdb/SessionExample.java |   14 +
 .../iotdb/tsfile/TsFileWriteVectorWithTablet.java  |   89 +-
 .../java/org/apache/iotdb/flink/IoTDBSink.java     |   26 +-
 .../iotdb/flink/options/IoTDBSinkOptions.java      |   11 -
 ...tITCase.java => RowTSRecordOutputFormatIT.java} |    2 +-
 ...rmatITCase.java => RowTsFileInputFormatIT.java} |    2 +-
 pom.xml                                            |    2 +
 .../resources/conf/iotdb-engine.properties         |   24 +
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |    1 +
 .../db/concurrent/IoTDBThreadPoolFactory.java      |   18 +
 .../org/apache/iotdb/db/concurrent/ThreadName.java |    5 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   50 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   43 +-
 .../apache/iotdb/db/cq/ContinuousQueryTask.java    |    6 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  183 ++--
 .../db/engine/cache/TimeSeriesMetadataCache.java   |   71 +-
 .../level/LevelCompactionTsFileManagement.java     |    2 +-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |   15 +-
 .../apache/iotdb/db/engine/memtable/IMemTable.java |    2 +
 .../iotdb/db/engine/merge/task/MergeTask.java      |   10 +-
 .../selectinto/InsertTabletPlanGenerator.java      |  254 +++++
 .../selectinto/InsertTabletPlansIterator.java      |  139 +++
 .../engine/storagegroup/StorageGroupProcessor.java |  196 ++--
 .../db/engine/storagegroup/TsFileProcessor.java    |   10 +-
 .../db/engine/storagegroup/TsFileResource.java     |   10 +-
 .../storagegroup/timeindex/DeviceTimeIndex.java    |   13 +-
 .../storagegroup/timeindex/FileTimeIndex.java      |    5 +
 .../engine/storagegroup/timeindex/ITimeIndex.java  |    8 +
 .../storagegroup/timeindex/TimeIndexLevel.java     |   10 -
 .../virtualSg/VirtualStorageGroupManager.java      |  110 ++-
 .../db/engine/trigger/executor/TriggerEngine.java  |   16 +-
 .../engine/trigger/executor/TriggerExecutor.java   |    8 +-
 .../service/TriggerRegistrationService.java        |   10 +-
 .../exception/query/PathNumOverLimitException.java |   13 +-
 .../org/apache/iotdb/db/metadata/MManager.java     | 1004 ++++++--------------
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  657 ++++++++-----
 .../org/apache/iotdb/db/metadata/MetaUtils.java    |   18 +-
 .../iotdb/db/metadata/MetadataOperationType.java   |    4 +
 .../db/metadata/{ => logfile}/MLogTxtWriter.java   |   71 +-
 .../iotdb/db/metadata/logfile/MLogWriter.java      |   24 +-
 .../db/metadata/{ => logfile}/TagLogFile.java      |    2 +-
 .../iotdb/db/metadata/mnode/EntityMNode.java       |  117 +++
 .../iotdb/db/metadata/mnode/IEntityMNode.java      |   55 ++
 .../org/apache/iotdb/db/metadata/mnode/IMNode.java |   82 ++
 .../iotdb/db/metadata/mnode/IMeasurementMNode.java |   58 ++
 .../mnode/IStorageGroupMNode.java}                 |   14 +-
 .../iotdb/db/metadata/mnode/InternalMNode.java     |  271 ++++++
 .../org/apache/iotdb/db/metadata/mnode/MNode.java  |  339 +------
 .../iotdb/db/metadata/mnode/MeasurementMNode.java  |  207 ++--
 ...roupMNode.java => StorageGroupEntityMNode.java} |   23 +-
 .../iotdb/db/metadata/mnode/StorageGroupMNode.java |   11 +-
 .../apache/iotdb/db/metadata/tag/TagManager.java   |  556 +++++++++++
 .../iotdb/db/metadata/template/Template.java       |   11 +-
 .../db/metadata/template/TemplateManager.java      |  141 +++
 .../org/apache/iotdb/db/monitor/StatMonitor.java   |    2 +-
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |   51 +-
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |    4 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   73 +-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |    6 +-
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |  185 ++--
 .../db/qp/logical/crud/SelectIntoOperator.java     |  110 +++
 .../iotdb/db/qp/logical/sys/LoadFilesOperator.java |   25 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   38 +-
 .../db/qp/physical/crud/AlignByDevicePlan.java     |   57 +-
 .../iotdb/db/qp/physical/crud/InsertPlan.java      |   12 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |    3 +-
 .../iotdb/db/qp/physical/crud/MeasurementInfo.java |   75 ++
 .../iotdb/db/qp/physical/crud/SelectIntoPlan.java  |  113 +++
 ...emplatePlan.java => SetSchemaTemplatePlan.java} |   14 +-
 .../iotdb/db/qp/physical/sys/OperateFilePlan.java  |   23 +-
 ...tePlan.java => SetUsingSchemaTemplatePlan.java} |   16 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  191 ++--
 .../iotdb/db/qp/strategy/LogicalChecker.java       |    5 +
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |   24 +-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |    8 +-
 .../qp/strategy/optimizer/ILogicalOptimizer.java   |    3 +-
 .../apache/iotdb/db/qp/utils/WildcardsRemover.java |   28 +-
 .../db/query/control/QueryResourceManager.java     |  100 +-
 .../iotdb/db/query/control/QueryTimeManager.java   |   14 +
 .../iotdb/db/query/control/SessionManager.java     |   10 +-
 .../apache/iotdb/db/query/control/TracingInfo.java |   85 ++
 .../iotdb/db/query/control/TracingManager.java     |  141 ++-
 .../db/query/dataset/AlignByDeviceDataSet.java     |   26 +-
 .../iotdb/db/query/executor/LastQueryExecutor.java |    6 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |   99 +-
 .../reader/universal/DescPriorityMergeReader.java  |    5 +-
 .../reader/universal/PriorityMergeReader.java      |   13 +-
 .../apache/iotdb/db/rescon/MemTableManager.java    |    4 +
 .../apache/iotdb/db/rescon/TVListAllocator.java    |   12 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |    4 +-
 .../org/apache/iotdb/db/service/RPCService.java    |    1 +
 .../org/apache/iotdb/db/service/ServiceType.java   |    7 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  313 +++---
 .../org/apache/iotdb/db/service/UpgradeSevice.java |    9 +-
 .../iotdb/db/service/thrift/ThriftService.java     |   18 +-
 .../db/service/thrift/ThriftServiceThread.java     |  220 ++++-
 .../iotdb/db/sync/receiver/SyncServerManager.java  |    7 +-
 .../apache/iotdb/db/tools/TsFileSketchTool.java    |  583 ++++++++----
 .../org/apache/iotdb/db/tools/mlog/MLogParser.java |   19 +-
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |   21 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |    6 +-
 .../writelog/recover/TsFileRecoverPerformer.java   |   26 +-
 .../db/engine/memtable/MemTableFlushTaskTest.java  |   22 +-
 .../db/engine/memtable/MemTableTestUtils.java      |    5 +-
 .../db/engine/memtable/PrimitiveMemTableTest.java  |    3 +-
 .../storagegroup/StorageGroupProcessorTest.java    |   84 +-
 .../iotdb/db/engine/storagegroup/TTLTest.java      |    9 +-
 .../engine/storagegroup/TsFileProcessorTest.java   |   14 +-
 .../iotdb/db/integration/IoTDBAddSubDeviceIT.java  |   11 +-
 .../iotdb/db/integration/IoTDBAlignByDeviceIT.java |    3 +-
 .../org/apache/iotdb/db/integration/IoTDBAsIT.java |    2 +-
 .../db/integration/IoTDBAutoCreateSchemaIT.java    |    6 +-
 .../db/integration/IoTDBContinuousQueryIT.java     |  147 ++-
 .../db/integration/IoTDBCreateStorageGroupIT.java  |  128 +++
 .../db/integration/IoTDBCreateTimeseriesIT.java    |   34 +-
 .../db/integration/IoTDBInsertWithoutTimeIT.java   |  129 +++
 .../apache/iotdb/db/integration/IoTDBLastIT.java   |   22 +-
 .../db/integration/IoTDBLoadExternalTsfileIT.java  |  121 ++-
 .../iotdb/db/integration/IoTDBMetadataFetchIT.java |   20 +-
 .../db/integration/IoTDBQueryMemoryControlIT.java  |   20 +-
 .../iotdb/db/integration/IoTDBRestartIT.java       |   48 +
 .../iotdb/db/integration/IoTDBSelectIntoIT.java    |  617 ++++++++++++
 .../db/integration/IoTDBSequenceDataQueryIT.java   |   12 +-
 .../iotdb/db/integration/IoTDBSeriesReaderIT.java  |   11 +-
 .../iotdb/db/integration/IoTDBSimpleQueryIT.java   |   14 +-
 .../db/integration/IoTDBTriggerExecutionIT.java    |   43 +-
 .../iotdb/db/metadata/MManagerAdvancedTest.java    |   20 +-
 .../iotdb/db/metadata/MManagerBasicTest.java       |  170 ++--
 .../iotdb/db/metadata/MManagerImproveTest.java     |    8 +-
 .../org/apache/iotdb/db/metadata/MTreeTest.java    |  103 +-
 .../apache/iotdb/db/metadata/MetaUtilsTest.java    |   14 +-
 .../apache/iotdb/db/metadata/mnode/MNodeTest.java  |   41 +-
 .../iotdb/db/qp/logical/LogicalPlanSmallTest.java  |    2 +-
 .../iotdb/db/qp/physical/InsertRowPlanTest.java    |    8 +-
 .../iotdb/db/qp/physical/InsertTabletPlanTest.java |   14 +-
 .../iotdb/db/qp/physical/PhysicalPlanTest.java     |   83 +-
 .../iotdb/db/query/control/TracingManagerTest.java |    7 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |    2 +-
 .../org/apache/iotdb/db/tools/MLogParserTest.java  |   71 +-
 .../iotdb/db/tools/TsFileSketchToolTest.java       |    4 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   11 +-
 .../iotdb/db/utils/TsFileRewriteToolTest.java      |    4 +-
 .../iotdb/db/writelog/recover/LogReplayerTest.java |    3 +-
 .../db/writelog/recover/SeqTsFileRecoverTest.java  |    2 +-
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java |    6 +-
 .../apache/iotdb/session/IoTDBSessionVectorIT.java |  213 +++++
 .../java/org/apache/iotdb/session/SessionTest.java |    2 +-
 site/README.md                                     |    2 +-
 site/src/main/.vuepress/config.js                  |   31 +-
 site/src/main/.vuepress/theme/components/Page.vue  |   15 +-
 .../apache/iotdb/spark/db/EnvironmentUtils.java    |  111 ++-
 .../test/java/org/apache/iotdb/db/sql/Cases.java   |  235 ++---
 .../file/metadata/MetadataIndexConstructor.java    |   52 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |   17 +-
 .../org/apache/iotdb/tsfile/read/common/Path.java  |    4 +
 .../java/org/apache/iotdb/tsfile/utils/BitMap.java |    5 +
 .../apache/iotdb/tsfile/write/TsFileWriter.java    |   12 +-
 .../tsfile/write/chunk/ChunkGroupWriterImpl.java   |   77 +-
 .../tsfile/write/chunk/VectorChunkWriterImpl.java  |    5 +-
 .../tsfile/write/schema/IMeasurementSchema.java    |    5 +
 .../tsfile/write/schema/MeasurementSchema.java     |   12 +-
 .../apache/iotdb/tsfile/write/schema/Schema.java   |   28 +-
 .../write/schema/VectorMeasurementSchema.java      |   12 +-
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  |   10 +
 ...ateTest.java => DefaultSchemaTemplateTest.java} |    8 +-
 .../tsfile/write/MetadataIndexConstructorTest.java |  478 ++++++++++
 .../write/schema/converter/SchemaBuilderTest.java  |    4 +-
 .../write/writer/VectorChunkWriterImplTest.java    |   34 +-
 .../write/writer/VectorMeasurementSchemaStub.java  |   12 +-
 346 files changed, 12967 insertions(+), 7924 deletions(-)

diff --cc cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index cec70b2,adc4661..4fcb5a5
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@@ -22,33 -22,28 +22,40 @@@ import org.apache.iotdb.cluster.client.
  import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
  import org.apache.iotdb.cluster.config.ClusterConfig;
  import org.apache.iotdb.cluster.config.ClusterDescriptor;
++import org.apache.iotdb.cluster.coordinator.Coordinator;
  import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
  import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
++import org.apache.iotdb.cluster.metadata.CMManager;
++import org.apache.iotdb.cluster.metadata.MetaPuller;
  import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
  import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
  import org.apache.iotdb.cluster.rpc.thrift.Node;
 -import org.apache.iotdb.cluster.server.MetaClusterServer;
 +import org.apache.iotdb.cluster.server.ClusterRPCService;
- import org.apache.iotdb.cluster.server.MetaClusterServer;
++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl;
  import org.apache.iotdb.cluster.server.Response;
  import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
++import org.apache.iotdb.cluster.server.member.MetaGroupMember;
++import org.apache.iotdb.cluster.server.raft.MetaRaftHeartBeatService;
++import org.apache.iotdb.cluster.server.raft.MetaRaftService;
++import org.apache.iotdb.cluster.server.service.MetaAsyncService;
++import org.apache.iotdb.cluster.server.service.MetaSyncService;
  import org.apache.iotdb.cluster.utils.ClusterUtils;
 +import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
  import org.apache.iotdb.db.conf.IoTDBConfigCheck;
  import org.apache.iotdb.db.conf.IoTDBConstant;
  import org.apache.iotdb.db.conf.IoTDBDescriptor;
  import org.apache.iotdb.db.exception.StartupException;
  import org.apache.iotdb.db.exception.query.QueryProcessException;
 +import org.apache.iotdb.db.service.IoTDB;
 +import org.apache.iotdb.db.service.JMXService;
 +import org.apache.iotdb.db.service.RegisterManager;
++import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
  import org.apache.iotdb.db.utils.TestOnly;
--
  import org.apache.thrift.TException;
  import org.apache.thrift.async.TAsyncClientManager;
  import org.apache.thrift.protocol.TBinaryProtocol.Factory;
  import org.apache.thrift.protocol.TCompactProtocol;
  import org.apache.thrift.protocol.TProtocolFactory;
--import org.apache.thrift.transport.TTransportException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -58,12 -53,9 +65,13 @@@ import java.util.Set
  
  import static org.apache.iotdb.cluster.utils.ClusterUtils.UNKNOWN_CLIENT_IP;
  
- //we do not inherent IoTDB instance, as it may break the singleton mode of IoTDB.
 -public class ClusterMain {
++// we do not inherit from IoTDB, as it may break the singleton mode of IoTDB.
 +public class ClusterIoTDB {
  
 -  private static final Logger logger = LoggerFactory.getLogger(ClusterMain.class);
 +  private static final Logger logger = LoggerFactory.getLogger(ClusterIoTDB.class);
 +  private final String mbeanName =
-       String.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
++      String.format(
++          "%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, "ClusterIoTDB");
  
    // establish the cluster as a seed
    private static final String MODE_START = "-s";
@@@ -73,13 -65,8 +81,27 @@@
    // metaport-of-removed-node
    private static final String MODE_REMOVE = "-r";
  
-   private MetaClusterServer metaServer;
 -  private static MetaClusterServer metaServer;
++  private MetaGroupMember metaGroupEngine;
++  private Node thisNode;
++  private Coordinator coordinator;
 +
 +  private IoTDB iotdb = IoTDB.getInstance();
 +
 +  // Cluster IoTDB uses a individual registerManager with its parent.
 +  private RegisterManager registerManager = new RegisterManager();
 +
++  private ClusterIoTDB() {
++    ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
++    thisNode = new Node();
++    // set internal rpc ip and ports
++    thisNode.setInternalIp(config.getInternalIp());
++    thisNode.setMetaPort(config.getInternalMetaPort());
++    thisNode.setDataPort(config.getInternalDataPort());
++    // set client rpc ip and ports
++    thisNode.setClientPort(config.getClusterRpcPort());
++    thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
++  }
+ 
    public static void main(String[] args) {
      if (args.length < 1) {
        logger.error(
@@@ -115,18 -103,50 +137,18 @@@
      String mode = args[0];
      logger.info("Running mode {}", mode);
  
 +    ClusterIoTDB cluster = ClusterIoTDBHolder.INSTANCE;
-     //we start IoTDB kernel first.
-     cluster.iotdb.active();
++    // we start IoTDB kernel first.
++    // cluster.iotdb.active();
 +
-     //then we start the cluster module.
++    // then we start the cluster module.
      if (MODE_START.equals(mode)) {
 -      try {
 -        metaServer = new MetaClusterServer();
 -        startServerCheck();
 -        preStartCustomize();
 -        metaServer.start();
 -        metaServer.buildCluster();
 -        // Currently, we do not register ClusterInfoService as a JMX Bean,
 -        // so we use startService() rather than start()
 -        ClusterInfoServer.getInstance().startService();
 -      } catch (TTransportException
 -          | StartupException
 -          | QueryProcessException
 -          | StartUpCheckFailureException
 -          | ConfigInconsistentException e) {
 -        metaServer.stop();
 -        logger.error("Fail to start meta server", e);
 -      }
 +      cluster.activeStartNodeMode();
      } else if (MODE_ADD.equals(mode)) {
 -      try {
 -        long startTime = System.currentTimeMillis();
 -        metaServer = new MetaClusterServer();
 -        preStartCustomize();
 -        metaServer.start();
 -        metaServer.joinCluster();
 -        // Currently, we do not register ClusterInfoService as a JMX Bean,
 -        // so we use startService() rather than start()
 -        ClusterInfoServer.getInstance().startService();
 -
 -        logger.info(
 -            "Adding this node {} to cluster costs {} ms",
 -            metaServer.getMember().getThisNode(),
 -            (System.currentTimeMillis() - startTime));
 -      } catch (TTransportException
 -          | StartupException
 -          | QueryProcessException
 -          | StartUpCheckFailureException
 -          | ConfigInconsistentException e) {
 -        metaServer.stop();
 -        logger.error("Fail to join cluster", e);
 -      }
 +      cluster.activeAddNodeMode();
      } else if (MODE_REMOVE.equals(mode)) {
        try {
 -        doRemoveNode(args);
 +        cluster.doRemoveNode(args);
        } catch (IOException e) {
          logger.error("Fail to remove node in cluster", e);
        }
@@@ -135,61 -155,7 +157,94 @@@
      }
    }
  
 -  private static void startServerCheck() throws StartupException {
 +  public void activeStartNodeMode() {
 +    try {
-       metaServer = new MetaClusterServer();
 +      startServerCheck();
 +      preStartCustomize();
-       metaServer.start();
-       metaServer.buildCluster();
++
++      coordinator = new Coordinator();
++      // register MetaGroupMember. MetaGroupMember plays the same role as "StorageEngine" in the
++      // cluster module.
++      // TODO: it would be better to move the coordinator out of metaGroupEngine
++
++      // local engine
++      metaGroupEngine =
++          new MetaGroupMember(
++              ThriftServiceThread.getProtocolFactory(
++                  IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()),
++              thisNode,
++              coordinator);
++      IoTDB.setMetaManager(CMManager.getInstance());
++      ((CMManager) IoTDB.metaManager).setMetaGroupMember(metaGroupEngine);
++      ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
++      MetaPuller.getInstance().init(metaGroupEngine);
++      iotdb.active();
++
++      registerManager.register(metaGroupEngine);
++
++      metaGroupEngine.buildCluster();
++
++      // rpc service
++      if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
++        MetaAsyncService metaAsyncService = new MetaAsyncService(metaGroupEngine);
++        MetaRaftHeartBeatService.getInstance().initAsyncedServiceImpl(metaAsyncService);
++        MetaRaftService.getInstance().initAsyncedServiceImpl(metaAsyncService);
++      } else {
++        MetaSyncService syncService = new MetaSyncService(metaGroupEngine);
++        MetaRaftHeartBeatService.getInstance().initSyncedServiceImpl(syncService);
++        MetaRaftService.getInstance().initSyncedServiceImpl(syncService);
++      }
++
++      // meta group heart beat rpc
++      registerManager.register(MetaRaftHeartBeatService.getInstance());
++      registerManager.register(MetaRaftService.getInstance());
++
 +      // Currently, we do not register ClusterInfoService as a JMX Bean,
 +      // so we use startService() rather than start()
 +      ClusterInfoServer.getInstance().startService();
 +      // JMX based DBA API
 +      registerManager.register(ClusterMonitor.INSTANCE);
 +      // we must wait until the metaGroup established.
 +      // So that the ClusterRPCService can work.
 +      registerManager.register(ClusterRPCService.getInstance());
-     } catch (TTransportException
-         | StartupException
++    } catch (StartupException
 +        | QueryProcessException
 +        | StartUpCheckFailureException
 +        | ConfigInconsistentException e) {
 +      stop();
 +      logger.error("Fail to start meta server", e);
 +    }
 +  }
 +
 +  public void activeAddNodeMode() {
-     try {
-       long startTime = System.currentTimeMillis();
-       metaServer = new MetaClusterServer();
-       preStartCustomize();
-       metaServer.start();
-       metaServer.joinCluster();
-       // Currently, we do not register ClusterInfoService as a JMX Bean,
-       // so we use startService() rather than start()
-       ClusterInfoServer.getInstance().startService();
-       // JMX based DBA API
-       registerManager.register(ClusterMonitor.INSTANCE);
-       //finally, we start the RPC service
-       registerManager.register(ClusterRPCService.getInstance());
-       logger.info(
-           "Adding this node {} to cluster costs {} ms",
-           metaServer.getMember().getThisNode(),
-           (System.currentTimeMillis() - startTime));
-     } catch (TTransportException
-         | StartupException
-         | QueryProcessException
-         | StartUpCheckFailureException
-         | ConfigInconsistentException e) {
-       stop();
-       logger.error("Fail to join cluster", e);
-     }
++    //    try {
++    //      long startTime = System.currentTimeMillis();
++    //      metaServer = new RaftTSMetaServiceImpl();
++    //      preStartCustomize();
++    //      metaServer.start();
++    //      metaServer.joinCluster();
++    //      // Currently, we do not register ClusterInfoService as a JMX Bean,
++    //      // so we use startService() rather than start()
++    //      ClusterInfoServer.getInstance().startService();
++    //      // JMX based DBA API
++    //      registerManager.register(ClusterMonitor.INSTANCE);
++    //      // finally, we start the RPC service
++    //      registerManager.register(ClusterRPCService.getInstance());
++    //      logger.info(
++    //          "Adding this node {} to cluster costs {} ms",
++    //          metaServer.getMember().getThisNode(),
++    //          (System.currentTimeMillis() - startTime));
++    //    } catch (TTransportException
++    //        | StartupException
++    //        | QueryProcessException
++    //        | StartUpCheckFailureException
++    //        | ConfigInconsistentException e) {
++    //      stop();
++    //      logger.error("Fail to join cluster", e);
++    //    }
 +  }
 +
- 
 +  private void startServerCheck() throws StartupException {
      ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
      // check the initial replicateNum and refuse to start when the replicateNum <= 0
      if (config.getReplicationNum() <= 0) {
@@@ -236,18 -202,18 +291,12 @@@
      }
  
      // assert this node is in seed nodes list
--    Node localNode = new Node();
--    localNode
--        .setInternalIp(config.getInternalIp())
--        .setMetaPort(config.getInternalMetaPort())
--        .setDataPort(config.getInternalDataPort())
--        .setClientPort(config.getClusterRpcPort())
--        .setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
--    if (!seedNodes.contains(localNode)) {
++
++    if (!seedNodes.contains(thisNode)) {
        String message =
            String.format(
                "SeedNodes must contains local node in start-server mode. LocalNode: %s ,SeedNodes: %s",
--              localNode.toString(), config.getSeedNodeUrls());
++              thisNode.toString(), config.getSeedNodeUrls());
        throw new StartupException(metaServer.getMember().getName(), message);
      }
    }
@@@ -307,7 -273,7 +356,7 @@@
      }
    }
  
-   public MetaClusterServer getMetaServer() {
 -  public static MetaClusterServer getMetaServer() {
++  public RaftTSMetaServiceImpl getMetaServer() {
      return metaServer;
    }
  
@@@ -358,35 -324,8 +407,33 @@@
          });
    }
  
- 
- 
    @TestOnly
-   public void setMetaClusterServer(MetaClusterServer metaClusterServer) {
 -  public static void setMetaClusterServer(MetaClusterServer metaClusterServer) {
--    metaServer = metaClusterServer;
++  public void setMetaClusterServer(RaftTSMetaServiceImpl RaftTSMetaServiceImpl) {
++    metaServer = RaftTSMetaServiceImpl;
 +  }
 +
 +  public void stop() {
 +    deactivate();
 +  }
 +
 +  private void deactivate() {
 +    logger.info("Deactivating Cluster IoTDB...");
 +    metaServer.stop();
 +    registerManager.deregisterAll();
 +    JMXService.deregisterMBean(mbeanName);
 +    logger.info("ClusterIoTDB is deactivated.");
-     //stop the iotdb kernel
++    // stop the iotdb kernel
 +    iotdb.stop();
 +  }
 +
- 
 +  public static ClusterIoTDB getInstance() {
 +    return ClusterIoTDBHolder.INSTANCE;
 +  }
++
 +  private static class ClusterIoTDBHolder {
 +
 +    private static final ClusterIoTDB INSTANCE = new ClusterIoTDB();
 +
 +    private ClusterIoTDBHolder() {}
    }
  }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
index b377b1a,b377b1a..f5b0a49
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java
@@@ -19,6 -19,6 +19,7 @@@
  package org.apache.iotdb.cluster.config;
  
  import org.apache.iotdb.cluster.rpc.thrift.Node;
++import org.apache.iotdb.cluster.server.RaftServer;
  import org.apache.iotdb.db.utils.TestOnly;
  
  public class ClusterConstant {
@@@ -67,4 -67,4 +68,55 @@@
    public static void setElectionRandomTimeOutMs(long electionRandomTimeOutMs) {
      ClusterConstant.electionRandomTimeOutMs = electionRandomTimeOutMs;
    }
++
++  private static int connectionTimeoutInMS =
++      ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
++  private static int readOperationTimeoutMS =
++      ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS();
++  private static int writeOperationTimeoutMS =
++      ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS();
++  private static int syncLeaderMaxWaitMs = 20 * 1000;
++  private static long heartBeatIntervalMs = 1000L;
++
++  public static int getConnectionTimeoutInMS() {
++    return connectionTimeoutInMS;
++  }
++
++  public static void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
++    ClusterConstant.connectionTimeoutInMS = connectionTimeoutInMS;
++  }
++
++  public static int getReadOperationTimeoutMS() {
++    return readOperationTimeoutMS;
++  }
++
++  public static int getWriteOperationTimeoutMS() {
++    return writeOperationTimeoutMS;
++  }
++
++  public static int getSyncLeaderMaxWaitMs() {
++    return syncLeaderMaxWaitMs;
++  }
++
++  public static void setSyncLeaderMaxWaitMs(int syncLeaderMaxWaitMs) {
++    ClusterConstant.syncLeaderMaxWaitMs = syncLeaderMaxWaitMs;
++  }
++
++  public static long getHeartBeatIntervalMs() {
++    return heartBeatIntervalMs;
++  }
++
++  public static void setHeartBeatIntervalMs(long heartBeatIntervalMs) {
++    ClusterConstant.heartBeatIntervalMs = heartBeatIntervalMs;
++  }
++
++  @TestOnly
++  public static void setReadOperationTimeoutMS(int readOperationTimeoutMS) {
++    RaftServer.readOperationTimeoutMS = readOperationTimeoutMS;
++  }
++
++  @TestOnly
++  public static void setWriteOperationTimeoutMS(int writeOperationTimeoutMS) {
++    RaftServer.writeOperationTimeoutMS = writeOperationTimeoutMS;
++  }
  }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
index 8c79ac5,5bbddec..b8a2c00
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
@@@ -33,15 -26,21 +33,15 @@@ import org.apache.iotdb.db.service.thri
  import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
  import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
  
- public class ClusterRPCService  extends ThriftService implements ClusterRPCServiceMBean {
 -/** A service to handle jdbc request from client. */
 -public class RPCService extends ThriftService implements RPCServiceMBean {
 -
 -  private TSServiceImpl impl;
++public class ClusterRPCService extends ThriftService implements ClusterRPCServiceMBean {
  
 -  private RPCService() {}
 +  private ClusterTSServiceImpl impl;
  
 -  public static RPCService getInstance() {
 -    return RPCServiceHolder.INSTANCE;
 -  }
 +  private ClusterRPCService() {}
  
    @Override
 -  public int getRPCPort() {
 -    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 -    return config.getRpcPort();
 +  public ThriftService getImplementation() {
 +    return ClusterRPCServiceHolder.INSTANCE;
    }
  
    @Override
@@@ -50,9 -49,12 +50,13 @@@
    }
  
    @Override
--  public void initTProcessor()
-       throws IllegalAccessException, InstantiationException {
-     impl = new ClusterTSServiceImpl();
 -      throws ClassNotFoundException, IllegalAccessException, InstantiationException {
 -    impl =
 -        (TSServiceImpl)
 -            Class.forName(IoTDBDescriptor.getInstance().getConfig().getRpcImplClassName())
 -                .newInstance();
++  public void initTProcessor() throws IllegalAccessException, InstantiationException {
++    try {
++      impl = new ClusterTSServiceImpl();
++      initSyncedServiceImpl(null);
++    } catch (QueryProcessException e) {
++      throw new InstantiationException(e.getMessage());
++    }
      processor = new Processor<>(impl);
    }
  
@@@ -64,9 -66,9 +68,9 @@@
            new ThriftServiceThread(
                processor,
                getID().getName(),
 -              ThreadName.RPC_CLIENT.getName(),
 -              config.getRpcAddress(),
 -              config.getRpcPort(),
 +              ThreadName.CLUSTER_RPC_CLIENT.getName(),
-               config.getRpcAddress(),
-               config.getRpcPort(),
++              getBindIP(),
++              getBindPort(),
                config.getRpcMaxConcurrentClientNum(),
                config.getThriftServerAwaitTimeForStopService(),
                new RPCServiceThriftHandler(impl),
@@@ -88,27 -90,14 +92,26 @@@
    }
  
    @Override
 -  public ServiceType getID() {
 -    return ServiceType.RPC_SERVICE;
 +  public int getRPCPort() {
 +    return getBindPort();
 +  }
 +
 +  public static ClusterRPCService getInstance() {
 +    return ClusterRPCServiceHolder.INSTANCE;
 +  }
 +
 +  public void assignExecutorToServiceImpl(MetaGroupMember member) throws QueryProcessException {
 +    this.impl.setExecutor(member);
 +  }
 +
 +  public void assignCoordinator(Coordinator coordinator) {
 +    this.impl.setCoordinator(coordinator);
    }
  
- 
 -  private static class RPCServiceHolder {
 +  private static class ClusterRPCServiceHolder {
  
 -    private static final RPCService INSTANCE = new RPCService();
 +    private static final ClusterRPCService INSTANCE = new ClusterRPCService();
  
 -    private RPCServiceHolder() {}
 +    private ClusterRPCServiceHolder() {}
    }
  }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
index e43e503,0000000..f1fd75a
mode 100644,000000..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
@@@ -1,237 -1,0 +1,161 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.cluster.server;
 +
 +import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 +import org.apache.iotdb.cluster.client.sync.SyncDataClient;
++import org.apache.iotdb.cluster.config.ClusterConstant;
 +import org.apache.iotdb.cluster.config.ClusterDescriptor;
 +import org.apache.iotdb.cluster.coordinator.Coordinator;
- import org.apache.iotdb.cluster.metadata.CMManager;
 +import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
- import org.apache.iotdb.cluster.query.ClusterPlanner;
 +import org.apache.iotdb.cluster.query.RemoteQueryContext;
 +import org.apache.iotdb.cluster.rpc.thrift.Node;
 +import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 +import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 +import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 +import org.apache.iotdb.db.exception.StorageEngineException;
- import org.apache.iotdb.db.exception.metadata.MetadataException;
 +import org.apache.iotdb.db.exception.query.QueryProcessException;
- import org.apache.iotdb.db.metadata.PartialPath;
 +import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 +import org.apache.iotdb.db.query.context.QueryContext;
- import org.apache.iotdb.db.service.IoTDB;
 +import org.apache.iotdb.db.service.TSServiceImpl;
 +import org.apache.iotdb.rpc.RpcUtils;
 +import org.apache.iotdb.rpc.TSStatusCode;
 +import org.apache.iotdb.service.rpc.thrift.TSStatus;
- import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
- 
 +import org.apache.thrift.TException;
- import org.apache.thrift.protocol.TProtocol;
- import org.apache.thrift.server.ServerContext;
- import org.apache.thrift.server.TServerEventHandler;
- import org.apache.thrift.transport.TTransport;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
- import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +/**
-  * ClusterTSServiceImpl is the cluster version of TSServiceImpl, which is responsible for the processing of
-  * the user requests (sqls and session api). It inherits the basic procedures from TSServiceImpl,
-  * but redirect the queries of data and metadata to a MetaGroupMember of the local node.
++ * ClusterTSServiceImpl is the cluster version of TSServiceImpl, which is responsible for the
++ * processing of the user requests (sqls and session api). It inherits the basic procedures from
++ * TSServiceImpl, but redirects queries of data and metadata to a MetaGroupMember of the local
++ * node.
 + */
 +public class ClusterTSServiceImpl extends TSServiceImpl {
 +
 +  private static final Logger logger = LoggerFactory.getLogger(ClusterTSServiceImpl.class);
 +  /**
-    * The Coordinator of the local node. Through this node queries data and meta from
-    * the cluster and performs data manipulations to the cluster.
++   * The Coordinator of the local node. Through it, this service queries data and metadata from the
++   * cluster and performs data manipulations on the cluster.
 +   */
 +  private Coordinator coordinator;
 +
- 
- //  /**
- //   * Using the poolServer, ClusterTSServiceImpl will listen to a socket to accept thrift requests like an
- //   * HttpServer.
- //   */
- //  private TServer poolServer;
- //
- //  /** The socket poolServer will listen to. Async service requires nonblocking socket */
- //  private TServerTransport serverTransport;
- 
 +  /**
 +   * queryId -> queryContext map. When a query ends either normally or accidentally, the resources
 +   * used by the query can be found in the context and then released.
 +   */
 +  private Map<Long, RemoteQueryContext> queryContextMap = new ConcurrentHashMap<>();
 +
-   public ClusterTSServiceImpl(MetaGroupMember metaGroupMember) throws QueryProcessException {
-     super();
-     this.processor = new ClusterPlanner();
-   }
++  public ClusterTSServiceImpl() throws QueryProcessException {}
 +
 +  public void setExecutor(MetaGroupMember metaGroupMember) throws QueryProcessException {
-     this.executor = new ClusterPlanExecutor(metaGroupMember);
++    executor = new ClusterPlanExecutor(metaGroupMember);
 +  }
 +
 +  public void setCoordinator(Coordinator coordinator) {
 +    this.coordinator = coordinator;
 +  }
 +
- 
 +  /**
 +   * Redirect the plan to the local Coordinator so that it will be processed cluster-wide.
 +   *
 +   * @param plan
 +   * @return
 +   */
 +  @Override
 +  protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
 +    try {
 +      plan.checkIntegrity();
 +    } catch (QueryProcessException e) {
 +      logger.warn("Illegal plan detected: {}", plan);
 +      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
 +    }
 +
 +    return coordinator.executeNonQueryPlan(plan);
 +  }
 +
 +  /**
-    * EventHandler handles the preprocess and postprocess of the thrift requests, but it currently
-    * only release resources when a client disconnects.
-    */
-   class EventHandler implements TServerEventHandler {
- 
-     @Override
-     public void preServe() {
-       // do nothing
-     }
- 
-     @Override
-     public ServerContext createContext(TProtocol input, TProtocol output) {
-       return null;
-     }
- 
-     @Override
-     public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) {
-       ClusterTSServiceImpl.this.handleClientExit();
-     }
- 
-     @Override
-     public void processContext(
-         ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) {
-       // do nothing
-     }
-   }
- 
-   /**
-    * Get the data types of each path in “paths”. If "aggregations" is not null, then it should be
-    * corresponding to "paths" one to one and the data type will be the type of the aggregation over
-    * the corresponding path.
-    *
-    * @param paths full timeseries paths
-    * @param aggregations if not null, it should be the same size as "paths"
-    * @return the data types of "paths" (using the aggregations)
-    * @throws MetadataException
-    */
-   @Override
-   protected List<TSDataType> getSeriesTypesByPaths(
-       List<PartialPath> paths, List<String> aggregations) throws MetadataException {
-     return ((CMManager) IoTDB.metaManager).getSeriesTypesByPath(paths, aggregations).left;
-   }
- 
-   /**
-    * Get the data types of each path in “paths”. If "aggregation" is not null, all "paths" will use
-    * this aggregation.
-    *
-    * @param paths full timeseries paths
-    * @param aggregation if not null, it means "paths" all use this aggregation
-    * @return the data types of "paths" (using the aggregation)
-    * @throws MetadataException
-    */
-   protected List<TSDataType> getSeriesTypesByString(List<PartialPath> paths, String aggregation)
-       throws MetadataException {
-     return ((CMManager) IoTDB.metaManager).getSeriesTypesByPaths(paths, aggregation).left;
-   }
- 
-   /**
 +   * Generate and cache a QueryContext using "queryId". In the distributed version, the QueryContext
 +   * is a RemoteQueryContext.
 +   *
 +   * @param queryId
 +   * @return a RemoteQueryContext using queryId
 +   */
 +  @Override
 +  protected QueryContext genQueryContext(long queryId, boolean debug) {
 +    RemoteQueryContext context = new RemoteQueryContext(queryId, debug);
 +    queryContextMap.put(queryId, context);
 +    return context;
 +  }
 +
 +  /**
 +   * Release the local and remote resources used by a query.
 +   *
 +   * @param queryId
 +   * @throws StorageEngineException
 +   */
 +  @Override
 +  protected void releaseQueryResource(long queryId) throws StorageEngineException {
 +    // release resources locally
 +    super.releaseQueryResource(queryId);
 +    // release resources remotely
 +    RemoteQueryContext context = queryContextMap.remove(queryId);
 +    if (context != null) {
 +      // release the resources in every queried node
 +      for (Entry<RaftNode, Set<Node>> headerEntry : context.getQueriedNodesMap().entrySet()) {
 +        RaftNode header = headerEntry.getKey();
 +        Set<Node> queriedNodes = headerEntry.getValue();
 +
 +        for (Node queriedNode : queriedNodes) {
 +          GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
 +          try {
 +            if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
 +              AsyncDataClient client =
 +                  coordinator.getAsyncDataClient(
-                       queriedNode, RaftServer.getReadOperationTimeoutMS());
++                      queriedNode, ClusterConstant.getReadOperationTimeoutMS());
 +              client.endQuery(header, coordinator.getThisNode(), queryId, handler);
 +            } else {
 +              try (SyncDataClient syncDataClient =
 +                  coordinator.getSyncDataClient(
-                       queriedNode, RaftServer.getReadOperationTimeoutMS())) {
-                 syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
++                      queriedNode, ClusterConstant.getReadOperationTimeoutMS())) {
++                try {
++                  syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
++                } catch (TException e) {
++                  // the connection may be broken, close it to avoid it being reused
++                  syncDataClient.getInputProtocol().getTransport().close();
++                  throw e;
++                }
 +              }
 +            }
 +          } catch (IOException | TException e) {
 +            logger.error("Cannot end query {} in {}", queryId, queriedNode);
 +          }
 +        }
 +      }
 +    }
 +  }
 +}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index b0f8a25,fce7a87..0000000
deleted file mode 100644,100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ /dev/null
@@@ -1,369 -1,378 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied.  See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--package org.apache.iotdb.cluster.server;
--
- import org.apache.iotdb.cluster.ClusterIoTDB;
--import org.apache.iotdb.cluster.config.ClusterDescriptor;
--import org.apache.iotdb.cluster.coordinator.Coordinator;
--import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
--import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
--import org.apache.iotdb.cluster.metadata.CMManager;
--import org.apache.iotdb.cluster.metadata.MetaPuller;
--import org.apache.iotdb.cluster.rpc.thrift.*;
--import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
--import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
--import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer;
--import org.apache.iotdb.cluster.server.member.MetaGroupMember;
--import org.apache.iotdb.cluster.server.service.MetaAsyncService;
--import org.apache.iotdb.cluster.server.service.MetaSyncService;
--import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
--import org.apache.iotdb.db.exception.StartupException;
--import org.apache.iotdb.db.exception.query.QueryProcessException;
--import org.apache.iotdb.db.service.IoTDB;
--import org.apache.iotdb.db.service.RegisterManager;
--import org.apache.iotdb.db.utils.TestOnly;
--import org.apache.iotdb.service.rpc.thrift.TSStatus;
--
--import org.apache.thrift.TException;
--import org.apache.thrift.TProcessor;
--import org.apache.thrift.async.AsyncMethodCallback;
--import org.apache.thrift.transport.TNonblockingServerSocket;
--import org.apache.thrift.transport.TServerSocket;
--import org.apache.thrift.transport.TServerTransport;
--import org.apache.thrift.transport.TTransportException;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--import java.net.InetSocketAddress;
--import java.nio.ByteBuffer;
--
--/**
-- * MetaCluster manages the whole cluster's metadata, such as what nodes are in the cluster and the
-- * data partition. Each node has one MetaClusterServer instance, the single-node IoTDB instance is
-- * started-up at the same time.
-- */
--public class MetaClusterServer extends RaftServer
--    implements TSMetaService.AsyncIface, TSMetaService.Iface {
--  private static Logger logger = LoggerFactory.getLogger(MetaClusterServer.class);
--
--  // each node only contains one MetaGroupMember
--  private MetaGroupMember member;
--  private Coordinator coordinator;
- 
- 
 -  // the single-node IoTDB instance
 -  private IoTDB ioTDB;
 -  // to register the ClusterMonitor that helps monitoring the cluster
 -  private RegisterManager registerManager = new RegisterManager();
--  private MetaAsyncService asyncService;
--  private MetaSyncService syncService;
--  private MetaHeartbeatServer metaHeartbeatServer;
--
--  public MetaClusterServer() throws QueryProcessException {
--    super();
--    metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this);
--    coordinator = new Coordinator();
--    member = new MetaGroupMember(protocolFactory, thisNode, coordinator);
--    coordinator.setMetaGroupMember(member);
--    asyncService = new MetaAsyncService(member);
--    syncService = new MetaSyncService(member);
--    MetaPuller.getInstance().init(member);
--  }
--
--  /**
--   * Besides the standard RaftServer start-up, the IoTDB instance, a MetaGroupMember and the
--   * ClusterMonitor are also started.
--   *
--   * @throws TTransportException
--   * @throws StartupException
--   */
--  @Override
--  public void start() throws TTransportException, StartupException {
--    super.start();
--    metaHeartbeatServer.start();
- 
 -    ioTDB = new IoTDB();
--    IoTDB.setMetaManager(CMManager.getInstance());
--    ((CMManager) IoTDB.metaManager).setMetaGroupMember(member);
--    ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
-     //TODO FIXME move this out of MetaClusterServer
-     IoTDB.getInstance().active();
- 
 -    ioTDB.active();
--    member.start();
- 
 -    // JMX based DBA API
 -    registerManager.register(ClusterMonitor.INSTANCE);
--  }
--
--  /** Also stops the IoTDB instance, the MetaGroupMember and the ClusterMonitor. */
--  @Override
--  public void stop() {
- 
 -    if (ioTDB == null) {
 -      return;
 -    }
--    metaHeartbeatServer.stop();
--    super.stop();
 -    ioTDB.stop();
 -    ioTDB = null;
--    member.stop();
 -    registerManager.deregisterAll();
--  }
--
--  /** Build a initial cluster with other nodes on the seed list. */
--  public void buildCluster() throws ConfigInconsistentException, StartUpCheckFailureException {
--    member.buildCluster();
--  }
--
--  /**
--   * Pick up a node from the seed list and send a join request to it.
--   *
--   * @return whether the node has joined the cluster.
--   */
--  public void joinCluster() throws ConfigInconsistentException, StartUpCheckFailureException {
--    member.joinCluster();
--  }
--
--  /**
--   * MetaClusterServer uses the meta port to create the socket.
--   *
--   * @return the TServerTransport
--   * @throws TTransportException if create the socket fails
--   */
--  @Override
--  TServerTransport getServerSocket() throws TTransportException {
--    logger.info(
--        "[{}] Cluster node will listen {}:{}",
--        getServerClientName(),
--        config.getInternalIp(),
--        config.getInternalMetaPort());
--    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
--      return new TNonblockingServerSocket(
--          new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort()),
--          getConnectionTimeoutInMS());
--    } else {
--      return new TServerSocket(
--          new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort()));
--    }
--  }
--
--  @Override
--  String getClientThreadPrefix() {
--    return "MetaClientThread-";
--  }
--
--  @Override
--  String getServerClientName() {
--    return "MetaServerThread-";
--  }
--
--  @Override
--  TProcessor getProcessor() {
--    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
--      return new AsyncProcessor<>(this);
--    } else {
--      return new Processor<>(this);
--    }
--  }
--
--  // Request forwarding. There is only one MetaGroupMember each node, so all requests will be
--  // directly sent to that member. See the methods in MetaGroupMember for details
--
--  @Override
--  public void addNode(Node node, StartUpStatus startUpStatus, AsyncMethodCallback resultHandler) {
--    asyncService.addNode(node, startUpStatus, resultHandler);
--  }
--
--  @Override
--  public void sendHeartbeat(HeartBeatRequest request, AsyncMethodCallback resultHandler) {
--    asyncService.sendHeartbeat(request, resultHandler);
--  }
--
--  @Override
--  public void startElection(ElectionRequest electionRequest, AsyncMethodCallback resultHandler) {
--    asyncService.startElection(electionRequest, resultHandler);
--  }
--
--  @Override
--  public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback resultHandler) {
--    asyncService.appendEntries(request, resultHandler);
--  }
--
--  @Override
--  public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHandler) {
--    asyncService.appendEntry(request, resultHandler);
--  }
--
--  @Override
--  public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback resultHandler) {
--    asyncService.sendSnapshot(request, resultHandler);
--  }
--
--  @Override
--  public void executeNonQueryPlan(
--      ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) {
--    asyncService.executeNonQueryPlan(request, resultHandler);
--  }
--
--  @Override
--  public void requestCommitIndex(
--      RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
--    asyncService.requestCommitIndex(header, resultHandler);
--  }
--
--  @Override
--  public void checkAlive(AsyncMethodCallback<Node> resultHandler) {
--    asyncService.checkAlive(resultHandler);
--  }
--
--  @Override
--  public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler) {
--    asyncService.collectMigrationStatus(resultHandler);
--  }
--
--  @Override
--  public void readFile(
--      String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) {
--    asyncService.readFile(filePath, offset, length, resultHandler);
--  }
--
--  @Override
--  public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
--    asyncService.queryNodeStatus(resultHandler);
--  }
--
--  public MetaGroupMember getMember() {
--    return member;
--  }
--
--  @Override
--  public void checkStatus(
--      StartUpStatus startUpStatus, AsyncMethodCallback<CheckStatusResponse> resultHandler) {
--    asyncService.checkStatus(startUpStatus, resultHandler);
--  }
--
--  @Override
--  public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
--    asyncService.removeNode(node, resultHandler);
--  }
--
--  @Override
--  public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) {
--    asyncService.exile(removeNodeLog, resultHandler);
--  }
--
--  @Override
--  public void matchTerm(
--      long index, long term, RaftNode header, AsyncMethodCallback<Boolean> resultHandler) {
--    asyncService.matchTerm(index, term, header, resultHandler);
--  }
--
--  @Override
--  public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus) throws TException {
--    return syncService.addNode(node, startUpStatus);
--  }
--
--  @Override
--  public CheckStatusResponse checkStatus(StartUpStatus startUpStatus) {
--    return syncService.checkStatus(startUpStatus);
--  }
--
--  @Override
--  public long removeNode(Node node) throws TException {
--    return syncService.removeNode(node);
--  }
--
--  @Override
--  public void exile(ByteBuffer removeNodeLog) {
--    syncService.exile(removeNodeLog);
--  }
--
--  @Override
--  public TNodeStatus queryNodeStatus() {
--    return syncService.queryNodeStatus();
--  }
--
--  @Override
--  public Node checkAlive() {
--    return syncService.checkAlive();
--  }
--
--  @Override
--  public ByteBuffer collectMigrationStatus() {
--    return syncService.collectMigrationStatus();
--  }
--
--  @Override
--  public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
--    return syncService.sendHeartbeat(request);
--  }
--
--  @Override
--  public long startElection(ElectionRequest request) {
--    return syncService.startElection(request);
--  }
--
--  @Override
--  public long appendEntries(AppendEntriesRequest request) throws TException {
--    return syncService.appendEntries(request);
--  }
--
--  @Override
--  public long appendEntry(AppendEntryRequest request) throws TException {
--    return syncService.appendEntry(request);
--  }
--
--  @Override
--  public void sendSnapshot(SendSnapshotRequest request) throws TException {
--    syncService.sendSnapshot(request);
--  }
--
--  @Override
--  public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
--    return syncService.executeNonQueryPlan(request);
--  }
--
--  @Override
--  public RequestCommitIndexResponse requestCommitIndex(RaftNode header) throws TException {
--    return syncService.requestCommitIndex(header);
--  }
--
--  @Override
--  public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
--    return syncService.readFile(filePath, offset, length);
--  }
--
--  @Override
--  public boolean matchTerm(long index, long term, RaftNode header) {
--    return syncService.matchTerm(index, term, header);
--  }
--
--  @Override
--  public void removeHardLink(String hardLinkPath) throws TException {
--    syncService.removeHardLink(hardLinkPath);
--  }
--
--  @Override
--  public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) {
--    asyncService.removeHardLink(hardLinkPath, resultHandler);
--  }
--
--  @Override
--  public void handshake(Node sender) {
--    syncService.handshake(sender);
--  }
--
--  @Override
--  public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
--    asyncService.handshake(sender, resultHandler);
--  }
--
--  @TestOnly
--  public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
--    this.member = metaGroupMember;
--  }
--
 -  @TestOnly
 -  public IoTDB getIoTDB() {
 -    return ioTDB;
 -  }
--}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java
index 0000000,0000000..65db372
new file mode 100644
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer2.java
@@@ -1,0 -1,0 +1,372 @@@
++/// *
++// * Licensed to the Apache Software Foundation (ASF) under one
++// * or more contributor license agreements.  See the NOTICE file
++// * distributed with this work for additional information
++// * regarding copyright ownership.  The ASF licenses this file
++// * to you under the Apache License, Version 2.0 (the
++// * "License"); you may not use this file except in compliance
++// * with the License.  You may obtain a copy of the License at
++// *
++// *     http://www.apache.org/licenses/LICENSE-2.0
++// *
++// * Unless required by applicable law or agreed to in writing,
++// * software distributed under the License is distributed on an
++// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++// * KIND, either express or implied.  See the License for the
++// * specific language governing permissions and limitations
++// * under the License.
++// */
++// package org.apache.iotdb.cluster.server;
++//
++// import org.apache.iotdb.cluster.config.ClusterDescriptor;
++// import org.apache.iotdb.cluster.coordinator.Coordinator;
++// import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
++// import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
++// import org.apache.iotdb.cluster.metadata.CMManager;
++// import org.apache.iotdb.cluster.metadata.MetaPuller;
++// import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
++// import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
++// import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
++// import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
++// import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
++// import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
++// import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
++// import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
++// import org.apache.iotdb.cluster.rpc.thrift.Node;
++// import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
++// import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
++// import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
++// import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
++// import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
++// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
++// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
++// import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
++// import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer;
++// import org.apache.iotdb.cluster.server.member.MetaGroupMember;
++// import org.apache.iotdb.cluster.server.service.MetaAsyncService;
++// import org.apache.iotdb.cluster.server.service.MetaSyncService;
++// import org.apache.iotdb.db.exception.StartupException;
++// import org.apache.iotdb.db.exception.query.QueryProcessException;
++// import org.apache.iotdb.db.service.IoTDB;
++// import org.apache.iotdb.db.utils.TestOnly;
++// import org.apache.iotdb.service.rpc.thrift.TSStatus;
++// import org.apache.thrift.TException;
++// import org.apache.thrift.async.AsyncMethodCallback;
++// import org.apache.thrift.transport.TNonblockingServerSocket;
++// import org.slf4j.Logger;
++// import org.slf4j.LoggerFactory;
++//
++// import java.net.InetSocketAddress;
++// import java.nio.ByteBuffer;
++//
++/// **
++// * MetaCluster manages the whole cluster's metadata, such as what nodes are in the cluster and the
++// * data partition. Each node has one MetaClusterServer instance, the single-node IoTDB instance is
++// * started-up at the same time.
++// */
++// public class MetaClusterServer2 extends RaftServer
++//    implements TSMetaService.AsyncIface, TSMetaService.Iface {
++//  private static Logger logger = LoggerFactory.getLogger(MetaClusterServer2.class);
++//
++//  // each node only contains one MetaGroupMember
++//  private MetaGroupMember member;
++//  private Coordinator coordinator;
++//
++//  private MetaAsyncService asyncService;
++//  private MetaSyncService syncService;
++//  private MetaHeartbeatServer metaHeartbeatServer;
++//
++//  public MetaClusterServer2() throws QueryProcessException {
++//    super();
++//    metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this);
++//    coordinator = new Coordinator();
++//    member = new MetaGroupMember(protocolFactory, thisNode, coordinator);
++//    coordinator.setMetaGroupMember(member);
++//    asyncService = new MetaAsyncService(member);
++//    syncService = new MetaSyncService(member);
++//    MetaPuller.getInstance().init(member);
++//  }
++//
++//  /**
++//   * Besides the standard RaftServer start-up, the IoTDB instance, a MetaGroupMember and the
++//   * ClusterMonitor are also started.
++//   *
++//   * @throws TTransportException
++//   * @throws StartupException
++//   */
++//  @Override
++//  public void start() throws TTransportException, StartupException {
++//    super.start();
++//    metaHeartbeatServer.start();
++//
++//    IoTDB.setMetaManager(CMManager.getInstance());
++//    ((CMManager) IoTDB.metaManager).setMetaGroupMember(member);
++//    ((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
++//    // TODO FIXME move this out of MetaClusterServer
++//    IoTDB.getInstance().active();
++//
++//    member.start();
++//  }
++//
++//  /** Also stops the IoTDB instance, the MetaGroupMember and the ClusterMonitor. */
++//  @Override
++//  public void stop() {
++//
++//    metaHeartbeatServer.stop();
++//    super.stop();
++//    member.stop();
++//  }
++//
++//  /** Build a initial cluster with other nodes on the seed list. */
++//  public void buildCluster() throws ConfigInconsistentException, StartUpCheckFailureException {
++//    member.buildCluster();
++//  }
++//
++//  /**
++//   * Pick up a node from the seed list and send a join request to it.
++//   *
++//   * @return whether the node has joined the cluster.
++//   */
++//  public void joinCluster() throws ConfigInconsistentException, StartUpCheckFailureException {
++//    member.joinCluster();
++//  }
++//
++//  /**
++//   * MetaClusterServer uses the meta port to create the socket.
++//   *
++//   * @return the TServerTransport
++//   * @throws TTransportException if create the socket fails
++//   */
++//  @Override
++//  TServerTransport getServerSocket() throws TTransportException {
++//    logger.info(
++//        "[{}] Cluster node will listen {}:{}",
++//        getServerClientName(),
++//        config.getInternalIp(),
++//        config.getInternalMetaPort());
++//    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
++//      return new TNonblockingServerSocket(
++//          new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort()),
++//          getConnectionTimeoutInMS());
++//    } else {
++//      return new TServerSocket(
++//          new InetSocketAddress(config.getInternalIp(), config.getInternalMetaPort()));
++//    }
++//  }
++//
++//  @Override
++//  String getClientThreadPrefix() {
++//    return "MetaClientThread-";
++//  }
++//
++//  @Override
++//  String getServerClientName() {
++//    return "MetaServerThread-";
++//  }
++//
++//  @Override
++//  TProcessor getProcessor() {
++//    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
++//      return new AsyncProcessor<>(this);
++//    } else {
++//      return new Processor<>(this);
++//    }
++//  }
++//
++//  // Request forwarding. There is only one MetaGroupMember each node, so all requests will be
++//  // directly sent to that member. See the methods in MetaGroupMember for details
++//
++//  @Override
++//  public void addNode(Node node, StartUpStatus startUpStatus, AsyncMethodCallback resultHandler) {
++//    asyncService.addNode(node, startUpStatus, resultHandler);
++//  }
++//
++//  @Override
++//  public void sendHeartbeat(HeartBeatRequest request, AsyncMethodCallback resultHandler) {
++//    asyncService.sendHeartbeat(request, resultHandler);
++//  }
++//
++//  @Override
++//  public void startElection(ElectionRequest electionRequest, AsyncMethodCallback resultHandler) {
++//    asyncService.startElection(electionRequest, resultHandler);
++//  }
++//
++//  @Override
++//  public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback resultHandler) {
++//    asyncService.appendEntries(request, resultHandler);
++//  }
++//
++//  @Override
++//  public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHandler) {
++//    asyncService.appendEntry(request, resultHandler);
++//  }
++//
++//  @Override
++//  public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback resultHandler) {
++//    asyncService.sendSnapshot(request, resultHandler);
++//  }
++//
++//  @Override
++//  public void executeNonQueryPlan(
++//      ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) {
++//    asyncService.executeNonQueryPlan(request, resultHandler);
++//  }
++//
++//  @Override
++//  public void requestCommitIndex(
++//      RaftNode header, AsyncMethodCallback<RequestCommitIndexResponse> resultHandler) {
++//    asyncService.requestCommitIndex(header, resultHandler);
++//  }
++//
++//  @Override
++//  public void checkAlive(AsyncMethodCallback<Node> resultHandler) {
++//    asyncService.checkAlive(resultHandler);
++//  }
++//
++//  @Override
++//  public void collectMigrationStatus(AsyncMethodCallback<ByteBuffer> resultHandler) {
++//    asyncService.collectMigrationStatus(resultHandler);
++//  }
++//
++//  @Override
++//  public void readFile(
++//      String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> resultHandler) {
++//    asyncService.readFile(filePath, offset, length, resultHandler);
++//  }
++//
++//  @Override
++//  public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
++//    asyncService.queryNodeStatus(resultHandler);
++//  }
++//
++//  public MetaGroupMember getMember() {
++//    return member;
++//  }
++//
++//  @Override
++//  public void checkStatus(
++//      StartUpStatus startUpStatus, AsyncMethodCallback<CheckStatusResponse> resultHandler) {
++//    asyncService.checkStatus(startUpStatus, resultHandler);
++//  }
++//
++//  @Override
++//  public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
++//    asyncService.removeNode(node, resultHandler);
++//  }
++//
++//  @Override
++//  public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) {
++//    asyncService.exile(removeNodeLog, resultHandler);
++//  }
++//
++//  @Override
++//  public void matchTerm(
++//      long index, long term, RaftNode header, AsyncMethodCallback<Boolean> resultHandler) {
++//    asyncService.matchTerm(index, term, header, resultHandler);
++//  }
++//
++//  @Override
++//  public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus) throws TException {
++//    return syncService.addNode(node, startUpStatus);
++//  }
++//
++//  @Override
++//  public CheckStatusResponse checkStatus(StartUpStatus startUpStatus) {
++//    return syncService.checkStatus(startUpStatus);
++//  }
++//
++//  @Override
++//  public long removeNode(Node node) throws TException {
++//    return syncService.removeNode(node);
++//  }
++//
++//  @Override
++//  public void exile(ByteBuffer removeNodeLog) {
++//    syncService.exile(removeNodeLog);
++//  }
++//
++//  @Override
++//  public TNodeStatus queryNodeStatus() {
++//    return syncService.queryNodeStatus();
++//  }
++//
++//  @Override
++//  public Node checkAlive() {
++//    return syncService.checkAlive();
++//  }
++//
++//  @Override
++//  public ByteBuffer collectMigrationStatus() {
++//    return syncService.collectMigrationStatus();
++//  }
++//
++//  @Override
++//  public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
++//    return syncService.sendHeartbeat(request);
++//  }
++//
++//  @Override
++//  public long startElection(ElectionRequest request) {
++//    return syncService.startElection(request);
++//  }
++//
++//  @Override
++//  public long appendEntries(AppendEntriesRequest request) throws TException {
++//    return syncService.appendEntries(request);
++//  }
++//
++//  @Override
++//  public long appendEntry(AppendEntryRequest request) throws TException {
++//    return syncService.appendEntry(request);
++//  }
++//
++//  @Override
++//  public void sendSnapshot(SendSnapshotRequest request) throws TException {
++//    syncService.sendSnapshot(request);
++//  }
++//
++//  @Override
++//  public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
++//    return syncService.executeNonQueryPlan(request);
++//  }
++//
++//  @Override
++//  public RequestCommitIndexResponse requestCommitIndex(RaftNode header) throws TException {
++//    return syncService.requestCommitIndex(header);
++//  }
++//
++//  @Override
++//  public ByteBuffer readFile(String filePath, long offset, int length) throws TException {
++//    return syncService.readFile(filePath, offset, length);
++//  }
++//
++//  @Override
++//  public boolean matchTerm(long index, long term, RaftNode header) {
++//    return syncService.matchTerm(index, term, header);
++//  }
++//
++//  @Override
++//  public void removeHardLink(String hardLinkPath) throws TException {
++//    syncService.removeHardLink(hardLinkPath);
++//  }
++//
++//  @Override
++//  public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) {
++//    asyncService.removeHardLink(hardLinkPath, resultHandler);
++//  }
++//
++//  @Override
++//  public void handshake(Node sender) {
++//    syncService.handshake(sender);
++//  }
++//
++//  @Override
++//  public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
++//    asyncService.handshake(sender, resultHandler);
++//  }
++//
++//  @TestOnly
++//  public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
++//    this.member = metaGroupMember;
++//  }
++// }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 09956d2,09956d2..0000000
deleted file mode 100644,100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ /dev/null
@@@ -1,263 -1,263 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- *
-- *     http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied.  See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--
--package org.apache.iotdb.cluster.server;
--
--import org.apache.iotdb.cluster.config.ClusterConfig;
--import org.apache.iotdb.cluster.config.ClusterDescriptor;
--import org.apache.iotdb.cluster.rpc.thrift.Node;
--import org.apache.iotdb.cluster.rpc.thrift.RaftService;
--import org.apache.iotdb.cluster.utils.ClusterUtils;
--import org.apache.iotdb.db.conf.IoTDBDescriptor;
--import org.apache.iotdb.db.exception.StartupException;
--import org.apache.iotdb.db.utils.CommonUtils;
--import org.apache.iotdb.db.utils.TestOnly;
--import org.apache.iotdb.rpc.RpcTransportFactory;
--
--import org.apache.thrift.TProcessor;
--import org.apache.thrift.protocol.TBinaryProtocol;
--import org.apache.thrift.protocol.TCompactProtocol;
--import org.apache.thrift.protocol.TProtocolFactory;
--import org.apache.thrift.server.TServer;
--import org.apache.thrift.server.TThreadedSelectorServer;
--import org.apache.thrift.transport.TNonblockingServerTransport;
--import org.apache.thrift.transport.TServerTransport;
--import org.apache.thrift.transport.TTransportException;
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
--import java.util.ConcurrentModificationException;
--import java.util.concurrent.ExecutorService;
--import java.util.concurrent.Executors;
--import java.util.concurrent.SynchronousQueue;
--import java.util.concurrent.ThreadFactory;
--import java.util.concurrent.ThreadPoolExecutor;
--import java.util.concurrent.atomic.AtomicLong;
--
--/**
-- * RaftServer works as a broker (network and protocol layer) that sends the requests to the proper
-- * RaftMembers to process.
-- */
--public abstract class RaftServer implements RaftService.AsyncIface, RaftService.Iface {
--
--  private static final Logger logger = LoggerFactory.getLogger(RaftServer.class);
--  private static int connectionTimeoutInMS =
--      ClusterDescriptor.getInstance().getConfig().getConnectionTimeoutInMS();
--  private static int readOperationTimeoutMS =
--      ClusterDescriptor.getInstance().getConfig().getReadOperationTimeoutMS();
--  private static int writeOperationTimeoutMS =
--      ClusterDescriptor.getInstance().getConfig().getWriteOperationTimeoutMS();
--  private static int syncLeaderMaxWaitMs = 20 * 1000;
--  private static long heartBeatIntervalMs = 1000L;
--
--  ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
--  // the socket poolServer will listen to
--  private TServerTransport socket;
--  // RPC processing server
--  private TServer poolServer;
--  Node thisNode;
--
--  TProtocolFactory protocolFactory =
--      config.isRpcThriftCompressionEnabled()
--          ? new TCompactProtocol.Factory()
--          : new TBinaryProtocol.Factory();
--
--  // this thread pool is to run the thrift server (poolServer above)
--  private ExecutorService clientService;
--
--  RaftServer() {
--    thisNode = new Node();
--    // set internal rpc ip and ports
--    thisNode.setInternalIp(config.getInternalIp());
--    thisNode.setMetaPort(config.getInternalMetaPort());
--    thisNode.setDataPort(config.getInternalDataPort());
--    // set client rpc ip and ports
--    thisNode.setClientPort(config.getClusterRpcPort());
--    thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
--  }
--
--  RaftServer(Node thisNode) {
--    this.thisNode = thisNode;
--  }
--
--  public static int getConnectionTimeoutInMS() {
--    return connectionTimeoutInMS;
--  }
--
--  public static void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
--    RaftServer.connectionTimeoutInMS = connectionTimeoutInMS;
--  }
--
--  public static int getReadOperationTimeoutMS() {
--    return readOperationTimeoutMS;
--  }
--
--  public static int getWriteOperationTimeoutMS() {
--    return writeOperationTimeoutMS;
--  }
--
--  public static int getSyncLeaderMaxWaitMs() {
--    return syncLeaderMaxWaitMs;
--  }
--
--  public static void setSyncLeaderMaxWaitMs(int syncLeaderMaxWaitMs) {
--    RaftServer.syncLeaderMaxWaitMs = syncLeaderMaxWaitMs;
--  }
--
--  public static long getHeartBeatIntervalMs() {
--    return heartBeatIntervalMs;
--  }
--
--  public static void setHeartBeatIntervalMs(long heartBeatIntervalMs) {
--    RaftServer.heartBeatIntervalMs = heartBeatIntervalMs;
--  }
--
--  /**
--   * Establish a thrift server with the configurations in ClusterConfig to listen to and respond to
--   * thrift RPCs. Calling the method twice does not induce side effects.
--   *
--   * @throws TTransportException
--   */
--  @SuppressWarnings("java:S1130") // thrown in override method
--  public void start() throws TTransportException, StartupException {
--    if (poolServer != null) {
--      return;
--    }
--
--    establishServer();
--  }
--
--  /**
--   * Stop the thrift server, close the socket and interrupt all in progress RPCs. Calling the method
--   * twice does not induce side effects.
--   */
--  public void stop() {
--    if (poolServer == null) {
--      return;
--    }
--
--    try {
--      poolServer.stop();
--    } catch (ConcurrentModificationException e) {
--      // ignore
--    }
--    socket.close();
--    clientService.shutdownNow();
--    socket = null;
--    poolServer = null;
--  }
--
--  /**
--   * @return An AsyncProcessor that contains the extended interfaces of a non-abstract subclass of
--   *     RaftService (DataService or MetaService).
--   */
--  abstract TProcessor getProcessor();
--
--  /**
--   * @return A socket that will be used to establish a thrift server to listen to RPC requests.
--   *     DataServer and MetaServer use different port, so this is to be determined.
--   * @throws TTransportException
--   */
--  abstract TServerTransport getServerSocket() throws TTransportException;
--
--  /**
--   * Each thrift RPC request will be processed in a separate thread and this will return the name
--   * prefix of such threads. This is used to fast distinguish DataServer and MetaServer in the logs
--   * for the sake of debug.
--   *
--   * @return name prefix of RPC processing threads.
--   */
--  abstract String getClientThreadPrefix();
--
--  /**
--   * The thrift server will be run in a separate thread, and this will be its name. It help you
--   * locate the desired logs quickly when debugging.
--   *
--   * @return The name of the thread running the thrift server.
--   */
--  abstract String getServerClientName();
--
--  private TServer createAsyncServer() throws TTransportException {
--    socket = getServerSocket();
--    TThreadedSelectorServer.Args poolArgs =
--        new TThreadedSelectorServer.Args((TNonblockingServerTransport) socket);
--    poolArgs.maxReadBufferBytes = IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize();
--    poolArgs.selectorThreads(CommonUtils.getCpuCores());
--    int maxConcurrentClientNum =
--        Math.max(CommonUtils.getCpuCores(), config.getMaxConcurrentClientNum());
--    poolArgs.executorService(
--        new ThreadPoolExecutor(
--            CommonUtils.getCpuCores(),
--            maxConcurrentClientNum,
--            poolArgs.getStopTimeoutVal(),
--            poolArgs.getStopTimeoutUnit(),
--            new SynchronousQueue<>(),
--            new ThreadFactory() {
--              private AtomicLong threadIndex = new AtomicLong(0);
--
--              @Override
--              public Thread newThread(Runnable r) {
--                return new Thread(r, getClientThreadPrefix() + threadIndex.incrementAndGet());
--              }
--            }));
--    poolArgs.processor(getProcessor());
--    poolArgs.protocolFactory(protocolFactory);
--    // async service requires FramedTransport
--    poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
--
--    // run the thrift server in a separate thread so that the main thread is not blocked
--    return new TThreadedSelectorServer(poolArgs);
--  }
--
--  private TServer createSyncServer() throws TTransportException {
--    socket = getServerSocket();
--    return ClusterUtils.createTThreadPoolServer(
--        socket, getClientThreadPrefix(), getProcessor(), protocolFactory);
--  }
--
--  private void establishServer() throws TTransportException {
--    logger.info(
--        "[{}] Cluster node {} begins to set up with {} mode",
--        getServerClientName(),
--        thisNode,
--        ClusterDescriptor.getInstance().getConfig().isUseAsyncServer() ? "Async" : "Sync");
--
--    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
--      poolServer = createAsyncServer();
--    } else {
--      poolServer = createSyncServer();
--    }
--
--    clientService = Executors.newSingleThreadExecutor(r -> new Thread(r, getServerClientName()));
--
--    clientService.submit(() -> poolServer.serve());
--
--    logger.info("[{}] Cluster node {} is up", getServerClientName(), thisNode);
--  }
--
--  @TestOnly
--  public static void setReadOperationTimeoutMS(int readOperationTimeoutMS) {
--    RaftServer.readOperationTimeoutMS = readOperationTimeoutMS;
--  }
--
--  @TestOnly
--  public static void setWriteOperationTimeoutMS(int writeOperationTimeoutMS) {
--    RaftServer.writeOperationTimeoutMS = writeOperationTimeoutMS;
--  }
--}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
index ed99c3d,ed99c3d..348cda0
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
@@@ -23,9 -23,9 +23,8 @@@ import org.apache.iotdb.cluster.config.
  import org.apache.iotdb.cluster.rpc.thrift.Node;
  import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
  import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
--import org.apache.iotdb.cluster.server.MetaClusterServer;
++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl;
  import org.apache.iotdb.cluster.utils.ClusterUtils;
--
  import org.apache.thrift.TProcessor;
  import org.apache.thrift.transport.TNonblockingServerSocket;
  import org.apache.thrift.transport.TServerSocket;
@@@ -39,22 -39,22 +38,22 @@@ import java.net.InetSocketAddress
  public class MetaHeartbeatServer extends HeartbeatServer {
    private static Logger logger = LoggerFactory.getLogger(MetaHeartbeatServer.class);
  
--  private MetaClusterServer metaClusterServer;
++  private RaftTSMetaServiceImpl RaftTSMetaServiceImpl;
  
    /** Do not use this method for initialization */
    private MetaHeartbeatServer() {}
  
--  public MetaHeartbeatServer(Node thisNode, MetaClusterServer metaClusterServer) {
++  public MetaHeartbeatServer(Node thisNode, RaftTSMetaServiceImpl RaftTSMetaServiceImpl) {
      super(thisNode);
--    this.metaClusterServer = metaClusterServer;
++    this.RaftTSMetaServiceImpl = RaftTSMetaServiceImpl;
    }
  
    @Override
    TProcessor getProcessor() {
      if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
--      return new AsyncProcessor<>(metaClusterServer);
++      return new AsyncProcessor<>(RaftTSMetaServiceImpl);
      } else {
--      return new Processor<>(metaClusterServer);
++      return new Processor<>(RaftTSMetaServiceImpl);
      }
    }
  
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 39ebf3c,f917916..62ce37e
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@@ -65,11 -65,11 +65,9 @@@ import org.apache.iotdb.cluster.rpc.thr
  import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
  import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
  import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient;
- import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
 -import org.apache.iotdb.cluster.server.ClientServer;
  import org.apache.iotdb.cluster.server.DataClusterServer;
  import org.apache.iotdb.cluster.server.HardLinkCleaner;
  import org.apache.iotdb.cluster.server.NodeCharacter;
--import org.apache.iotdb.cluster.server.RaftServer;
  import org.apache.iotdb.cluster.server.Response;
  import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
  import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler;
@@@ -87,20 -87,20 +85,22 @@@ import org.apache.iotdb.cluster.utils.S
  import org.apache.iotdb.cluster.utils.nodetool.function.Status;
  import org.apache.iotdb.db.conf.IoTDBDescriptor;
  import org.apache.iotdb.db.engine.StorageEngine;
++import org.apache.iotdb.db.exception.ShutdownException;
  import org.apache.iotdb.db.exception.StartupException;
  import org.apache.iotdb.db.exception.StorageEngineException;
  import org.apache.iotdb.db.exception.metadata.MetadataException;
  import org.apache.iotdb.db.exception.query.QueryProcessException;
  import org.apache.iotdb.db.metadata.PartialPath;
  import org.apache.iotdb.db.qp.physical.PhysicalPlan;
++import org.apache.iotdb.db.service.IService;
  import org.apache.iotdb.db.service.IoTDB;
++import org.apache.iotdb.db.service.ServiceType;
  import org.apache.iotdb.db.utils.TestOnly;
  import org.apache.iotdb.db.utils.TimeValuePairUtils;
  import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
  import org.apache.iotdb.service.rpc.thrift.EndPoint;
  import org.apache.iotdb.service.rpc.thrift.TSStatus;
  import org.apache.iotdb.tsfile.read.filter.basic.Filter;
--
  import org.apache.thrift.TException;
  import org.apache.thrift.protocol.TProtocolFactory;
  import org.apache.thrift.transport.TTransportException;
@@@ -145,7 -145,7 +145,7 @@@ import static org.apache.iotdb.cluster.
  import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
  
  @SuppressWarnings("java:S1135")
--public class MetaGroupMember extends RaftMember {
++public class MetaGroupMember extends RaftMember implements IService {
  
    /** the file that contains the identifier of this node */
    static final String NODE_IDENTIFIER_FILE_NAME =
@@@ -209,12 -209,12 +209,6 @@@
    /** each node starts a data heartbeat server to transfer heartbeat requests */
    private DataHeartbeatServer dataHeartbeatServer;
  
--  /**
--   * an override of TSServiceImpl, which redirect JDBC and Session requests to the MetaGroupMember
--   * so they can be processed cluster-wide
--   */
-   private ClusterTSServiceImpl clusterTSServiceImpl;
 -  private ClientServer clientServer;
--
    private DataClientProvider dataClientProvider;
  
    /**
@@@ -276,7 -276,7 +270,6 @@@
      Factory dataMemberFactory = new Factory(factory, this);
      dataClusterServer = new DataClusterServer(thisNode, dataMemberFactory, this);
      dataHeartbeatServer = new DataHeartbeatServer(thisNode, dataClusterServer);
-     clusterTSServiceImpl = new ClusterTSServiceImpl(this);
 -    clientServer = new ClientServer(this);
      startUpStatus = getNewStartUpStatus();
  
      // try loading the partition table if there was a previous cluster
@@@ -333,8 -333,8 +326,8 @@@
    }
  
    /**
-    * Stop the heartbeat and catch-up thread pool, DataClusterServer, ClusterTSServiceImpl and reportThread.
 -   * Stop the heartbeat and catch-up thread pool, DataClusterServer, ClientServer and reportThread.
--   * Calling the method twice does not induce side effects.
++   * Stop the heartbeat and catch-up thread pool, DataClusterServer, ClusterTSServiceImpl and
++   * reportThread. Calling the method twice does not induce side effects.
     */
    @Override
    public void stop() {
@@@ -345,9 -345,9 +338,6 @@@
      if (getDataHeartbeatServer() != null) {
        getDataHeartbeatServer().stop();
      }
-     if (clusterTSServiceImpl != null) {
-       clusterTSServiceImpl.stop();
 -    if (clientServer != null) {
 -      clientServer.stop();
--    }
      if (reportThread != null) {
        reportThread.shutdownNow();
        try {
@@@ -370,15 -370,15 +360,29 @@@
      logger.info("{}: stopped", name);
    }
  
++  @Override
++  public void waitAndStop(long milliseconds) {
++    IService.super.waitAndStop(milliseconds);
++  }
++
++  @Override
++  public void shutdown(long milliseconds) throws ShutdownException {
++    IService.super.shutdown(milliseconds);
++  }
++
++  @Override
++  public ServiceType getID() {
++    return ServiceType.CLUSTER_META_ENGINE;
++  }
++
    /**
-    * Start DataClusterServer and ClusterTSServiceImpl so this node will be able to respond to other nodes
 -   * Start DataClusterServer and ClientServer so this node will be able to respond to other nodes
--   * and clients.
++   * Start DataClusterServer and ClusterTSServiceImpl so this node will be able to respond to other
++   * nodes and clients.
     */
    protected void initSubServers() throws TTransportException, StartupException {
      getDataClusterServer().start();
      getDataHeartbeatServer().start();
-     clusterTSServiceImpl.setCoordinator(this.coordinator);
-     clusterTSServiceImpl.start();
 -    clientServer.setCoordinator(this.coordinator);
 -    clientServer.start();
++    // TODO FIXME
    }
  
    /**
@@@ -715,9 -714,9 +718,9 @@@
    /**
     * Process a HeartBeatResponse from a follower. If the follower has provided its identifier, try
     * registering for it and if all nodes have registered and there is no available partition table,
-    * initialize a new one and start the ClusterTSServiceImpl and DataClusterServer. If the follower requires
 -   * initialize a new one and start the ClientServer and DataClusterServer. If the follower requires
--   * a partition table, add it to the blind node list so that at the next heartbeat this node will
--   * send it a partition table
++   * initialize a new one and start the ClusterTSServiceImpl and DataClusterServer. If the follower
++   * requires a partition table, add it to the blind node list so that at the next heartbeat this
++   * node will send it a partition table
     */
    @Override
    public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
@@@ -800,8 -799,8 +803,8 @@@
    }
  
    /**
-    * Start the DataClusterServer and ClusterTSServiceImpl` so this node can serve other nodes and clients.
 -   * Start the DataClusterServer and ClientServer so this node can serve other nodes and clients.
--   * Also build DataGroupMembers using the partition table.
++   * Start the DataClusterServer and ClusterTSServiceImpl so this node can serve other nodes and
++   * clients. Also build DataGroupMembers using the partition table.
     */
    protected synchronized void startSubServers() {
      logger.info("Starting sub-servers...");
@@@ -1452,7 -1451,7 +1455,8 @@@
    private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, RaftNode header)
        throws IOException {
      RaftService.AsyncClient client =
--        getClientProvider().getAsyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
++        getClientProvider()
++            .getAsyncDataClient(receiver, ClusterConstant.getWriteOperationTimeoutMS());
      return forwardPlanAsync(plan, receiver, header, client);
    }
  
@@@ -1461,7 -1460,7 +1465,8 @@@
      Client client;
      try {
        client =
--          getClientProvider().getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
++          getClientProvider()
++              .getSyncDataClient(receiver, ClusterConstant.getWriteOperationTimeoutMS());
      } catch (TException e) {
        throw new IOException(e);
      }
@@@ -1630,7 -1629,7 +1635,7 @@@
      client.collectMigrationStatus(migrationStatusHandler);
      synchronized (resultRef) {
        if (resultRef.get() == null) {
--        resultRef.wait(RaftServer.getConnectionTimeoutInMS());
++        resultRef.wait(ClusterConstant.getConnectionTimeoutInMS());
        }
      }
      return ClusterUtils.deserializeMigrationStatus(resultRef.get());
@@@ -1814,9 -1813,9 +1819,10 @@@
                      // ignore
                    }
                    super.stop();
-                   if (clusterTSServiceImpl != null) {
-                     clusterTSServiceImpl.stop();
 -                  if (clientServer != null) {
 -                    clientServer.stop();
--                  }
++                  // TODO FIXME
++                  //                  if (clusterTSServiceImpl != null) {
++                  //                    clusterTSServiceImpl.stop();
++                  //                  }
                    logger.info("{} has been removed from the cluster", name);
                  })
              .start();
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 9250785,ce941f5..560c0fe
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@@ -18,7 -18,7 +18,8 @@@
   */
  package org.apache.iotdb.cluster.utils.nodetool;
  
 -import org.apache.iotdb.cluster.ClusterMain;
++import org.apache.commons.collections4.map.MultiKeyMap;
 +import org.apache.iotdb.cluster.ClusterIoTDB;
  import org.apache.iotdb.cluster.config.ClusterConstant;
  import org.apache.iotdb.cluster.config.ClusterDescriptor;
  import org.apache.iotdb.cluster.partition.PartitionGroup;
@@@ -26,8 -26,8 +27,8 @@@ import org.apache.iotdb.cluster.partiti
  import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
  import org.apache.iotdb.cluster.rpc.thrift.Node;
  import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
--import org.apache.iotdb.cluster.server.MetaClusterServer;
  import org.apache.iotdb.cluster.server.NodeCharacter;
++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl;
  import org.apache.iotdb.cluster.server.member.DataGroupMember;
  import org.apache.iotdb.cluster.server.member.MetaGroupMember;
  import org.apache.iotdb.cluster.server.monitor.Timer;
@@@ -40,8 -40,8 +41,6 @@@ import org.apache.iotdb.db.service.ISer
  import org.apache.iotdb.db.service.JMXService;
  import org.apache.iotdb.db.service.ServiceType;
  import org.apache.iotdb.tsfile.utils.Pair;
--
--import org.apache.commons.collections4.map.MultiKeyMap;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -201,11 -201,11 +200,11 @@@ public class ClusterMonitor implements 
    }
  
    private MetaGroupMember getMetaGroupMember() {
-     MetaClusterServer metaClusterServer = ClusterIoTDB.getInstance().getMetaServer();
 -    MetaClusterServer metaClusterServer = ClusterMain.getMetaServer();
--    if (metaClusterServer == null) {
++    RaftTSMetaServiceImpl RaftTSMetaServiceImpl = ClusterIoTDB.getInstance().getMetaServer();
++    if (RaftTSMetaServiceImpl == null) {
        return null;
      }
--    return metaClusterServer.getMember();
++    return RaftTSMetaServiceImpl.getMember();
    }
  
    private PartitionTable getPartitionTable() {
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
index 7565e9d,70fbb66..c4f1dd7
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
@@@ -20,12 -20,12 +20,11 @@@
  package org.apache.iotdb.cluster.integration;
  
  import org.apache.iotdb.cluster.config.ClusterDescriptor;
--import org.apache.iotdb.cluster.server.MetaClusterServer;
++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl;
  import org.apache.iotdb.cluster.utils.Constants;
  import org.apache.iotdb.db.utils.EnvironmentUtils;
  import org.apache.iotdb.rpc.IoTDBConnectionException;
  import org.apache.iotdb.session.Session;
--
  import org.junit.After;
  import org.junit.Before;
  
@@@ -34,7 -34,7 +33,7 @@@ import java.util.List
  
  public abstract class BaseSingleNodeTest {
  
--  private MetaClusterServer metaServer;
++  private RaftTSMetaServiceImpl metaServer;
  
    private boolean useAsyncServer;
    private List<String> seedNodeUrls;
@@@ -44,14 -44,13 +43,14 @@@
    @Before
    public void setUp() throws Exception {
      initConfigs();
--    metaServer = new MetaClusterServer();
++    metaServer = new RaftTSMetaServiceImpl();
      metaServer.start();
      metaServer.buildCluster();
    }
  
    @After
    public void tearDown() throws Exception {
-     //TODO fixme
++    // TODO fixme
      metaServer.stop();
      recoverConfigs();
      EnvironmentUtils.cleanEnv();
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
index 499efce,5dde20f..60da68d8
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
@@@ -19,15 -19,15 +19,14 @@@
  
  package org.apache.iotdb.cluster.server.clusterinfo;
  
 -import org.apache.iotdb.cluster.ClusterMain;
 +import org.apache.iotdb.cluster.ClusterIoTDB;
  import org.apache.iotdb.cluster.rpc.thrift.DataPartitionEntry;
  import org.apache.iotdb.cluster.rpc.thrift.Node;
--import org.apache.iotdb.cluster.server.MetaClusterServer;
++import org.apache.iotdb.cluster.server.RaftTSMetaServiceImpl;
  import org.apache.iotdb.cluster.server.member.MetaGroupMember;
  import org.apache.iotdb.cluster.server.member.MetaGroupMemberTest;
  import org.apache.iotdb.db.exception.metadata.MetadataException;
  import org.apache.iotdb.db.metadata.PartialPath;
--
  import org.apache.thrift.TException;
  import org.junit.After;
  import org.junit.Assert;
@@@ -48,13 -48,13 +47,13 @@@ public class ClusterInfoServiceImplTes
      metaGroupMemberTest.setUp();
      MetaGroupMember metaGroupMember = metaGroupMemberTest.getTestMetaGroupMember();
  
--    MetaClusterServer metaClusterServer = new MetaClusterServer();
--    metaClusterServer.getMember().stop();
--    metaClusterServer.setMetaGroupMember(metaGroupMember);
++    RaftTSMetaServiceImpl RaftTSMetaServiceImpl = new RaftTSMetaServiceImpl();
++    RaftTSMetaServiceImpl.getMember().stop();
++    RaftTSMetaServiceImpl.setMetaGroupMember(metaGroupMember);
  
-     ClusterIoTDB.setMetaClusterServer(metaClusterServer);
 -    ClusterMain.setMetaClusterServer(metaClusterServer);
++    ClusterIoTDB.setMetaClusterServer(RaftTSMetaServiceImpl);
  
--    metaClusterServer.getIoTDB().metaManager.setStorageGroup(new PartialPath("root", "sg"));
++    RaftTSMetaServiceImpl.getIoTDB().metaManager.setStorageGroup(new PartialPath("root", "sg"));
      // metaClusterServer.getMember()
      impl = new ClusterInfoServiceImpl();
    }
diff --cc server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
index 6f425ae,6f425ae..7c75e89
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
@@@ -27,6 -27,6 +27,7 @@@ import java.util.concurrent.Executors
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.SynchronousQueue;
  import java.util.concurrent.ThreadPoolExecutor;
++import java.util.concurrent.TimeUnit;
  
  /** This class is used to create thread pool which must contain the pool name. */
  public class IoTDBThreadPoolFactory {
@@@ -132,6 -132,6 +133,23 @@@
  
    /** function for creating thrift rpc client thread pool. */
    public static ExecutorService createThriftRpcClientThreadPool(
++      int minWorkerThreads,
++      int maxWorkerThreads,
++      int stopTimeoutVal,
++      TimeUnit stopTimeoutUnit,
++      String poolName) {
++    SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
++    return new ThreadPoolExecutor(
++        minWorkerThreads,
++        maxWorkerThreads,
++        stopTimeoutVal,
++        stopTimeoutUnit,
++        executorQueue,
++        new IoTThreadFactory(poolName));
++  }
++
++  /** function for creating thrift rpc client thread pool. */
++  public static ExecutorService createThriftRpcClientThreadPool(
        TThreadPoolServer.Args args, String poolName, Thread.UncaughtExceptionHandler handler) {
      SynchronousQueue<Runnable> executorQueue = new SynchronousQueue<>();
      return new ThreadPoolExecutor(
diff --cc server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index bbbd6f1,0850b75..d37f57e
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@@ -46,9 -46,7 +46,12 @@@ public enum ThreadName 
    QUERY_SERVICE("Query"),
    WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
    CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"),
 -  CLUSTER_INFO_SERVICE("ClusterInfoClient");
 +  CLUSTER_INFO_SERVICE("ClusterInfoClient"),
 +  CLUSTER_RPC_SERVICE("ClusterRPC"),
-   CLUSTER_RPC_CLIENT("Cluster-RPC-Client");
++  CLUSTER_RPC_CLIENT("Cluster-RPC-Client"),
++  CLUSTER_META_SERVICE("ClusterMetaService"),
++  CLUSTER_META_HEARTBEAT_SERVICE("ClusterMetaHeartbeatService"),
++  CLUSTER_DATA_SERVICE("ClusterDataService");
  
    private final String name;
  
diff --cc server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
index 6b61323,b2dd13a..83db4bc
--- a/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/TracingManager.java
@@@ -43,19 -43,13 +43,20 @@@ public class TracingManager 
    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
    private BufferedWriter writer;
    private Map<Long, Long> queryStartTime = new ConcurrentHashMap<>();
+   private Map<Long, TracingInfo> tracingInfoMap = new ConcurrentHashMap<>();
  
 -  public TracingManager(String dirName, String logFileName) {
 -    initTracingManager(dirName, logFileName);
 +  private TracingManager() {
 +    initTracingManager();
    }
  
 -  public void initTracingManager(String dirName, String logFileName) {
 +  public void initTracingManager() {
 +    if (this.writer != null) {
-       //the tracing manager has been initialized.
++      // the tracing manager has been initialized.
 +      return;
 +    }
 +    String dirName = IoTDBDescriptor.getInstance().getConfig().getTracingDir();
 +    String logFileName = IoTDBConstant.TRACING_LOG;
 +
      File tracingDir = SystemFileFactory.INSTANCE.getFile(dirName);
      if (!tracingDir.exists()) {
        if (tracingDir.mkdirs()) {
diff --cc server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 9cd89e6,1154e3d..f15d384
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@@ -152,14 -152,6 +152,14 @@@ public class IoTDB implements IoTDBMBea
  
    private void deactivate() {
      logger.info("Deactivating IoTDB...");
-     //some user may call Tracing on but do not close tracing.
-     //so, when remove the system, we have to close the tracing
++    // Some users may turn tracing on but never turn it off,
++    // so when the system is deactivated we have to close the tracing manager here.
 +    if (IoTDBDescriptor.getInstance().getConfig().isEnablePerformanceTracing()) {
 +      TracingManager.getInstance().close();
 +    }
 +    PrimitiveArrayManager.close();
 +    SystemInfo.getInstance().close();
 +
      registerManager.deregisterAll();
      JMXService.deregisterMBean(mbeanName);
      logger.info("IoTDB is deactivated.");
diff --cc server/src/main/java/org/apache/iotdb/db/service/RPCService.java
index be5f2f7,5bbddec..6f9292c
--- a/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java
@@@ -47,6 -55,6 +47,7 @@@ public class RPCService extends ThriftS
          (TSServiceImpl)
              Class.forName(IoTDBDescriptor.getInstance().getConfig().getRpcImplClassName())
                  .newInstance();
++    initSyncedServiceImpl(null);
      processor = new Processor<>(impl);
    }
  
diff --cc server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index febacac,2f61d90..90b0136
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@@ -55,9 -55,6 +55,12 @@@ public enum ServiceType 
    SYSTEMINFO_SERVICE("MemTable Monitor Service", "MemTable, Monitor"),
    CONTINUOUS_QUERY_SERVICE("Continuous Query Service", "Continuous Query Service"),
    CLUSTER_INFO_SERVICE("Cluster Monitor Service (thrift-based)", "Cluster Monitor-Thrift"),
 +
-   CLUSTER_RPC_SERVICE("Cluster RPC ServerService", "ClusterRPCService"),
- 
++  CLUSTER_RPC_SERVICE("Cluster RPC Service", "ClusterRPCService"),
++  CLUSTER_META_RPC_SERVICE("Cluster Meta RPC Service", "ClusterMetaRPCService"),
++  CLUSTER_DATA_RPC_SERVICE("Cluster Data RPC Service", "ClusterDataRPCService"),
++  CLUSTER_META_ENGINE("Cluster Meta Engine", "ClusterMetaEngine"),
++  CLUSTER_META_HEART_BEAT("Cluster Meta Heartbeat Service", "ClusterMetaHeartbeat"),
    ;
  
    private final String name;
diff --cc server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
index dfb2526,d975743..45fb0ec
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftService.java
@@@ -19,13 -19,13 +19,10 @@@
  
  package org.apache.iotdb.db.service.thrift;
  
--import org.apache.iotdb.db.conf.IoTDBConfig;
  import org.apache.iotdb.db.conf.IoTDBConstant;
--import org.apache.iotdb.db.conf.IoTDBDescriptor;
  import org.apache.iotdb.db.exception.StartupException;
  import org.apache.iotdb.db.service.IService;
  import org.apache.iotdb.db.service.JMXService;
--
  import org.apache.thrift.TProcessor;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -78,6 -83,6 +75,17 @@@ public abstract class ThriftService imp
      JMXService.deregisterMBean(mbeanName);
    }
  
++  boolean setSyncedImpl = false;
++  boolean setAsyncedImpl = false;
++
++  public void initSyncedServiceImpl(Object serviceImpl) {
++    setSyncedImpl = true;
++  }
++
++  public void initAsyncedServiceImpl(Object serviceImpl) {
++    setAsyncedImpl = true;
++  }
++
    public abstract void initTProcessor()
        throws ClassNotFoundException, IllegalAccessException, InstantiationException;
  
@@@ -101,6 -106,6 +109,10 @@@
      try {
        reset();
        initTProcessor();
++      if (setSyncedImpl || setAsyncedImpl) {
++        throw new StartupException(
++            "At least one service implementataion of {} should be set.", this.getID().getName());
++      }
        initThriftServiceThread();
        thriftServiceThread.setThreadStopLatch(stopLatch);
        thriftServiceThread.start();
diff --cc server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
index 2564d05,612d187..ed2237d
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@@ -24,14 -24,14 +24,18 @@@ import org.apache.iotdb.db.conf.IoTDBCo
  import org.apache.iotdb.db.exception.runtime.RPCServiceException;
  import org.apache.iotdb.db.utils.CommonUtils;
  import org.apache.iotdb.rpc.RpcTransportFactory;
--
++import org.apache.thrift.TBaseAsyncProcessor;
  import org.apache.thrift.TProcessor;
  import org.apache.thrift.protocol.TBinaryProtocol;
  import org.apache.thrift.protocol.TCompactProtocol;
  import org.apache.thrift.protocol.TProtocolFactory;
++import org.apache.thrift.server.THsHaServer;
  import org.apache.thrift.server.TServer;
  import org.apache.thrift.server.TServerEventHandler;
  import org.apache.thrift.server.TThreadPoolServer;
++import org.apache.thrift.server.TThreadedSelectorServer;
++import org.apache.thrift.transport.TNonblockingServerSocket;
++import org.apache.thrift.transport.TNonblockingServerTransport;
  import org.apache.thrift.transport.TServerSocket;
  import org.apache.thrift.transport.TServerTransport;
  import org.apache.thrift.transport.TTransportException;
@@@ -40,6 -40,6 +44,7 @@@ import org.slf4j.LoggerFactory
  
  import java.net.InetSocketAddress;
  import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.TimeUnit;
  
  public class ThriftServiceThread extends Thread {
  
@@@ -51,8 -51,8 +56,95 @@@
    private String serviceName;
  
    private TProtocolFactory protocolFactory;
--  private TThreadPoolServer.Args poolArgs;
  
++  // currently, we can reuse the ProtocolFactory instance.
++  private static TCompactProtocol.Factory compactProtocolFactory = new TCompactProtocol.Factory();
++  private static TBinaryProtocol.Factory binaryProtocolFactory = new TBinaryProtocol.Factory();
++
++  private void initProtocolFactory(boolean compress) {
++    protocolFactory = getProtocolFactory(compress);
++  }
++
++  public static TProtocolFactory getProtocolFactory(boolean compress) {
++    if (compress) {
++      return compactProtocolFactory;
++    } else {
++      return binaryProtocolFactory;
++    }
++  }
++
++  private void catchFailedInitialization(TTransportException e) throws RPCServiceException {
++    close();
++    if (threadStopLatch == null) {
++      logger.debug("Stop Count Down latch is null");
++    } else {
++      logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
++    }
++    if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
++      threadStopLatch.countDown();
++    }
++    logger.debug(
++        "{}: close TThreadPoolServer and TServerSocket for {}",
++        IoTDBConstant.GLOBAL_DB_NAME,
++        serviceName);
++    throw new RPCServiceException(
++        String.format(
++            "%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME, serviceName),
++        e);
++  }
++
++  /** Constructor for an asynchronous (non-blocking) ThriftService. */
++  @SuppressWarnings("squid:S107")
++  public ThriftServiceThread(
++      TBaseAsyncProcessor processor,
++      String serviceName,
++      String threadsName,
++      String bindAddress,
++      int port,
++      int maxWorkerThreads,
++      int timeoutSecond,
++      TServerEventHandler serverEventHandler,
++      boolean compress,
++      int connectionTimeoutInMS,
++      int maxReadBufferBytes,
++      ServerType serverType) {
++    initProtocolFactory(compress);
++    this.serviceName = serviceName;
++    try {
++      serverTransport = openNonblockingTransport(bindAddress, port, connectionTimeoutInMS);
++      switch (serverType) {
++        case SELECTOR:
++          TThreadedSelectorServer.Args poolArgs =
++              initAsyncedSelectorPoolArgs(
++                  processor, threadsName, maxWorkerThreads, timeoutSecond, maxReadBufferBytes);
++          poolServer = new TThreadedSelectorServer(poolArgs);
++          break;
++        case HSHA:
++          THsHaServer.Args poolArgs1 =
++              initAsyncedHshaPoolArgs(
++                  processor, threadsName, maxWorkerThreads, timeoutSecond, maxReadBufferBytes);
++          poolServer = new THsHaServer(poolArgs1);
++          break;
++      }
++      poolServer.setServerEventHandler(serverEventHandler);
++    } catch (TTransportException e) {
++      catchFailedInitialization(e);
++    }
++  }
++
++  /**
++   * for synced ThriftServiceThread
++   *
++   * @param processor
++   * @param serviceName
++   * @param threadsName
++   * @param bindAddress
++   * @param port
++   * @param maxWorkerThreads
++   * @param timeoutSecond
++   * @param serverEventHandler
++   * @param compress
++   */
    @SuppressWarnings("squid:S107")
    public ThriftServiceThread(
        TProcessor processor,
@@@ -61,53 -61,53 +153,84 @@@
        String bindAddress,
        int port,
        int maxWorkerThreads,
 -      int timeoutMs,
 +      int timeoutSecond,
        TServerEventHandler serverEventHandler,
        boolean compress) {
--    if (compress) {
--      protocolFactory = new TCompactProtocol.Factory();
--    } else {
--      protocolFactory = new TBinaryProtocol.Factory();
--    }
++    initProtocolFactory(compress);
      this.serviceName = serviceName;
  
      try {
        serverTransport = openTransport(bindAddress, port);
--      poolArgs =
--          new TThreadPoolServer.Args(serverTransport)
--              .maxWorkerThreads(maxWorkerThreads)
--              .minWorkerThreads(CommonUtils.getCpuCores())
-               .stopTimeoutVal(timeoutSecond);
 -              .stopTimeoutVal(timeoutMs);
--      poolArgs.executorService =
--          IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, threadsName);
--      poolArgs.processor(processor);
--      poolArgs.protocolFactory(protocolFactory);
--      poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
++      TThreadPoolServer.Args poolArgs =
++          initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond);
        poolServer = new TThreadPoolServer(poolArgs);
        poolServer.setServerEventHandler(serverEventHandler);
      } catch (TTransportException e) {
--      close();
--      if (threadStopLatch == null) {
--        logger.debug("Stop Count Down latch is null");
--      } else {
--        logger.debug("Stop Count Down latch is {}", threadStopLatch.getCount());
--      }
--      if (threadStopLatch != null && threadStopLatch.getCount() == 1) {
--        threadStopLatch.countDown();
--      }
--      logger.debug(
--          "{}: close TThreadPoolServer and TServerSocket for {}",
--          IoTDBConstant.GLOBAL_DB_NAME,
--          serviceName);
--      throw new RPCServiceException(
--          String.format(
--              "%s: failed to start %s, because ", IoTDBConstant.GLOBAL_DB_NAME, serviceName),
--          e);
++      catchFailedInitialization(e);
      }
    }
  
++  private TThreadPoolServer.Args initSyncedPoolArgs(
++      TProcessor processor, String threadsName, int maxWorkerThreads, int timeoutSecond) {
++    TThreadPoolServer.Args poolArgs = new TThreadPoolServer.Args(serverTransport);
++    poolArgs
++        .maxWorkerThreads(maxWorkerThreads)
++        .minWorkerThreads(CommonUtils.getCpuCores())
++        .stopTimeoutVal(timeoutSecond);
++    poolArgs.executorService =
++        IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, threadsName);
++    poolArgs.processor(processor);
++    poolArgs.protocolFactory(protocolFactory);
++    poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
++    return poolArgs;
++  }
++
++  private TThreadedSelectorServer.Args initAsyncedSelectorPoolArgs(
++      TBaseAsyncProcessor processor,
++      String threadsName,
++      int maxWorkerThreads,
++      int timeoutSecond,
++      int maxReadBufferBytes) {
++    TThreadedSelectorServer.Args poolArgs =
++        new TThreadedSelectorServer.Args((TNonblockingServerTransport) serverTransport);
++    poolArgs.maxReadBufferBytes = maxReadBufferBytes;
++    poolArgs.selectorThreads(CommonUtils.getCpuCores());
++    poolArgs.executorService(
++        IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
++            CommonUtils.getCpuCores(),
++            maxWorkerThreads,
++            timeoutSecond,
++            TimeUnit.SECONDS,
++            threadsName));
++    poolArgs.processor(processor);
++    poolArgs.protocolFactory(protocolFactory);
++    poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
++    return poolArgs;
++  }
++
++  private THsHaServer.Args initAsyncedHshaPoolArgs(
++      TBaseAsyncProcessor processor,
++      String threadsName,
++      int maxWorkerThreads,
++      int timeoutSecond,
++      int maxReadBufferBytes) {
++    THsHaServer.Args poolArgs = new THsHaServer.Args((TNonblockingServerTransport) serverTransport);
++    poolArgs.maxReadBufferBytes = maxReadBufferBytes;
++    poolArgs.executorService(
++        IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(
++            CommonUtils.getCpuCores(),
++            maxWorkerThreads,
++            timeoutSecond,
++            TimeUnit.SECONDS,
++            threadsName));
++    poolArgs.processor(processor);
++    poolArgs.protocolFactory(protocolFactory);
++    poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
++    return poolArgs;
++  }
++
    @SuppressWarnings("java:S2259")
--  public TServerTransport openTransport(String bindAddress, int port) throws TTransportException {
++  private TServerTransport openTransport(String bindAddress, int port) throws TTransportException {
      int maxRetry = 5;
      long retryIntervalMS = 5000;
      TTransportException lastExp = null;
@@@ -127,6 -127,6 +250,28 @@@
      throw lastExp;
    }
  
++  /**
++   * Opens a {@link TNonblockingServerSocket} on the given address and port, retrying on failure.
++   *
++   * <p>Up to five attempts are made, five seconds apart. No wait is performed after the final
++   * attempt, so a persistent failure is reported as soon as the last attempt fails. If the thread
++   * is interrupted while waiting, the interrupt flag is restored and the most recent failure is
++   * thrown without further attempts.
++   *
++   * @param bindAddress host name or address to bind to
++   * @param port port to bind to
++   * @param connectionTimeoutInMS passed as the second constructor argument of
++   *     {@link TNonblockingServerSocket} (the client timeout in the Thrift API)
++   * @return the opened server transport
++   * @throws TTransportException the exception of the last failed attempt
++   */
++  private TServerTransport openNonblockingTransport(
++      String bindAddress, int port, int connectionTimeoutInMS) throws TTransportException {
++    int maxRetry = 5;
++    long retryIntervalMS = 5000;
++    TTransportException lastExp = null;
++    for (int i = 0; i < maxRetry; i++) {
++      try {
++        return new TNonblockingServerSocket(
++            new InetSocketAddress(bindAddress, port), connectionTimeoutInMS);
++      } catch (TTransportException e) {
++        lastExp = e;
++        if (i == maxRetry - 1) {
++          // Nothing left to retry: fail now instead of sleeping before the throw.
++          break;
++        }
++        try {
++          Thread.sleep(retryIntervalMS);
++        } catch (InterruptedException interruptedException) {
++          // Preserve the interrupt for callers and stop retrying.
++          Thread.currentThread().interrupt();
++          break;
++        }
++      }
++    }
++    throw lastExp;
++  }
++
    public void setThreadStopLatch(CountDownLatch threadStopLatch) {
++    // Replaces the latch held by this object; how the latch is used is not visible in this hunk.
      this.threadStopLatch = threadStopLatch;
    }
@@@ -177,4 -177,4 +322,9 @@@
      }
      return false;
    }
++
++  /**
++   * Kinds of Thrift server this class distinguishes.
++   *
++   * <p>NOTE(review): presumably SELECTOR corresponds to TThreadedSelectorServer and HSHA to
++   * THsHaServer; confirm against the code that switches on this type.
++   */
++  public enum ServerType {
++    SELECTOR,
++    HSHA
++  }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index 96ec8ef,96ec8ef..c7a1b9e
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@@ -29,7 -29,7 +29,6 @@@ import org.apache.iotdb.db.sync.receive
  import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
  import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
  import org.apache.iotdb.service.sync.thrift.SyncService;
--
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -61,6 -61,6 +60,7 @@@ public class SyncServerManager extends 
  
    @Override
    public void initTProcessor() {
++    // NOTE(review): null is passed here and serviceImpl is assigned on the next line; the helper
++    // is not visible in this hunk, so its behavior with a null argument should be confirmed.
++    initSyncedServiceImpl(null);
      serviceImpl = new SyncServiceImpl();
      // Wrap the sync service implementation in its Thrift processor.
      processor = new SyncService.Processor<>(serviceImpl);
    }
@@@ -93,6 -93,6 +93,11 @@@
    }
  
    @Override
++  public int getRPCPort() {
++    // The RPC port reported for this service is whatever getBindPort() returns.
++    return getBindPort();
++  }
++
++  @Override
    public void startService() throws StartupException {
      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
      if (!config.isSyncEnable()) {