You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2021/01/14 10:59:13 UTC

[iotdb] 01/01: merge master

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f342899dec2f1d056c027e7d7ef9dc62d07906e5
Merge: 327eb7e e09b377
Author: lta <li...@163.com>
AuthorDate: Thu Jan 14 18:58:33 2021 +0800

    merge master

 docker/src/main/Dockerfile-0.11.0 => .dockerignore |  24 +-
 .github/pull_request_template.md                   |  37 +-
 .github/workflows/e2e.yml                          |  52 ++
 NOTICE                                             |   2 +-
 NOTICE-binary                                      |   2 +-
 README.md                                          |  15 +
 README_ZH.md                                       |  14 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |  17 +-
 .../java/org/apache/iotdb/cli/AbstractCli.java     | 138 ++-
 cli/src/main/java/org/apache/iotdb/cli/Cli.java    |   2 +
 cli/src/main/java/org/apache/iotdb/cli/WinCli.java |   3 +
 .../org/apache/iotdb/cli/utils/IoTPrinter.java     | 107 +++
 .../main/java/org/apache/iotdb/tool/ImportCsv.java |  63 +-
 cluster/pom.xml                                    |   7 +
 cluster/src/assembly/resources/sbin/start-node.sh  |   2 +-
 .../java/org/apache/iotdb/cluster/ClusterMain.java |  20 +-
 .../apache/iotdb/cluster/RemoteTsFileResource.java |  41 +-
 .../cluster/client/async/AsyncClientPool.java      |  79 +-
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  24 +-
 .../apache/iotdb/cluster/config/ClusterConfig.java |  14 +-
 .../iotdb/cluster/config/ClusterConstant.java      |  33 +-
 .../iotdb/cluster/coordinator/Coordinator.java     | 602 +++++++++++++
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  16 +-
 .../cluster/log/applier/AsyncDataLogApplier.java   |   4 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |   8 +-
 .../iotdb/cluster/log/catchup/CatchUpTask.java     |   2 +-
 .../cluster/log/manage/CommittedEntryManager.java  |   5 +-
 .../log/manage/MetaSingleSnapshotLogManager.java   |   1 +
 .../iotdb/cluster/log/manage/RaftLogManager.java   |  26 +-
 .../log/manage/UnCommittedEntryManager.java        |  10 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  10 +-
 .../cluster/query/manage/QueryCoordinator.java     | 115 +--
 .../apache/iotdb/cluster/server/ClientServer.java  |  28 +-
 .../iotdb/cluster/server/DataClusterServer.java    |   2 +-
 .../iotdb/cluster/server/MetaClusterServer.java    |  17 +-
 .../apache/iotdb/cluster/server/RaftServer.java    |   7 +-
 .../handlers/caller/AppendNodeEntryHandler.java    |   6 +-
 .../server/handlers/caller/HeartbeatHandler.java   |   2 +-
 .../cluster/server/heartbeat/HeartbeatThread.java  |   8 +-
 .../cluster/server/member/DataGroupMember.java     |  17 +-
 .../cluster/server/member/MetaGroupMember.java     | 198 +++--
 .../iotdb/cluster/server/member/RaftMember.java    | 130 ++-
 .../cluster/server/{ => monitor}/NodeReport.java   |   3 +-
 .../manage => server/monitor}/NodeStatus.java      |  41 +-
 .../monitor/NodeStatusManager.java}                |  87 +-
 .../iotdb/cluster/server/{ => monitor}/Peer.java   |   2 +-
 .../iotdb/cluster/server/{ => monitor}/Timer.java  |  15 +-
 .../cluster/server/service/MetaAsyncService.java   |   6 +
 .../cluster/server/service/MetaSyncService.java    |   5 +
 .../apache/iotdb/cluster/utils/ClusterUtils.java   |   9 +-
 .../cluster/utils/nodetool/ClusterMonitor.java     |   2 +-
 .../iotdb/cluster/common/EnvironmentUtils.java     | 218 -----
 .../org/apache/iotdb/cluster/common/IoTDBTest.java |   4 +-
 .../cluster/integration/BaseSingleNodeTest.java    |   2 +-
 .../iotdb/cluster/integration/SingleNodeTest.java  |   7 +-
 .../iotdb/cluster/log/CommitLogCallbackTest.java   |   2 +-
 .../iotdb/cluster/log/CommitLogTaskTest.java       |   2 +-
 .../iotdb/cluster/log/LogDispatcherTest.java       |   2 +-
 .../log/applier/AsyncDataLogApplierTest.java       |   2 +-
 .../cluster/log/applier/DataLogApplierTest.java    |  13 +-
 .../iotdb/cluster/log/catchup/CatchUpTaskTest.java |   9 +-
 .../cluster/log/catchup/LogCatchUpTaskTest.java    |   8 +-
 .../log/catchup/SnapshotCatchUpTaskTest.java       |   8 +-
 .../manage/MetaSingleSnapshotLogManagerTest.java   |   3 +
 .../cluster/log/manage/RaftLogManagerTest.java     |  22 +-
 .../cluster/log/snapshot/DataSnapshotTest.java     |  12 +-
 .../log/snapshot/MetaSimpleSnapshotTest.java       |   2 +
 .../cluster/log/snapshot/PullSnapshotTaskTest.java |   5 +
 .../iotdb/cluster/partition/SlotManagerTest.java   |   2 +-
 .../cluster/partition/SlotPartitionTableTest.java  |   2 +-
 .../apache/iotdb/cluster/query/BaseQueryTest.java  |  15 +-
 .../query/ClusterAggregateExecutorTest.java        |  48 +-
 .../query/ClusterDataQueryExecutorTest.java        |  22 +-
 .../cluster/query/ClusterFillExecutorTest.java     |  72 +-
 .../cluster/query/ClusterPlanExecutorTest.java     |   8 +-
 .../cluster/query/ClusterQueryRouterTest.java      | 216 ++---
 .../ClusterGroupByNoVFilterDataSetTest.java        |  64 +-
 .../groupby/ClusterGroupByVFilterDataSetTest.java  |  74 +-
 .../query/groupby/MergeGroupByExecutorTest.java    |  83 +-
 .../query/groupby/RemoteGroupByExecutorTest.java   | 146 ++--
 .../cluster/query/manage/QueryCoordinatorTest.java |  15 +-
 .../query/reader/ClusterTimeGeneratorTest.java     |  34 +-
 .../cluster/query/reader/DatasourceInfoTest.java   |  16 +-
 .../reader/RemoteSeriesReaderByTimestampTest.java  | 122 +--
 .../query/reader/RemoteSimpleSeriesReaderTest.java | 136 +--
 .../caller/AppendGroupEntryHandlerTest.java        |   2 +-
 .../caller/AppendNodeEntryHandlerTest.java         |   4 +-
 .../handlers/caller/ElectionHandlerTest.java       |   2 +-
 .../handlers/caller/HeartbeatHandlerTest.java      |   2 +-
 .../handlers/caller/LogCatchUpHandlerTest.java     |   2 +-
 .../server/heartbeat/DataHeartbeatThreadTest.java  |   5 +
 .../server/heartbeat/HeartbeatThreadTest.java      |  10 +-
 .../server/heartbeat/MetaHeartbeatThreadTest.java  |   5 +
 .../cluster/server/member/DataGroupMemberTest.java | 129 +--
 .../iotdb/cluster/server/member/MemberTest.java    |  35 +-
 .../cluster/server/member/MetaGroupMemberTest.java |  83 +-
 docker/src/main/Dockerfile                         |  46 +-
 docker/src/main/Dockerfile-0.10.0                  |   4 +-
 docker/src/main/Dockerfile-0.10.1                  |   4 +-
 docker/src/main/Dockerfile-0.11.0                  |   4 +-
 .../main/{Dockerfile-0.11.0 => Dockerfile-0.11.1}  |  10 +-
 .../main/{Dockerfile-0.11.0 => Dockerfile-0.11.2}  |  10 +-
 docs/UserGuide/Client/Programming - Native API.md  |  18 +
 docs/UserGuide/Client/Status Codes.md              |   2 +
 .../Ecosystem Integration/Zeppelin-IoTDB.md        |  20 +-
 docs/UserGuide/Operation Manual/Administration.md  |   2 +
 .../DML Data Manipulation Language.md              | 155 +++-
 .../Operation Manual/UDF User Defined Function.md  | 192 ++++-
 docs/UserGuide/Server/Config Manual.md             |  18 +
 docs/UserGuide/System Tools/CSV Tool.md            |   1 +
 docs/zh/UserGuide/Client/Command Line Interface.md |   2 +-
 .../UserGuide/Client/Programming - Native API.md   |  14 +
 docs/zh/UserGuide/Client/Status Codes.md           |   2 +
 .../Ecosystem Integration/Zeppelin-IoTDB.md        |  20 +-
 .../UserGuide/Operation Manual/Administration.md   |   2 +
 .../DML Data Manipulation Language.md              | 156 +++-
 .../Operation Manual/UDF User Defined Function.md  | 195 ++++-
 docs/zh/UserGuide/Server/Config Manual.md          |  17 +
 docs/zh/UserGuide/System Tools/CSV Tool.md         |   1 +
 .../main/java/org/apache/iotdb/SessionExample.java |  20 +-
 grafana/img/add_data_source.png                    | Bin 175851 -> 108927 bytes
 grafana/img/add_graph.png                          | Bin 723579 -> 364163 bytes
 grafana/img/edit_data_source.png                   | Bin 313673 -> 177869 bytes
 .../iotdb/jdbc/AbstractIoTDBJDBCResultSet.java     |   4 +-
 .../main/java/org/apache/iotdb/jdbc/Config.java    |  18 +-
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     |  15 +-
 .../apache/iotdb/jdbc/IoTDBConnectionParams.java   |  20 +-
 .../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java  |   4 +-
 .../iotdb/jdbc/IoTDBNonAlignJDBCResultSet.java     |   6 +-
 .../apache/iotdb/jdbc/IoTDBPreparedStatement.java  |   8 +-
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |  29 +-
 .../src/main/java/org/apache/iotdb/jdbc/Utils.java |   7 +
 pom.xml                                            |   2 +-
 .../file-changelists/TsFileResource-changelist.md  |   8 +-
 .../resources/conf/iotdb-engine.properties         |  28 +
 server/src/assembly/resources/conf/logback.xml     |  25 +-
 server/src/assembly/resources/sbin/start-server.sh |   2 +-
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |   5 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  43 +-
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |  14 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  16 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  37 +-
 .../db/engine/cache/TimeSeriesMetadataCache.java   |  32 +-
 .../compaction/CompactionMergeTaskPoolManager.java |  32 +-
 .../db/engine/compaction/TsFileManagement.java     |  38 +-
 .../level/LevelCompactionTsFileManagement.java     | 258 +++---
 .../no/NoCompactionTsFileManagement.java           |  10 +-
 .../engine/compaction/utils/CompactionLogger.java  |   2 +-
 .../engine/compaction/utils/CompactionUtils.java   |  12 +-
 .../apache/iotdb/db/engine/flush/FlushManager.java |  24 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  48 +-
 .../merge/selector/MaxFileMergeFileSelector.java   |  11 +-
 .../iotdb/db/engine/merge/task/MergeFileTask.java  | 138 ++-
 .../db/engine/merge/task/MergeMultiChunkTask.java  |   4 +-
 .../io/LocalTextModificationAccessor.java          |  61 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 220 ++---
 .../db/engine/storagegroup/TsFileProcessor.java    |  89 +-
 .../db/engine/storagegroup/TsFileResource.java     | 356 +++-----
 .../storagegroup/timeindex/DeviceTimeIndex.java    | 308 +++++++
 .../storagegroup/timeindex/FileTimeIndex.java      | 193 +++++
 .../engine/storagegroup/timeindex/ITimeIndex.java  | 138 +++
 .../storagegroup/timeindex/TimeIndexLevel.java}    |  43 +-
 .../apache/iotdb/db/exception/IoTDBException.java  |  21 +
 .../db/exception/PartitionViolationException.java  |   8 +-
 ...xception.java => QueryIdNotExsitException.java} |  10 +-
 .../iotdb/db/exception/StorageEngineException.java |   2 +-
 .../db/exception/UDFRegistrationException.java     |   7 +-
 .../iotdb/db/exception/WriteProcessException.java  |   4 +
 .../metadata/AliasAlreadyExistException.java       |   1 +
 .../exception/metadata/IllegalPathException.java   |   1 +
 .../db/exception/metadata/MetadataException.java   |   8 +
 .../metadata/PathAlreadyExistException.java        |   1 +
 .../exception/metadata/PathNotExistException.java  |  20 +-
 .../metadata/StorageGroupNotSetException.java      |   5 +
 .../db/exception/query/OutOfTTLException.java      |   2 +-
 .../db/exception/query/QueryProcessException.java  |   6 +-
 .../QueryTimeoutRuntimeException.java}             |  68 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |  61 +-
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  77 +-
 .../iotdb/db/metadata/logfile/MLogWriter.java      | 143 +---
 .../apache/iotdb/db/metrics/ui/MetricsPage.java    |   2 +-
 .../apache/iotdb/db/monitor/MonitorConstants.java  |   5 -
 .../org/apache/iotdb/db/monitor/StatMonitor.java   |   6 +-
 .../org/apache/iotdb/db/mqtt/PublishHandler.java   | 123 +--
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |   1 +
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |  12 +
 .../apache/iotdb/db/qp/executor/IPlanExecutor.java |   8 +
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 192 ++++-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   5 +-
 ...TracingOperator.java => KillQueryOperator.java} |  20 +-
 .../db/qp/logical/sys/RemoveFileOperator.java      |   5 -
 .../db/qp/logical/sys/ShowDevicesOperator.java     |  18 +
 .../iotdb/db/qp/logical/sys/TracingOperator.java   |  10 +-
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |   8 +-
 .../iotdb/db/qp/physical/crud/InsertRowPlan.java   |  25 +
 .../physical/crud/InsertRowsOfOneDevicePlan.java   | 154 ++++
 .../apache/iotdb/db/qp/physical/crud/UDFPlan.java  |   3 +-
 .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java |   3 +-
 .../db/qp/physical/sys/AlterTimeSeriesPlan.java    |   2 +-
 .../iotdb/db/qp/physical/sys/AuthorPlan.java       |   2 +-
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java |   6 +-
 .../db/qp/physical/sys/CreateTimeSeriesPlan.java   |  14 +-
 .../{ShowDevicesPlan.java => KillQueryPlan.java}   |  25 +-
 .../iotdb/db/qp/physical/sys/ShowDevicesPlan.java  |  13 +-
 .../apache/iotdb/db/qp/physical/sys/ShowPlan.java  |  51 +-
 .../physical/sys/ShowQueryProcesslistPlan.java}    |  56 +-
 .../db/qp/physical/sys/ShowTimeSeriesPlan.java     |  56 +-
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  34 +-
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  31 +-
 .../db/qp/{constant => utils}/DatetimeUtils.java   |  12 +-
 .../db/query/aggregation/AggregateResult.java      |   3 +-
 .../db/query/aggregation/impl/AvgAggrResult.java   |  10 +-
 .../db/query/aggregation/impl/SumAggrResult.java   |  10 +-
 .../iotdb/db/query/control/QueryFileManager.java   |   2 +
 .../db/query/control/QueryResourceManager.java     |   3 +
 .../iotdb/db/query/control/QueryTimeManager.java   | 172 ++++
 .../iotdb/db/query/control/TracingManager.java     |   2 +-
 .../apache/iotdb/db/query/dataset/ListDataSet.java |  12 +-
 .../db/query/dataset/NonAlignEngineDataSet.java    |  22 +
 .../dataset/RawQueryDataSetWithoutValueFilter.java |  39 +-
 .../apache/iotdb/db/query/dataset/ShowDataSet.java |  78 ++
 .../iotdb/db/query/dataset/ShowDevicesDataSet.java |  58 ++
 .../db/query/dataset/ShowTimeseriesDataSet.java    |  46 +-
 .../apache/iotdb/db/query/dataset/UDTFDataSet.java |   2 +-
 .../dataset/groupby/GroupByEngineDataSet.java      |   8 +-
 .../groupby/GroupByWithValueFilterDataSet.java     |   3 +-
 .../db/query/executor/AggregationExecutor.java     |   4 +-
 .../db/query/executor/RawDataQueryExecutor.java    |   4 +-
 .../iotdb/db/query/reader/series/SeriesReader.java |  14 +
 .../org/apache/iotdb/db/query/udf/api/UDF.java     |  15 +
 .../org/apache/iotdb/db/query/udf/api/UDTF.java    |  22 +-
 .../db/query/udf/api/collector/PointCollector.java |   4 +-
 .../api/customizer/config/UDTFConfigurations.java  |   3 +-
 .../parameter/UDFParameterValidator.java           | 209 +++++
 .../api/customizer/parameter/UDFParameters.java    |  32 +
 .../strategy/SlidingTimeWindowAccessStrategy.java  |   2 +-
 .../UDFAttributeNotProvidedException.java          |   9 +-
 .../udf/api/exception/UDFException.java}           |  62 +-
 .../UDFInputSeriesDataTypeNotValidException.java}  |  34 +-
 .../UDFInputSeriesIndexNotValidException.java}     |  60 +-
 .../UDFInputSeriesNumberNotValidException.java}    |  17 +-
 .../UDFParameterNotValidException.java}            |  56 +-
 .../db/query/udf/builtin/BuiltinFunction.java      |  76 ++
 .../iotdb/db/query/udf/builtin/UDTFAbs.java}       | 148 ++--
 .../udf/{api/UDF.java => builtin/UDTFAcos.java}    |  57 +-
 .../udf/{api/UDF.java => builtin/UDTFAsin.java}    |  57 +-
 .../udf/{api/UDF.java => builtin/UDTFAtan.java}    |  57 +-
 .../iotdb/db/query/udf/builtin/UDTFBottomK.java    | 105 +++
 .../udf/{api/UDF.java => builtin/UDTFCeil.java}    |  57 +-
 .../db/query/udf/builtin/UDTFCommonDerivative.java |  62 ++
 .../udf/builtin/UDTFCommonValueDifference.java     |  60 ++
 .../iotdb/db/query/udf/builtin/UDTFContains.java}  | 110 +--
 .../udf/{api/UDF.java => builtin/UDTFCos.java}     |  57 +-
 .../udf/{api/UDF.java => builtin/UDTFDegrees.java} |  57 +-
 .../db/query/udf/builtin/UDTFDerivative.java}      | 111 +--
 .../udf/{api/UDF.java => builtin/UDTFExp.java}     |  57 +-
 .../udf/{api/UDF.java => builtin/UDTFFloor.java}   |  57 +-
 .../udf/{api/UDF.java => builtin/UDTFLog.java}     |  57 +-
 .../udf/{api/UDF.java => builtin/UDTFLog10.java}   |  57 +-
 .../iotdb/db/query/udf/builtin/UDTFMatches.java}   | 111 +--
 .../iotdb/db/query/udf/builtin/UDTFMath.java       |  89 ++
 .../udf/builtin/UDTFNonNegativeDerivative.java     |  63 ++
 .../builtin/UDTFNonNegativeValueDifference.java    |  61 ++
 .../udf/{api/UDF.java => builtin/UDTFRadians.java} |  57 +-
 .../udf/{api/UDF.java => builtin/UDTFRound.java}   |  57 +-
 .../iotdb/db/query/udf/builtin/UDTFSelectK.java    | 156 ++++
 .../udf/{api/UDF.java => builtin/UDTFSign.java}    |  57 +-
 .../udf/{api/UDF.java => builtin/UDTFSin.java}     |  57 +-
 .../udf/{api/UDF.java => builtin/UDTFSqrt.java}    |  57 +-
 .../udf/{api/UDF.java => builtin/UDTFTan.java}     |  57 +-
 .../db/query/udf/builtin/UDTFTimeDifference.java}  | 110 +--
 .../iotdb/db/query/udf/builtin/UDTFTopK.java       | 103 +++
 .../db/query/udf/builtin/UDTFValueDifference.java} | 107 ++-
 .../iotdb/db/query/udf/builtin/UDTFValueTrend.java |  73 ++
 .../db/query/udf/core/executor/UDTFExecutor.java   |  35 +-
 .../udf/core/transformer/UDFQueryTransformer.java  |  18 +-
 .../query/udf/service/UDFClassLoaderManager.java   |   9 +-
 .../udf/service/UDFRegistrationInformation.java    |  25 +-
 .../query/udf/service/UDFRegistrationService.java  | 145 +++-
 .../apache/iotdb/db/rescon/MemTableManager.java    | 116 +++
 .../org/apache/iotdb/db/rescon/SystemInfo.java     |   1 +
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   9 +-
 .../apache/iotdb/db/service/RegisterManager.java   |  17 +-
 .../org/apache/iotdb/db/service/ServiceType.java   |   1 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 948 +++++++++------------
 .../db/sync/receiver/load/FileLoaderManager.java   |   6 +-
 .../iotdb/db/sync/sender/transfer/SyncClient.java  |  41 +-
 .../apache/iotdb/db/tools/IoTDBDataDirViewer.java  |   4 +-
 .../iotdb/db/tools/TsFileResourcePrinter.java      |  11 +-
 .../db/tools/watermark/WatermarkDetector.java      |   2 +-
 .../org/apache/iotdb/db/utils/FilePathUtils.java   |   4 +
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |   9 +-
 .../apache/iotdb/db/utils/TypeInferenceUtils.java  |   3 +
 .../main/resources/iotdb/ui/static/iotdb-logo.png  | Bin 1768 -> 1187 bytes
 .../compaction/LevelCompactionMergeTest.java       |   4 +-
 .../compaction/LevelCompactionRecoverTest.java     |  10 +-
 .../LevelCompactionTsFileManagementTest.java       |   1 -
 .../NoCompactionTsFileManagementTest.java          |   1 -
 .../engine/storagegroup/TsFileProcessorTest.java   |  14 +-
 .../iotdb/db/integration/IOTDBGroupByIT.java       |   1 -
 .../iotdb/db/integration/IoTDBClearCacheIT.java    |   2 +-
 .../iotdb/db/integration/IoTDBDeletionIT.java      |  35 +
 .../apache/iotdb/db/integration/IoTDBFillIT.java   |   6 +
 .../db/integration/IoTDBFlushQueryMergeIT.java     |   2 +-
 .../integration/IoTDBGroupByFillWithRangeIT.java   |   3 -
 .../iotdb/db/integration/IoTDBInsertNaNIT.java     |  46 +-
 .../iotdb/db/integration/IoTDBKillQueryTest.java   |  84 ++
 .../apache/iotdb/db/integration/IoTDBLastIT.java   |   1 -
 .../db/integration/IoTDBLevelCompactionIT.java     |   1 -
 .../db/integration/IoTDBLoadExternalTsfileIT.java  |  20 +-
 .../iotdb/db/integration/IoTDBMergeTest.java       |   5 +
 .../iotdb/db/integration/IoTDBMultiSeriesIT.java   |  26 +-
 .../db/integration/IoTDBQueryTimeoutTest.java      | 154 ++++
 .../iotdb/db/integration/IoTDBRestartIT.java       |  40 +-
 .../db/integration/IoTDBRpcCompressionIT.java      |   1 -
 .../iotdb/db/integration/IoTDBSensorUpdateIT.java  |   3 -
 .../iotdb/db/integration/IoTDBSimpleQueryIT.java   |  52 ++
 .../iotdb/db/integration/IoTDBUDFManagementIT.java | 163 +++-
 .../integration/IoTDBUDTFAlignByTimeQueryIT.java   | 213 +++++
 .../db/integration/IoTDBUDTFBuiltinFunctionIT.java | 250 ++++++
 .../db/integration/IoTDBUDTFHybridQueryIT.java     |   6 +-
 .../aggregation/IoTDBAggregationIT.java            |  24 +-
 .../aggregation/IoTDBAggregationSmallDataIT.java   |   4 +-
 .../org/apache/iotdb/db/metadata/MTreeTest.java    |  23 +
 .../iotdb/db/monitor/IoTDBStatMonitorTest.java     | 153 ++++
 .../java/org/apache/iotdb/db/qp/PlannerTest.java   |  59 +-
 .../qp/{plan => logical}/IndexLogicalPlanTest.java |   2 +-
 .../qp/{plan => logical}/LogicalPlanSmallTest.java |   3 +-
 .../qp/{plan => physical}/ConcatOptimizerTest.java |   3 +-
 .../IndexSubMatchingPhysicalPlanTest.java          |   3 +-
 .../IndexWholeMatchingPhysicalPlanTest.java        |   3 +-
 .../db/qp/physical/PhysicalPlanSerializeTest.java  | 305 +++++++
 .../db/qp/{plan => physical}/PhysicalPlanTest.java |   4 +-
 .../qp/{plan => physical}/SerializationTest.java   |   3 +-
 .../db/qp/sql/DatetimeQueryDataSetUtilsTest.java   | 142 ---
 .../IoTDBsqlVisitorTest.java}                      |   4 +-
 .../db/qp/utils/DatetimeQueryDataSetUtilsTest.java | 190 +++++
 .../iotdb/db/query/control/TracingManagerTest.java |  15 +-
 .../iotdb/db/query/dataset/ListDataSetTest.java    |   2 +-
 .../db/query/reader/series/SeriesReaderTest.java   |  22 +-
 .../iotdb/db/query/udf/example/Accumulator.java    |  16 +-
 .../apache/iotdb/db/query/udf/example/Adder.java   |  19 +-
 .../apache/iotdb/db/query/udf/example/Counter.java |   8 +-
 ...gSizeWindowConstructorTester1.java => Max.java} | 130 +--
 .../iotdb/db/query/udf/example/Multiplier.java     |  16 +-
 .../SlidingSizeWindowConstructorTester0.java       |   9 +-
 .../SlidingSizeWindowConstructorTester1.java       |  17 +-
 .../SlidingTimeWindowConstructionTester.java       |  17 +-
 ...onstructorTester1.java => TerminateTester.java} | 122 +--
 .../{Multiplier.java => ValidateTester.java}       | 100 +--
 .../db/sync/receiver/load/FileLoaderTest.java      |   8 +-
 .../recover/SyncReceiverLogAnalyzerTest.java       |   4 +-
 .../org/apache/iotdb/db/tools/MLogParserTest.java  |  35 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  13 +-
 .../db/utils/datastructure/PrecisionTest.java      |  22 +-
 .../db/writelog/recover/DeviceStringTest.java      |   8 +-
 server/src/test/resources/iotdb-engine.properties  |   2 +
 server/src/test/resources/logback.xml              |   1 +
 .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java |   9 +-
 .../org/apache/iotdb/rpc/RpcTransportFactory.java  |  14 +-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |   8 +-
 .../rpc/TCompressedElasticFramedTransport.java     |  45 +-
 .../apache/iotdb/rpc/TElasticFramedTransport.java  |  41 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../iotdb/rpc/TSnappyElasticFramedTransport.java   |   2 +-
 .../rpc/TimeoutChangeableTFastFramedTransport.java |  11 +-
 .../main/java/org/apache/iotdb/session/Config.java |  13 +-
 .../java/org/apache/iotdb/session/Session.java     | 176 +++-
 .../apache/iotdb/session/SessionConnection.java    |  34 +-
 .../org/apache/iotdb/session/SessionDataSet.java   |  12 +-
 .../org/apache/iotdb/session/pool/SessionPool.java |  81 ++
 .../iotdb/session/IoTDBSessionComplexIT.java       |   3 +-
 .../iotdb/session/IoTDBSessionIteratorIT.java      |  29 +
 .../apache/iotdb/session/IoTDBSessionSimpleIT.java | 130 +++
 .../public/img/contributor-avatar/jlq.png          | Bin 122303 -> 98500 bytes
 .../public/img/contributor-avatar/kfx.jpeg         | Bin 134257 -> 119029 bytes
 .../public/img/contributor-avatar/xdh.jpg          | Bin 136069 -> 123718 bytes
 .../.vuepress/public/img/contributor-avatar/yt.jpg | Bin 131028 -> 115804 bytes
 .../public/img/contributor-avatar/zss.jpg          | Bin 98085 -> 91455 bytes
 site/src/main/.vuepress/public/img/home-Slide1.png | Bin 438294 -> 323108 bytes
 site/src/main/.vuepress/public/img/home-Slide2.png | Bin 440893 -> 323620 bytes
 site/src/main/.vuepress/public/img/home-Slide3.png | Bin 441335 -> 324332 bytes
 site/src/main/.vuepress/public/img/home-icon2.png  | Bin 5529 -> 4463 bytes
 site/src/main/.vuepress/public/img/home-icon3.png  | Bin 20637 -> 10753 bytes
 site/src/main/.vuepress/public/img/home-icon4.png  | Bin 13225 -> 8681 bytes
 site/src/main/.vuepress/public/img/home-icon5.png  | Bin 1989 -> 1531 bytes
 site/src/main/.vuepress/public/img/home-icon6.png  | Bin 16502 -> 11537 bytes
 site/src/main/.vuepress/public/img/logo.png        | Bin 31747 -> 21687 bytes
 site/src/main/.vuepress/public/img/tools.jpg       | Bin 347602 -> 294103 bytes
 spark-iotdb-connector/Readme.md                    |  32 +-
 spark-tsfile/README.md                             |  31 +-
 .../e2e/base/docker-compose.yaml                   |  54 +-
 test/e2e/cases/README.md                           |  53 ++
 .../e2e/cases/cli/README.md                        |  20 +-
 .../e2e/cases/cli/cleanup.sh                       |  30 +-
 .../e2e/cases/cli/docker-compose.yaml              |  44 +-
 test/e2e/cases/cli/res/init.sql                    |  26 +
 .../Dockerfile-0.11.0 => test/e2e/cases/cli/run.sh |  51 +-
 thrift/src/main/thrift/cluster.thrift              |   7 +
 thrift/src/main/thrift/rpc.thrift                  |  15 +
 .../tsfile/encoding/bitpacking/IntPacker.java      |   4 +-
 .../tsfile/encoding/bitpacking/LongPacker.java     |   4 +-
 .../exception/QueryTimeoutRuntimeException.java    |  68 +-
 .../write/UnSupportedDataTypeException.java        |   4 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 195 +++--
 .../iotdb/tsfile/read/reader/LocalTsFileInput.java |  20 +
 .../org/apache/iotdb/tsfile/utils/BytesUtils.java  |   4 +-
 .../apache/iotdb/tsfile/write/TsFileWriter.java    |  13 +-
 .../org/apache/iotdb/tsfile/read/ReadTest.java     |  22 +-
 .../iotdb/tsfile/read/TimePlainEncodeReadTest.java |   5 -
 .../tsfile/read/TimeSeriesMetadataReadTest.java    |  87 ++
 .../read/query/executor/QueryExecutorTest.java     |   1 -
 .../apache/iotdb/tsfile/utils/FileGenerator.java   |  22 +-
 .../tsfile/write/DefaultDeviceTemplateTest.java    | 110 +++
 zeppelin-interpreter/pom.xml                       |   2 +-
 .../apache/zeppelin/iotdb/IoTDBInterpreter.java    | 212 +++--
 .../zeppelin/iotdb/IoTDBInterpreterTest.java       | 108 ++-
 417 files changed, 12524 insertions(+), 5187 deletions(-)

diff --cc cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index bcde426,39994bb..5266f4f
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@@ -299,10 -310,11 +312,11 @@@ public class ClusterMain 
        public int calculateSlotByPartitionNum(String storageGroupName, long partitionId,
            int maxSlotNum) {
          int sgSerialNum = extractSerialNumInSGName(storageGroupName) % k;
 -        if (sgSerialNum > 0) {
 -          return maxSlotNum / k * sgSerialNum;
 +        if (sgSerialNum >= 0) {
 +          return (int)(maxSlotNum / k * (sgSerialNum + 0.5));
          } else {
-           return defaultStrategy.calculateSlotByPartitionNum(storageGroupName, partitionId, maxSlotNum);
+           return defaultStrategy
+               .calculateSlotByPartitionNum(storageGroupName, partitionId, maxSlotNum);
          }
        }
  
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 0000000,4e15c6f..dbfb9ff
mode 000000,100644..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@@ -1,0 -1,598 +1,602 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ package org.apache.iotdb.cluster.coordinator;
+ 
+ import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+ import org.apache.iotdb.cluster.config.ClusterDescriptor;
+ import org.apache.iotdb.cluster.exception.CheckConsistencyException;
++import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+ import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
+ import org.apache.iotdb.cluster.metadata.CMManager;
+ import org.apache.iotdb.cluster.partition.PartitionGroup;
+ import org.apache.iotdb.cluster.query.ClusterPlanRouter;
+ import org.apache.iotdb.cluster.rpc.thrift.Node;
++import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+ import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+ import org.apache.iotdb.cluster.server.RaftServer;
+ import org.apache.iotdb.cluster.server.monitor.Timer;
+ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+ import org.apache.iotdb.cluster.utils.PartitionUtils;
+ import org.apache.iotdb.cluster.utils.StatusUtils;
+ import org.apache.iotdb.db.conf.IoTDBConstant;
+ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+ import org.apache.iotdb.db.exception.metadata.MetadataException;
+ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+ import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+ import org.apache.iotdb.db.exception.query.QueryProcessException;
+ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+ import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
+ import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+ import org.apache.iotdb.db.service.IoTDB;
+ import org.apache.iotdb.rpc.RpcUtils;
+ import org.apache.iotdb.rpc.TSStatusCode;
+ import org.apache.iotdb.service.rpc.thrift.EndPoint;
+ import org.apache.iotdb.service.rpc.thrift.TSStatus;
+ import org.apache.thrift.TException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.List;
+ import java.util.Map;
+ 
+ /**
+  * Coordinator of client non-query request
+  */
+ public class Coordinator {
+ 
+   private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
+ 
+   private MetaGroupMember metaGroupMember;
+ 
+   private String name; // copied from metaGroupMember.getName(); used as the prefix of log messages
+   private Node thisNode; // copied from metaGroupMember.getThisNode(); identifies the local node
+   /**
+    * Router that calculates the partition groups that a partitioned plan should be sent to. It is
+    * not set by the constructors and has to be provided through setRouter.
+    */
+   private ClusterPlanRouter router;
+ 
+   private static final String MSG_MULTIPLE_ERROR = "The following errors occurred when executing "
+     + "the query, please retry or contact the DBA: ";
+ 
+   /**
+    * Create a coordinator that works for the given meta group member. The member's name and node
+    * are cached; the router is left unset.
+    *
+    * @param metaGroupMember the meta group member that plans are handed to
+    */
+   public Coordinator(MetaGroupMember metaGroupMember) {
+     this.metaGroupMember = metaGroupMember;
+     name = metaGroupMember.getName();
+     thisNode = metaGroupMember.getThisNode();
+   }
+ 
+   /**
+    * Create a coordinator without any collaborator; the meta group member and the router have to
+    * be supplied through the setters.
+    */
+   public Coordinator() {
+     // intentionally empty
+   }
+ 
+   /**
+    * Bind this coordinator to a meta group member and refresh the cached name and node.
+    *
+    * @param metaGroupMember the meta group member that plans are handed to
+    */
+   public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
+     this.metaGroupMember = metaGroupMember;
+     name = metaGroupMember.getName();
+     thisNode = metaGroupMember.getThisNode();
+   }
+ 
+   /**
+    * Set the router used to split partitioned plans.
+    *
+    * @param router the router that maps sub-plans to partition groups
+    */
+   public void setRouter(ClusterPlanRouter router) {
+     this.router = router;
+   }
+ 
+   /**
+    * Execute a non-query plan. Depending on its type the plan is run on this node only, handed to
+    * the meta group, sent to all data groups (like timeseries deletion), or split and sent to the
+    * groups it belongs to (like data ingestion).
+    *
+    * @param plan a non-query plan.
+    * @return the execution status; UNSUPPORTED_OPERATION when processing the partitioned plan
+    * raises an UnsupportedPlanException
+    */
+   public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+     long startTime = Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.getOperationStartTime();
+     TSStatus result;
+     try {
+       result = dispatchNonQueryPlan(plan);
+     } catch (UnsupportedPlanException e) {
+       // an unsupported plan returns early, so its cost is not recorded in the timer
+       return StatusUtils.getStatus(StatusUtils.UNSUPPORTED_OPERATION, e.getMessage());
+     }
+     Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
+     return result;
+   }
+ 
+   /**
+    * Pick the execution path of a non-query plan according to its type.
+    */
+   private TSStatus dispatchNonQueryPlan(PhysicalPlan plan) throws UnsupportedPlanException {
+     if (PartitionUtils.isLocalNonQueryPlan(plan)) {
+       // run locally
+       return executeNonQueryLocally(plan);
+     }
+     if (PartitionUtils.isGlobalMetaPlan(plan)) {
+       // forward the plan to all meta group nodes
+       return metaGroupMember.processNonPartitionedMetaPlan(plan);
+     }
+     if (PartitionUtils.isGlobalDataPlan(plan)) {
+       // forward the plan to all data group nodes
+       return processNonPartitionedDataPlan(plan);
+     }
+     // split the plan and forward them to some PartitionGroups
+     return processPartitionedPlan(plan);
+   }
+ 
+   /**
+    * Run a non-query plan with the local executor only, without involving other nodes.
+    *
+    * @return SUCCESS_STATUS or EXECUTE_STATEMENT_ERROR according to the executor's boolean result,
+    * or a status built from the exception when the executor throws
+    */
+   private TSStatus executeNonQueryLocally(PhysicalPlan plan) {
+     boolean succeeded;
+     try {
+       succeeded = metaGroupMember.getLocalExecutor().processNonQuery(plan);
+     } catch (QueryProcessException e) {
+       // only internal server errors are logged as warnings, other error codes go to debug
+       if (e.getErrorCode() == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
+         logger.warn("meet error while processing non-query. ", e);
+       } else {
+         logger.debug("meet error while processing non-query. ", e);
+       }
+       return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+     } catch (Exception e) {
+       logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+       return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+     }
+ 
+     if (succeeded) {
+       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+     }
+     return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   }
+ 
+   /**
+    * Process a non-partitioned data plan (like DeleteData), which has to be executed by all data
+    * groups. After synchronizing with the leader the plan is forwarded to every global group; if
+    * the consistency check fails, the plan is handed to the meta leader instead.
+    *
+    * @param plan the plan to execute on all data groups
+    * @return the combined status, or TIMESERIES_NOT_EXIST_ERROR when a deletion matches no path
+    */
+   private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) {
+     boolean isDeletion = plan instanceof DeleteTimeSeriesPlan || plan instanceof DeletePlan;
+     if (isDeletion) {
+       try {
+         // as delete related plans may have abstract paths (paths with wildcards), we convert
+         // them to full paths so the executor nodes will not need to query the metadata holders,
+         // eliminating the risk that when they are querying the metadata holders, the timeseries
+         // has already been deleted
+         ((CMManager) IoTDB.metaManager).convertToFullPaths(plan);
+       } catch (PathNotExistException e) {
+         boolean nothingMatched = plan.getPaths().isEmpty();
+         if (nothingMatched) {
+           // only reports an error when there is no matching path
+           return StatusUtils.getStatus(StatusUtils.TIMESERIES_NOT_EXIST_ERROR, e.getMessage());
+         }
+       }
+     }
+ 
+     try {
+       metaGroupMember.syncLeaderWithConsistencyCheck(true);
+       List<PartitionGroup> allGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+       logger.debug("Forwarding global data plan {} to {} groups", plan, allGroups.size());
+       return forwardPlan(allGroups, plan);
+     } catch (CheckConsistencyException e) {
+       logger.debug("Forwarding global data plan {} to meta leader {}", plan, metaGroupMember.getLeader());
+       metaGroupMember.waitLeader();
+       return metaGroupMember.forwardPlan(plan, metaGroupMember.getLeader(), null);
+     }
+   }
+ 
+ 
+   /**
+    * A partitioned plan (like batch insertion) will be split into several sub-plans, each belonging
+    * to one data group. The sub-plans are then sent to and executed on the corresponding groups
+    * separately.
+    *
+    * <p>If no group is found for the plan and schema auto-creation is enabled, the schema is
+    * created and the plan is routed exactly one more time.
+    *
+    * @param plan the plan to split and forward
+    * @return PARTITION_TABLE_NOT_READY, a CONSISTENCY_FAILURE status, NO_STORAGE_GROUP, or the
+    * status concluded from the involved groups
+    * @throws UnsupportedPlanException propagated from splitting the plan
+    */
+   public TSStatus processPartitionedPlan(PhysicalPlan plan) throws UnsupportedPlanException {
+     return processPartitionedPlan(plan, true);
+   }
+ 
+   /**
+    * Split and forward a partitioned plan.
+    *
+    * @param allowAutoCreate whether the schema may still be auto-created when no group is found;
+    *                        false on the retry that follows an auto-creation, so that a plan
+    *                        which stays unroutable cannot recurse without bound
+    */
+   private TSStatus processPartitionedPlan(PhysicalPlan plan, boolean allowAutoCreate)
+     throws UnsupportedPlanException {
+     logger.debug("{}: Received a partitioned plan {}", name, plan);
+     if (metaGroupMember.getPartitionTable() == null) {
+       logger.debug("{}: Partition table is not ready", name);
+       return StatusUtils.PARTITION_TABLE_NOT_READY;
+     }
+ 
+     // split the plan into sub-plans that each only involve one data group
+     Map<PhysicalPlan, PartitionGroup> planGroupMap;
+     try {
+       planGroupMap = splitPlan(plan);
+     } catch (CheckConsistencyException checkConsistencyException) {
+       return StatusUtils
+         .getStatus(StatusUtils.CONSISTENCY_FAILURE, checkConsistencyException.getMessage());
+     }
+ 
+     // the storage group is not found locally
+     if (planGroupMap == null || planGroupMap.isEmpty()) {
+       if (allowAutoCreate
+         && (plan instanceof InsertPlan
+         || plan instanceof CreateTimeSeriesPlan
+         || plan instanceof CreateMultiTimeSeriesPlan)
+         && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
+         logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan);
+         try {
+           ((CMManager) IoTDB.metaManager).createSchema(plan);
+           // route once more with the new schema, but never auto-create a second time
+           return processPartitionedPlan(plan, false);
+         } catch (MetadataException | CheckConsistencyException e) {
+           // pass the exception as the throwable argument so that its stack trace is kept
+           logger.error("{}: Failed to set storage group or create timeseries for {}", name, plan,
+             e);
+         }
+       }
+       logger.error("{}: Cannot find storage groups for {}", name, plan);
+       return StatusUtils.NO_STORAGE_GROUP;
+     }
+     logger.debug("{}: The data groups of {} are {}", name, plan, planGroupMap);
+     return forwardPlan(planGroupMap, plan);
+   }
+ 
+   /**
+    * Execute a plan on every given group: within the local data member when this node belongs to
+    * the group, otherwise by forwarding it to the group. The failures of all groups (if any) are
+    * compacted into one EXECUTE_STATEMENT_ERROR message made of "[code@header:message]" entries;
+    * if no failure is recorded, OK is returned.
+    *
+    * @param partitionGroups the groups that should all execute the plan
+    * @param plan the plan to execute; for a DeleteTimeSeriesPlan a TIMESERIES_NOT_EXIST answer is
+    *             not counted as a failure
+    */
+   private TSStatus forwardPlan(List<PartitionGroup> partitionGroups, PhysicalPlan plan) {
+     // the status of the group being processed, and finally the overall status
+     TSStatus status;
+     List<String> errorCodePartitionGroups = new ArrayList<>();
+     for (PartitionGroup partitionGroup : partitionGroups) {
+       if (partitionGroup.contains(thisNode)) {
+         // the plan should be handled by a group the local node is in, handle it within the group
+         logger.debug("Execute {} in a local group of {}", plan, partitionGroup.getHeader());
 -        status = metaGroupMember.getLocalDataMember(partitionGroup.getHeader())
 -          .executeNonQueryPlan(plan);
++        status = metaGroupMember.getLocalDataMember(new RaftNode(partitionGroup.getHeader(),
++            partitionGroup.getId())).executeNonQueryPlan(plan);
+       } else {
+         // forward the plan to the group that should handle it
+         logger.debug("Forward {} to a remote group of {}", plan,
+           partitionGroup.getHeader());
+         status = forwardPlan(plan, partitionGroup);
+       }
+       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && (
+         !(plan instanceof DeleteTimeSeriesPlan) ||
+           status.getCode() != TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode())) {
+         // execution failed, record the error code, the group header and the error message
+         errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
+           status.getCode(), partitionGroup.getHeader(),
+           status.getMessage()));
+       }
+     }
+     if (errorCodePartitionGroups.isEmpty()) {
+       status = StatusUtils.OK;
+     } else {
+       status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
+         MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+     }
+     logger.debug("{}: executed {} with answer {}", name, plan, status);
+     return status;
+   }
+ 
+   /**
+    * split a plan into several sub-plans, each belongs to only one data group. When the router
+    * reports that the storage group is not set, this node synchronizes with the leader and routes
+    * the plan one more time.
+    *
+    * @return sub-plan -> data group pairs, or null if the plan could not be routed
+    * @throws CheckConsistencyException propagated from the synchronization with the leader
+    */
+   private Map<PhysicalPlan, PartitionGroup> splitPlan(PhysicalPlan plan)
+     throws UnsupportedPlanException, CheckConsistencyException {
+     Map<PhysicalPlan, PartitionGroup> planGroupMap = null;
+     try {
+       planGroupMap = router.splitAndRoutePlan(plan);
+     } catch (StorageGroupNotSetException e) {
+       // synchronize with the leader to see if this node has unpulled storage groups
+       metaGroupMember.syncLeaderWithConsistencyCheck(true);
+       try {
+         planGroupMap = router.splitAndRoutePlan(plan);
 -      } catch (MetadataException ex) {
++      } catch (MetadataException | UnknownLogTypeException ex) {
+         // ignore: planGroupMap stays null, which the caller handles as "no storage group found"
+       }
 -    } catch (MetadataException e) {
++    } catch (MetadataException | UnknownLogTypeException e) {
+       logger.error("Cannot route plan {}", plan, e);
+     }
+     return planGroupMap;
+   }
+ 
+   /**
+    * Forward the sub-plans of a split plan to their data groups and conclude one status from the
+    * answers. If an insertion fails because a timeseries does not exist and schema auto-creation
+    * is enabled, the timeseries are created and the insertion is forwarded again. A successful
+    * status that carries a redirect node is reported as NEED_REDIRECTION.
+    *
+    * @param planGroupMap sub-plan -> belong data group pairs
+    * @param plan the original plan the sub-plans were split from
+    */
+   private TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
+     TSStatus status;
+     if (planGroupMap.size() == 1) {
+       status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+     } else if (plan instanceof InsertTabletPlan || plan instanceof CreateMultiTimeSeriesPlan) {
+       // InsertTabletPlan and CreateMultiTimeSeriesPlan contain many rows, each of which has its
+       // own TSStatus as execution result. As the plan is split and the sub-plans may have
+       // interleaving ranges, each TSStatus must be placed at the right position,
+       // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to NodeA and row2
+       // belongs to NodeB, when NodeA returns a success while NodeB returns a failure, the
+       // failure and success should be placed into proper positions in TSStatus.subStatus
+       status = forwardMultiSubPlan(planGroupMap, plan);
+     } else {
+       status = forwardToMultipleGroup(planGroupMap);
+     }
+ 
+     boolean mayAutoCreate = plan instanceof InsertPlan
+       && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+       && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema();
+     if (mayAutoCreate) {
+       TSStatus retriedStatus = createTimeseriesForFailedInsertion(planGroupMap, (InsertPlan) plan);
+       if (retriedStatus != null) {
+         status = retriedStatus;
+       }
+     }
+ 
+     boolean succeeded = status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
+     if (succeeded && status.isSetRedirectNode()) {
+       status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+     }
+     logger.debug("{}: executed {} with answer {}", name, plan, status);
+     return status;
+   }
+ 
+   /**
+    * Try to create the timeseries an insertion failed on and, if that works, forward the
+    * insertion again.
+    *
+    * @param planGroupMap sub-plan -> data group pairs of the failed insertion
+    * @param plan the failed insertion
+    * @return the status of the repeated insertion, an EXECUTE_STATEMENT_ERROR status if the
+    * creation throws, or null if no timeseries was created
+    */
+   private TSStatus createTimeseriesForFailedInsertion(
+     Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+     // try to create timeseries
+     if (plan.getFailedMeasurements() != null) {
+       plan.getPlanFromFailed();
+     }
+ 
+     boolean created;
+     try {
+       created = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+     } catch (IllegalPathException | CheckConsistencyException e) {
+       return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
+     }
+ 
+     if (!created) {
+       logger.error("{}, Cannot auto create timeseries.", thisNode);
+       return null;
+     }
+     return forwardPlan(planGroupMap, plan);
+   }
+ 
+ 
+   private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, PartitionGroup> entry) { // executes one sub-plan (key) on its data group (value) and times the call
+     TSStatus result;
+     if (entry.getValue().contains(thisNode)) {
+       // the plan should be handled by a group the local node is in, handle it within the group
+       long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+         .getOperationStartTime();
+       logger.debug("Execute {} in a local group of {}", entry.getKey(),
+         entry.getValue().getHeader());
 -      result = metaGroupMember.getLocalDataMember(entry.getValue().getHeader())
++      result = metaGroupMember.getLocalDataMember(new RaftNode(entry.getValue().getHeader(), entry.getValue().getId()))
+         .executeNonQueryPlan(entry.getKey());
+       Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+         .calOperationCostTimeFromStart(startTime);
+     } else {
+       // the local node is not in the group, forward the plan to the group that should handle it
+       long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+         .getOperationStartTime();
+       logger.debug("Forward {} to a remote group of {}", entry.getKey(),
+         entry.getValue().getHeader());
+       result = forwardPlan(entry.getKey(), entry.getValue());
+       Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+         .calOperationCostTimeFromStart(startTime);
+     }
+     return result;
+   }
+ 
+   /**
+    * forward each sub-plan to its corresponding data group, if some groups goes wrong, the error
+    * messages from each group will be compacted into one string. If no group fails, the result is
+    * NEED_REDIRECTION when every answer carries a redirect node, and OK otherwise.
+    *
+    * @param planGroupMap sub-plan -> data group pairs
+    * @return OK, a NEED_REDIRECTION status, or an EXECUTE_STATEMENT_ERROR status listing failures
+    */
+   private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> planGroupMap) {
+     List<String> errorCodePartitionGroups = new ArrayList<>();
+     TSStatus tmpStatus;
+     boolean allRedirect = true;
+     EndPoint endPoint = null; // NOTE(review): only assigned below, no longer read in the merged code
+     for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
+       tmpStatus = forwardToSingleGroup(entry);
+       if (tmpStatus.isSetRedirectNode()) {
+         endPoint = tmpStatus.getRedirectNode();
+       } else {
+         allRedirect = false;
+       }
+       if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+         // execution failed, record the error code, the group header and the error message
+         errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
+           tmpStatus.getCode(), entry.getValue().getHeader(),
+           tmpStatus.getMessage()));
+       }
+     }
+     TSStatus status;
+     if (errorCodePartitionGroups.isEmpty()) {
 -      status = StatusUtils.OK;
+       if (allRedirect) {
 -        status = StatusUtils.getStatus(status, endPoint);
++        status = new TSStatus();
++        status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
++      } else {
++        status = StatusUtils.OK;
+       }
+     } else {
+       status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
+         MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+     }
+     return status;
+   }
+ 
+ 
+   /**
+    * Forward each sub-plan to its belonging data group, and combine responses from the groups.
+    * When a group answers with MULTIPLE_ERROR, its per-row statuses are copied to the positions
+    * the rows have in the parent plan. Results already stored in a parent
+    * CreateMultiTimeSeriesPlan are merged into the per-row statuses as well.
+    *
+    * @param planGroupMap sub-plan -> data group pairs
+    * @param parentPlan   the plan the sub-plans were split from; the caller passes an
+    *                     InsertTabletPlan or a CreateMultiTimeSeriesPlan
+    * @return the status concluded from the answers of all groups
+    */
+   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
+   private TSStatus forwardMultiSubPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap,
+                                        PhysicalPlan parentPlan) {
+     List<String> errorCodePartitionGroups = new ArrayList<>();
+     TSStatus tmpStatus;
+     TSStatus[] subStatus = null;
+     boolean noFailure = true;
+     boolean isBatchFailure = false;
+     EndPoint endPoint = null;
+     // send sub-plans to each belonging data group and collect results
+     for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
+       tmpStatus = forwardToSingleGroup(entry);
+       logger.debug("{}: from {},{},{}", name, entry.getKey(), entry.getValue(), tmpStatus);
+       noFailure =
+         (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+       isBatchFailure = (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
+         || isBatchFailure;
+       if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+         if (subStatus == null) {
+           subStatus = newSuccessStatuses(parentPlan);
+         }
+         // set the status from one group to the proper positions of the overall status
+         if (parentPlan instanceof InsertTabletPlan) {
+           PartitionUtils.reordering((InsertTabletPlan) entry.getKey(), subStatus,
+             tmpStatus.subStatus.toArray(new TSStatus[]{}));
+         } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+           CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) entry.getKey();
+           for (int i = 0; i < subPlan.getIndexes().size(); i++) {
+             subStatus[subPlan.getIndexes().get(i)] = tmpStatus.subStatus.get(i);
+           }
+         }
+       }
+       if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+         // execution failed, record the error message
+         errorCodePartitionGroups.add(String.format("[%s@%s:%s:%s]",
+           tmpStatus.getCode(), entry.getValue().getHeader(),
+           tmpStatus.getMessage(), tmpStatus.subStatus));
+       }
+       // the redirect node is taken from the sub-plan that holds the latest timestamp
+       if (parentPlan instanceof InsertTabletPlan && tmpStatus.isSetRedirectNode() &&
+         ((InsertTabletPlan) entry.getKey()).getMaxTime() == ((InsertTabletPlan) parentPlan)
+           .getMaxTime()) {
+         endPoint = tmpStatus.getRedirectNode();
+       }
+     }
+ 
+     if (parentPlan instanceof CreateMultiTimeSeriesPlan &&
+       !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
+       if (subStatus == null) {
+         // no group answered with MULTIPLE_ERROR, so the array has not been sized yet; it must
+         // be sized from the parent plan here instead of being created with length zero
+         subStatus = newSuccessStatuses(parentPlan);
+       }
+       noFailure = false;
+       isBatchFailure = true;
+       for (Map.Entry<Integer, TSStatus> integerTSStatusEntry : ((CreateMultiTimeSeriesPlan) parentPlan)
+         .getResults().entrySet()) {
+         subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue();
+       }
+     }
+     return concludeFinalStatus(noFailure, endPoint, isBatchFailure, subStatus,
+       errorCodePartitionGroups);
+   }
+ 
+   /**
+    * Create the per-row status array of a parent plan with every row initially marked as
+    * successful. The length is the row count of an InsertTabletPlan, the number of indexes of a
+    * CreateMultiTimeSeriesPlan, and zero for any other plan.
+    */
+   private static TSStatus[] newSuccessStatuses(PhysicalPlan parentPlan) {
+     int totalRowNum = 0;
+     if (parentPlan instanceof InsertTabletPlan) {
+       totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
+     } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+       totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().size();
+     }
+     TSStatus[] statuses = new TSStatus[totalRowNum];
+     Arrays.fill(statuses, RpcUtils.SUCCESS_STATUS);
+     return statuses;
+   }
+ 
+   /**
+    * Conclude the overall status of a plan that was split into sub-plans.
+    *
+    * @param noFailure                whether every group answered with success
+    * @param endPoint                 the redirect node to attach on success, may be null
+    * @param isBatchFailure           whether per-row statuses should be reported
+    * @param subStatus                the per-row statuses, read only for a batch failure
+    * @param errorCodePartitionGroups the error descriptions of the failed groups
+    */
+   private TSStatus concludeFinalStatus(boolean noFailure, EndPoint endPoint,
+                                        boolean isBatchFailure, TSStatus[] subStatus,
+                                        List<String> errorCodePartitionGroups) {
+     if (noFailure) {
+       if (endPoint == null) {
+         return StatusUtils.OK;
+       }
+       return StatusUtils.getStatus(StatusUtils.OK, endPoint);
+     }
+     if (isBatchFailure) {
+       return RpcUtils.getStatus(Arrays.asList(subStatus));
+     }
+     return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
+       MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+   }
+ 
+ 
+   /**
+    * Forward a plan to the DataGroupMember of one node in the group, trying the nodes one after
+    * another. The first answer that is not a time-out is returned, with the answering node set as
+    * redirect node unless the answer already carries one. TIME_OUT is returned only when all
+    * nodes time out.
+    */
+   private TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {
+     for (Node node : group) {
+       TSStatus status;
+       try {
+         // only data plans are partitioned, so it must be processed by its data server instead of
+         // meta server
+         status = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()
+           ? forwardDataPlanAsync(plan, node, group.getHeader())
+           : forwardDataPlanSync(plan, node, group.getHeader());
+       } catch (IOException e) {
+         status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
+       }
+ 
+       if (StatusUtils.TIME_OUT.equals(status)) {
+         // this node did not answer in time, try the next one
+         logger.warn("Forward {} to {} timed out", plan, node);
+         continue;
+       }
+       if (!status.isSetRedirectNode()) {
+         status.setRedirectNode(new EndPoint(node.getIp(), node.getClientPort()));
+       }
+       return status;
+     }
+     logger.warn("Forward {} to {} timed out", plan, group);
+     return StatusUtils.TIME_OUT;
+   }
+ 
+   /**
+    * Forward a non-query plan to the data port of "receiver" through an asynchronous client that
+    * is obtained with the write operation timeout.
+    *
+    * @param plan     a non-query plan
+    * @param receiver the node the plan is sent to
+    * @param header   to determine which DataGroupMember of "receiver" will process the request.
+    * @return a TSStatus indicating if the forwarding is successful.
+    * @throws IOException propagated from obtaining the client
+    */
+   private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, Node header)
+     throws IOException {
+     RaftService.AsyncClient asyncClient = metaGroupMember.getClientProvider()
+       .getAsyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
+     return metaGroupMember.forwardPlanAsync(plan, receiver, header, asyncClient);
+   }
+ 
+   /**
+    * Forward a non-query plan to the data port of "receiver" through a synchronous client that
+    * is obtained with the write operation timeout.
+    *
+    * @param plan     a non-query plan
+    * @param receiver the node the plan is sent to
+    * @param header   to determine which DataGroupMember of "receiver" will process the request.
+    * @return a TSStatus indicating if the forwarding is successful.
+    * @throws IOException if the client cannot be obtained; the TException is kept as the cause
+    */
+   private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header)
+     throws IOException {
+     RaftService.Client syncClient;
+     try {
+       syncClient = metaGroupMember.getClientProvider()
+         .getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS());
+     } catch (TException e) {
+       throw new IOException(e);
+     }
+     return metaGroupMember.forwardPlanSync(plan, receiver, header, syncClient);
+   }
+ 
+   /**
+    * Get a thrift client that will connect to "node" using the data port.
+    *
+    * @param node    the node to be connected
+    * @param timeout timeout threshold of connection
+    */
+   public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
+     return metaGroupMember.getClientProvider().getAsyncDataClient(node, timeout);
+   }
+ 
+   public Node getThisNode() {
+     return thisNode;
+   }
+ 
+   /**
+    * Get a thrift client that will connect to "node" using the data port.
+    *
+    * @param node    the node to be connected
+    * @param timeout timeout threshold of connection
+    */
+   public SyncDataClient getSyncDataClient(Node node, int timeout) throws TException {
+     return metaGroupMember.getClientProvider().getSyncDataClient(node, timeout);
+   }
+ }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index aaa03a4,d7dd5f9..705000f
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@@ -19,7 -19,6 +19,8 @@@
  
  package org.apache.iotdb.cluster.log.applier;
  
 +import org.apache.iotdb.cluster.exception.ChangeMembershipException;
++import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
  import org.apache.iotdb.cluster.log.Log;
  import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
  import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
@@@ -70,19 -60,11 +71,20 @@@ public class MetaLogApplier extends Bas
        } else {
          logger.error("Unsupported log: {} {}", log.getClass().getName(), log);
        }
-     } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException | ChangeMembershipException e) {
 -    } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) {
++    } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException | ChangeMembershipException | UnsupportedPlanException e) {
        logger.debug("Exception occurred when executing {}", log, e);
        log.setException(e);
      } finally {
        log.setApplied(true);
      }
    }
 +
-   private void sendLogToAllDataGroups(Log log) throws ChangeMembershipException {
++  private void sendLogToAllDataGroups(Log log)
++      throws ChangeMembershipException, UnsupportedPlanException {
 +    LogPlan plan = new LogPlan(log.serialize()); // wrap the serialized log in a plan
-     TSStatus status = member.executeNonQueryPlan(plan);
++    TSStatus status = member.processPartitionedPlan(plan); // the merge hands the plan to processPartitionedPlan instead of executeNonQueryPlan
 +    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // any non-success status aborts the membership change
 +      throw new ChangeMembershipException(String.format("apply %s failed with status {%s}", log, status));
 +    }
 +  }
  }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index 1e86e11,ff650e3..c92a482
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@@ -85,19 -81,4 +85,20 @@@ public class MetaSingleSnapshotLogManag
      snapshot.setLastLogTerm(term);
      return snapshot;
    }
 +
 +  @Override
 +  void applyEntries(List<Log> entries) { // applies each entry through the MetaLogApplier, telling it whether this node is the leader
 +    for (Log entry : entries) {
 +      if (blockAppliedCommitIndex > 0 && entry.getCurrLogIndex() > blockAppliedCommitIndex) { // entries beyond the blocked index are queued instead of applied
 +        blockedUnappliedLogList.add(entry);
 +        continue;
 +      }
 +      try {
 +        ((MetaLogApplier)logApplier).apply(entry, metaGroupMember.getCharacter() == NodeCharacter.LEADER);
 +      } catch (Exception e) { // a failing entry is logged and recorded on the entry; the loop continues with the next one
++        logger.error("Can not apply log {}", entry, e);
 +        entry.setException(e);
 +      }
 +    }
 +  }
  }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index c57ee06,c27cf9e..f774567
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@@ -114,9 -114,9 +114,9 @@@ public abstract class RaftLogManager 
     * Each time new logs are appended, this condition will be notified so logs that have larger
     * indices but arrived earlier can proceed.
     */
-   private final Object logUpdateCondition = new Object();
+   private final Object[] logUpdateConditions = new Object[1024];
  
 -  private List<Log> blockedUnappliedLogList;
 +  protected List<Log> blockedUnappliedLogList;
  
    protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier applier, String name) {
      this.logApplier = applier;
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index c1ab3e5,dd63e2f..fe9b8f7
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@@ -303,15 -306,13 +307,13 @@@ public class ClientServer extends TSSer
            GenericHandler<Void> handler = new GenericHandler<>(queriedNode, new AtomicReference<>());
            try {
              if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-               AsyncDataClient client = metaGroupMember
-                   .getClientProvider().getAsyncDataClient(queriedNode,
+               AsyncDataClient client = coordinator.getAsyncDataClient(queriedNode,
 -                  RaftServer.getReadOperationTimeoutMS());
 -              client.endQuery(header, coordinator.getThisNode(), queryId, handler);
 +                      RaftServer.getReadOperationTimeoutMS());
-               client.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId, handler);
++              client.endQuery(header.getNode(), header.getRaftId(), coordinator.getThisNode(), queryId, handler);
              } else {
-               SyncDataClient syncDataClient = metaGroupMember
-                   .getClientProvider().getSyncDataClient(queriedNode,
+               SyncDataClient syncDataClient = coordinator.getSyncDataClient(queriedNode,
 -                  RaftServer.getReadOperationTimeoutMS());
 -              syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
 +                      RaftServer.getReadOperationTimeoutMS());
-               syncDataClient.endQuery(header.getNode(), header.getRaftId(), metaGroupMember.getThisNode(), queryId);
++              syncDataClient.endQuery(header.getNode(), header.getRaftId(), coordinator.getThisNode(), queryId);
              }
            } catch (IOException | TException e) {
              logger.error("Cannot end query {} in {}", queryId, queriedNode);
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index d198039,8df621d..f97d181
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@@ -339,8 -344,18 +344,18 @@@ public class MetaClusterServer extends 
    }
  
    @Override
 -  public void removeHardLink(String hardLinkPath,
 +  public void removeHardLink(String hardLinkPath, int raftId,
        AsyncMethodCallback<Void> resultHandler) {
 -    asyncService.removeHardLink(hardLinkPath, resultHandler);
 +    asyncService.removeHardLink(hardLinkPath, raftId, resultHandler);
    }
+ 
+   @Override
+   public void handshake(Node sender) {
+     syncService.handshake(sender);
+   }
+ 
+   @Override
+   public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
+     asyncService.handshake(sender, resultHandler);
+   }
  }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index df45b53,f8811e9..7b4f663
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@@ -75,15 -75,15 +75,16 @@@ import org.apache.iotdb.cluster.rpc.thr
  import org.apache.iotdb.cluster.rpc.thrift.Node;
  import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
  import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
 +import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
  import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
  import org.apache.iotdb.cluster.server.NodeCharacter;
- import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
- import org.apache.iotdb.cluster.server.Peer;
+ import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
+ import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+ import org.apache.iotdb.cluster.server.monitor.Peer;
  import org.apache.iotdb.cluster.server.PullSnapshotHintService;
  import org.apache.iotdb.cluster.server.Response;
- import org.apache.iotdb.cluster.server.Timer;
- import org.apache.iotdb.cluster.server.Timer.Statistic;
+ import org.apache.iotdb.cluster.server.monitor.Timer;
+ import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
  import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread;
  import org.apache.iotdb.cluster.utils.StatusUtils;
  import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@@ -824,8 -768,8 +825,8 @@@ public class DataGroupMember extends Ra
      lastReportedLogIndex = logManager.getLastLogIndex();
      return new DataMemberReport(character, leader.get(), term.get(),
          logManager.getLastLogTerm(), lastReportedLogIndex, logManager.getCommitLogIndex(),
 -        logManager.getCommitLogTerm(), getHeader(), readOnly,
 +        logManager.getCommitLogTerm(), getHeader(), getRaftGroupId(), readOnly,
-         QueryCoordinator.getINSTANCE()
+         NodeStatusManager.getINSTANCE()
              .getLastResponseLatency(getHeader()), lastHeartbeatReceivedTime, prevLastLogIndex,
          logManager.getMaxHaveAppliedCommitIndex());
    }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 1534323,74b9d3b..5484bb5
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@@ -19,42 -19,6 +19,7 @@@
  
  package org.apache.iotdb.cluster.server.member;
  
- import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC;
- import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
- 
- import java.io.BufferedInputStream;
- import java.io.BufferedOutputStream;
- import java.io.BufferedReader;
- import java.io.BufferedWriter;
- import java.io.DataInputStream;
- import java.io.DataOutputStream;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.FileOutputStream;
- import java.io.FileReader;
- import java.io.FileWriter;
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.file.Files;
- import java.nio.file.Paths;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
 +import java.util.Map.Entry;
- import java.util.Objects;
- import java.util.Set;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.ScheduledThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.atomic.AtomicLong;
  import org.apache.iotdb.cluster.client.DataClientProvider;
  import org.apache.iotdb.cluster.client.async.AsyncClientPool;
  import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@@ -65,17 -29,15 +30,17 @@@ import org.apache.iotdb.cluster.client.
  import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
  import org.apache.iotdb.cluster.config.ClusterConstant;
  import org.apache.iotdb.cluster.config.ClusterDescriptor;
+ import org.apache.iotdb.cluster.coordinator.Coordinator;
  import org.apache.iotdb.cluster.exception.AddSelfException;
 +import org.apache.iotdb.cluster.exception.CheckConsistencyException;
  import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
  import org.apache.iotdb.cluster.exception.EmptyIntervalException;
  import org.apache.iotdb.cluster.exception.LogExecutionException;
  import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
  import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
  import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
 -import org.apache.iotdb.cluster.log.Log;
 +import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 +import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
- import org.apache.iotdb.cluster.log.Log;
  import org.apache.iotdb.cluster.log.LogApplier;
  import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
  import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
@@@ -89,9 -50,8 +54,7 @@@ import org.apache.iotdb.cluster.partiti
  import org.apache.iotdb.cluster.partition.PartitionTable;
  import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
  import org.apache.iotdb.cluster.query.ClusterPlanRouter;
- import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
  import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
--import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
  import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
  import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
  import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
@@@ -107,12 -64,9 +70,8 @@@ import org.apache.iotdb.cluster.server.
  import org.apache.iotdb.cluster.server.DataClusterServer;
  import org.apache.iotdb.cluster.server.HardLinkCleaner;
  import org.apache.iotdb.cluster.server.NodeCharacter;
- import org.apache.iotdb.cluster.server.NodeReport;
- import org.apache.iotdb.cluster.server.NodeReport.MetaMemberReport;
  import org.apache.iotdb.cluster.server.RaftServer;
  import org.apache.iotdb.cluster.server.Response;
- import org.apache.iotdb.cluster.server.Timer;
--import org.apache.iotdb.cluster.server.handlers.caller.AppendGroupEntryHandler;
  import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
  import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler;
  import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatServer;
@@@ -155,6 -101,42 +117,40 @@@ import org.apache.thrift.transport.TTra
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
+ import java.io.BufferedInputStream;
+ import java.io.BufferedOutputStream;
+ import java.io.BufferedReader;
+ import java.io.BufferedWriter;
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.FileOutputStream;
+ import java.io.FileReader;
+ import java.io.FileWriter;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.nio.file.Files;
+ import java.nio.file.Paths;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+ import java.util.Set;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ScheduledThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
 -import java.util.concurrent.atomic.AtomicLong;
+ 
+ import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC;
+ import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
+ 
  @SuppressWarnings("java:S1135")
  public class MetaGroupMember extends RaftMember {
  
@@@ -172,9 -154,7 +168,10 @@@
     * in case of data loss, some file changes would be made to a temporary file first
     */
    private static final String TEMP_SUFFIX = ".tmp";
+ 
 +  private static final String MSG_MULTIPLE_ERROR = "The following errors occurred when executing "
 +      + "the query, please retry or contact the DBA: ";
 +
    private static final Logger logger = LoggerFactory.getLogger(MetaGroupMember.class);
    /**
     * when joining a cluster this node will retry at most "DEFAULT_JOIN_RETRY" times before returning
@@@ -265,12 -264,12 +276,12 @@@
    public MetaGroupMember() {
    }
  
-   public MetaGroupMember(TProtocolFactory factory, Node thisNode) throws QueryProcessException {
+   public MetaGroupMember(TProtocolFactory factory, Node thisNode, Coordinator coordinator) throws QueryProcessException {
      super("Meta", new AsyncClientPool(new AsyncMetaClient.FactoryAsync(factory)),
          new SyncClientPool(new SyncMetaClient.FactorySync(factory)),
-         new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory), false),
+         new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory)),
          new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory)));
 -    allNodes = new ArrayList<>();
 +    allNodes = new PartitionGroup();
      initPeerMap();
  
      dataClientProvider = new DataClientProvider(factory);
@@@ -1308,415 -1402,6 +1319,415 @@@
    }
  
    /**
 +   * A non-partitioned plan (like DeleteData) should be executed on all data group nodes, so the
 +   * DataGroupLeader is responsible for making sure that every node receives the plan.
 +   * Thus the plan will be processed locally only by the DataGroupLeader and forwarded by non-leader
 +   * nodes.
 +   */
 +  private TSStatus processNonPartitionedDataPlan(PhysicalPlan plan) {
 +    if (plan instanceof DeleteTimeSeriesPlan || plan instanceof DeletePlan) {
 +      try {
 +        // as delete related plans may have abstract paths (paths with wildcards), we convert
 +        // them to full paths so the executor nodes will not need to query the metadata holders,
 +        // eliminating the risk that when they are querying the metadata holders, the timeseries
 +        // has already been deleted
 +        ((CMManager) IoTDB.metaManager).convertToFullPaths(plan);
 +      } catch (PathNotExistException e) {
 +        if (plan.getPaths().isEmpty()) {
 +          // only reports an error when there is no matching path
 +          return StatusUtils.getStatus(StatusUtils.TIMESERIES_NOT_EXIST_ERROR, e.getMessage());
 +        }
 +      }
 +    }
 +    try {
 +      syncLeaderWithConsistencyCheck(true);
 +      List<PartitionGroup> globalGroups = partitionTable.getGlobalGroups();
 +      logger.debug("Forwarding global data plan {} to {} groups", plan, globalGroups.size());
 +      return forwardPlan(globalGroups, plan);
 +    } catch (CheckConsistencyException e) {
 +      logger.debug("Forwarding global data plan {} to meta leader {}", plan, leader.get());
 +      waitLeader();
 +      return forwardPlan(plan, leader.get(), null);
 +    }
 +  }
 +
 +  /**
 +   * A partitioned plan (like batch insertion) will be split into several sub-plans, each belongs to
 +   * a data group. And these sub-plans will be sent to and executed on the corresponding groups
 +   * separately.
 +   */
 +  public TSStatus processPartitionedPlan(PhysicalPlan plan) throws UnsupportedPlanException {
 +    logger.debug("{}: Received a partitioned plan {}", name, plan);
 +    if (partitionTable == null) {
 +      logger.debug("{}: Partition table is not ready", name);
 +      return StatusUtils.PARTITION_TABLE_NOT_READY;
 +    }
 +
 +    // split the plan into sub-plans that each only involve one data group
 +    Map<PhysicalPlan, PartitionGroup> planGroupMap;
 +    try {
 +      planGroupMap = splitPlan(plan);
 +    } catch (CheckConsistencyException checkConsistencyException) {
 +      return StatusUtils
 +          .getStatus(StatusUtils.CONSISTENCY_FAILURE, checkConsistencyException.getMessage());
 +    }
 +
 +    // the storage group is not found locally
 +    if (planGroupMap == null || planGroupMap.isEmpty()) {
 +      if ((plan instanceof InsertPlan || plan instanceof CreateTimeSeriesPlan
 +          || plan instanceof CreateMultiTimeSeriesPlan)
 +          && config.isEnableAutoCreateSchema()) {
 +        logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan);
 +        try {
 +          ((CMManager) IoTDB.metaManager).createSchema(plan);
 +          return processPartitionedPlan(plan);
 +        } catch (MetadataException | CheckConsistencyException e) {
 +          logger.error(
 +              String.format("Failed to set storage group or create timeseries, because %s", e));
 +        }
 +      }
 +      logger.error("{}: Cannot find storage groups for {}", name, plan);
 +      return StatusUtils.NO_STORAGE_GROUP;
 +    }
 +    logger.debug("{}: The data groups of {} are {}", name, plan, planGroupMap);
 +    return forwardPlan(planGroupMap, plan);
 +  }
 +
 +  /**
 +   * split a plan into several sub-plans, each belongs to only one data group.
 +   */
 +  private Map<PhysicalPlan, PartitionGroup> splitPlan(PhysicalPlan plan)
 +      throws UnsupportedPlanException, CheckConsistencyException {
 +    Map<PhysicalPlan, PartitionGroup> planGroupMap = null;
 +    try {
 +      planGroupMap = router.splitAndRoutePlan(plan);
 +    } catch (StorageGroupNotSetException e) {
 +      // synchronize with the leader to see if this node has unpulled storage groups
 +      syncLeaderWithConsistencyCheck(true);
 +      try {
 +        planGroupMap = router.splitAndRoutePlan(plan);
 +      } catch (MetadataException | UnknownLogTypeException ex) {
 +        // ignore
 +      }
 +    } catch (MetadataException | UnknownLogTypeException e) {
 +      logger.error("Cannot route plan {}", plan, e);
 +    }
 +    return planGroupMap;
 +  }
 +
 +  /**
 +   * Forward plans to the DataGroupMember of one node in the corresponding group. Only when all
 +   * nodes time out, will a TIME_OUT be returned.
 +   *
 +   * @param planGroupMap sub-plan -> belong data group pairs
 +   */
 +  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
 +    // the error codes from the groups that cannot execute the plan
 +    TSStatus status;
 +    if (planGroupMap.size() == 1) {
 +      status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
 +    } else {
 +      if (plan instanceof InsertTabletPlan || plan instanceof CreateMultiTimeSeriesPlan) {
 +        // InsertTabletPlan and CreateMultiTimeSeriesPlan contain many rows, each of which corresponds to a
 +        // TSStatus as its execution result; as the plan is split and the sub-plans may have interleaving
 +        // ranges, we must ensure that each TSStatus is placed in the right position
 +        // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to NodeA and row2
 +        // belongs to NodeB, when NodeA returns a success while NodeB returns a failure, the
 +        // failure and success should be placed into proper positions in TSStatus.subStatus
 +        status = forwardMultiSubPlan(planGroupMap, plan);
 +      } else {
 +        status = forwardToMultipleGroup(planGroupMap);
 +      }
 +    }
 +    if (plan instanceof InsertPlan
 +        && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
 +        && config.isEnableAutoCreateSchema()) {
 +      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, ((InsertPlan) plan));
 +      if (tmpStatus != null) {
 +        status = tmpStatus;
 +      }
 +    }
-     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() && status
-         .isSetRedirectNode()) {
-       status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
-     }
 +    logger.debug("{}: executed {} with answer {}", name, plan, status);
 +    return status;
 +  }
 +
 +  private TSStatus createTimeseriesForFailedInsertion(
 +      Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
 +    // try to create timeseries
 +    if (plan.getFailedMeasurements() != null) {
 +      plan.getPlanFromFailed();
 +    }
 +    boolean hasCreate;
 +    try {
 +      hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
 +    } catch (IllegalPathException | CheckConsistencyException e) {
 +      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
 +    }
 +    if (hasCreate) {
 +      return forwardPlan(planGroupMap, plan);
 +    } else {
 +      logger.error("{}, Cannot auto create timeseries.", thisNode);
 +    }
 +    return null;
 +  }
 +
 +  /**
 +   * Forward each sub-plan to its belonging data group, and combine responses from the groups.
 +   *
 +   * @param planGroupMap sub-plan -> data group pairs
 +   */
 +  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
 +  private TSStatus forwardMultiSubPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap,
 +      PhysicalPlan parentPlan) {
 +    List<String> errorCodePartitionGroups = new ArrayList<>();
 +    TSStatus tmpStatus;
 +    TSStatus[] subStatus = null;
 +    boolean noFailure = true;
 +    boolean isBatchFailure = false;
 +    EndPoint endPoint = null;
 +    int totalRowNum = 0;
 +    // send sub-plans to each belonging data group and collect results
 +    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
 +      tmpStatus = forwardToSingleGroup(entry);
 +      logger.debug("{}: from {},{},{}", name, entry.getKey(), entry.getValue(), tmpStatus);
 +      noFailure =
 +          (tmpStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
 +      isBatchFailure = (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode())
 +          || isBatchFailure;
 +      if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
 +        if (parentPlan instanceof InsertTabletPlan) {
 +          totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
 +        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
 +          totalRowNum = ((CreateMultiTimeSeriesPlan) parentPlan).getIndexes().size();
 +        }
 +        if (subStatus == null) {
 +          subStatus = new TSStatus[totalRowNum];
 +          Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
 +        }
 +        // set the status from one group to the proper positions of the overall status
 +        if (parentPlan instanceof InsertTabletPlan) {
 +          PartitionUtils.reordering((InsertTabletPlan) entry.getKey(), subStatus,
 +              tmpStatus.subStatus.toArray(new TSStatus[]{}));
 +        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
 +          CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) entry.getKey();
 +          for (int i = 0; i < subPlan.getIndexes().size(); i++) {
 +            subStatus[subPlan.getIndexes().get(i)] = tmpStatus.subStatus.get(i);
 +          }
 +        }
 +      }
 +      if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
 +        // execution failed, record the error message
 +        errorCodePartitionGroups.add(String.format("[%s@%s:%s:%s]",
 +            tmpStatus.getCode(), entry.getValue().getHeader(),
 +            tmpStatus.getMessage(), tmpStatus.subStatus));
 +      }
 +      if (parentPlan instanceof InsertTabletPlan && tmpStatus.isSetRedirectNode() &&
 +          ((InsertTabletPlan) entry.getKey()).getMaxTime() == ((InsertTabletPlan) parentPlan)
 +              .getMaxTime()) {
 +        endPoint = tmpStatus.getRedirectNode();
 +      }
 +    }
 +
 +    if (parentPlan instanceof CreateMultiTimeSeriesPlan &&
 +        !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
 +      if (subStatus == null) {
 +        subStatus = new TSStatus[totalRowNum];
 +        Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
 +      }
 +      noFailure = false;
 +      isBatchFailure = true;
 +      for (Entry<Integer, TSStatus> integerTSStatusEntry : ((CreateMultiTimeSeriesPlan) parentPlan)
 +          .getResults().entrySet()) {
 +        subStatus[integerTSStatusEntry.getKey()] = integerTSStatusEntry.getValue();
 +      }
 +    }
 +    return concludeFinalStatus(noFailure, endPoint, isBatchFailure, subStatus,
 +        errorCodePartitionGroups);
 +  }
 +
 +  private TSStatus concludeFinalStatus(boolean noFailure, EndPoint endPoint,
 +      boolean isBatchFailure, TSStatus[] subStatus, List<String> errorCodePartitionGroups) {
 +    TSStatus status;
 +    if (noFailure) {
 +      status = StatusUtils.OK;
 +      if (endPoint != null) {
 +        status = StatusUtils.getStatus(status, endPoint);
 +      }
 +    } else if (isBatchFailure) {
 +      status = RpcUtils.getStatus(Arrays.asList(subStatus));
 +    } else {
 +      status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
 +          MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
 +    }
 +    return status;
 +  }
 +
 +  private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, PartitionGroup> entry) {
 +    TSStatus result;
 +    if (entry.getValue().contains(thisNode)) {
 +      // the query should be handled by a group the local node is in, handle it within the group
 +      long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
 +          .getOperationStartTime();
 +      logger.debug("Execute {} in a local group of {}", entry.getKey(),
 +          entry.getValue().getHeader());
 +      result = getLocalDataMember(entry.getValue().getHeader(), entry.getValue().getId())
 +          .executeNonQueryPlan(entry.getKey());
 +      Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
 +          .calOperationCostTimeFromStart(startTime);
 +    } else {
 +      // forward the query to the group that should handle it
 +      long startTime = Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
 +          .getOperationStartTime();
 +      logger.debug("Forward {} to a remote group of {}", entry.getKey(),
 +          entry.getValue().getHeader());
 +      result = forwardPlan(entry.getKey(), entry.getValue());
 +      Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
 +          .calOperationCostTimeFromStart(startTime);
 +    }
 +    return result;
 +  }
 +
 +  /**
 +   * Forward each sub-plan to its corresponding data group; if some groups go wrong, the error
 +   * messages from each group will be compacted into one string.
 +   *
 +   * @param planGroupMap sub-plan -> data group pairs
 +   */
 +  private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> planGroupMap) {
 +    List<String> errorCodePartitionGroups = new ArrayList<>();
 +    TSStatus tmpStatus;
 +    boolean allRedirect = true;
 +    EndPoint endPoint = null;
 +    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : planGroupMap.entrySet()) {
 +      tmpStatus = forwardToSingleGroup(entry);
 +      if (tmpStatus.isSetRedirectNode()) {
 +        endPoint = tmpStatus.getRedirectNode();
 +      } else {
 +        allRedirect = false;
 +      }
 +      if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
 +        // execution failed, record the error message
 +        errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
 +            tmpStatus.getCode(), entry.getValue().getHeader(),
 +            tmpStatus.getMessage()));
 +      }
 +    }
 +    TSStatus status;
 +    if (errorCodePartitionGroups.isEmpty()) {
-       status = StatusUtils.OK;
 +      if (allRedirect) {
-         status = StatusUtils.getStatus(status, endPoint);
++        status = new TSStatus();
++        status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
++      } else {
++        status = StatusUtils.OK;
 +      }
 +    } else {
 +      status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
 +          MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
 +    }
 +    return status;
 +  }
 +
 +  /**
 +   * Forward a plan to all DataGroupMember groups. Only when all nodes time out, will a TIME_OUT be
 +   * returned. The error messages from each group (if any) will be compacted into one string.
 +   *
 +   * @param plan the plan to be executed by every group in partitionGroups
 +   */
 +  private TSStatus forwardPlan(List<PartitionGroup> partitionGroups, PhysicalPlan plan) {
 +    // the error codes from the groups that cannot execute the plan
 +    TSStatus status;
 +    List<String> errorCodePartitionGroups = new ArrayList<>();
 +    for (PartitionGroup partitionGroup : partitionGroups) {
 +      if (partitionGroup.contains(thisNode)) {
 +        // the query should be handled by a group the local node is in, handle it within the group
 +        logger.debug("Execute {} in a local group of {}", plan, partitionGroup.getHeader());
 +        status = getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId())
 +            .executeNonQueryPlan(plan);
 +      } else {
 +        // forward the query to the group that should handle it
 +        logger.debug("Forward {} to a remote group of {}", plan,
 +            partitionGroup.getHeader());
 +        status = forwardPlan(plan, partitionGroup);
 +      }
 +      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && (
 +          !(plan instanceof DeleteTimeSeriesPlan) ||
 +              status.getCode() != TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode())) {
 +        // execution failed, record the error message
 +        errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
 +            status.getCode(), partitionGroup.getHeader(),
 +            status.getMessage()));
 +      }
 +    }
 +    if (errorCodePartitionGroups.isEmpty()) {
 +      status = StatusUtils.OK;
 +    } else {
 +      status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
 +          MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
 +    }
 +    logger.debug("{}: executed {} with answer {}", name, plan, status);
 +    return status;
 +  }
 +
 +  /**
 +   * Forward a plan to the DataGroupMember of one node in the group. Only when all nodes time out,
 +   * will a TIME_OUT be returned.
 +   */
 +  private TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {
 +    for (Node node : group) {
 +      TSStatus status;
 +      try {
 +        // only data plans are partitioned, so it must be processed by its data server instead of
 +        // meta server
 +        if (config.isUseAsyncServer()) {
 +          status = forwardDataPlanAsync(plan, node, group.getHeader());
 +        } else {
 +          status = forwardDataPlanSync(plan, node, group.getHeader());
 +        }
 +      } catch (IOException e) {
 +        status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
 +      }
 +      if (!StatusUtils.TIME_OUT.equals(status)) {
 +        if (!status.isSetRedirectNode()) {
 +          status.setRedirectNode(new EndPoint(node.getIp(), node.getClientPort()));
 +        }
 +        return status;
 +      } else {
 +        logger.warn("Forward {} to {} timed out", plan, node);
 +      }
 +    }
 +    logger.warn("Forward {} to {} timed out", plan, group);
 +    return StatusUtils.TIME_OUT;
 +  }
 +
 +  /**
 +   * Forward a non-query plan to the data port of "receiver"
 +   *
 +   * @param plan   a non-query plan
 +   * @param header to determine which DataGroupMember of "receiver" will process the request.
 +   * @return a TSStatus indicating if the forwarding is successful.
 +   */
 +  private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, Node header)
 +      throws IOException {
 +    RaftService.AsyncClient client = getClientProvider().getAsyncDataClient(receiver,
 +        RaftServer.getWriteOperationTimeoutMS());
 +    return forwardPlanAsync(plan, receiver, header, client);
 +  }
 +
 +  private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header)
 +      throws IOException {
 +    Client client = null;
 +    try {
 +      client = getClientProvider().getSyncDataClient(receiver,
 +          RaftServer.getWriteOperationTimeoutMS());
 +    } catch (TException e) {
 +      throw new IOException(e);
 +    }
 +    return forwardPlanSync(plan, receiver, header, client);
 +  }
 +
 +  /**
++=======
++>>>>>>> master
     * Get the data groups that should be queried when querying "path" with "filter". First, the time
     * interval qualified by the filter will be extracted. If any side of the interval is open, query
     * all groups. Otherwise compute all involved groups w.r.t. the time partitioning.
@@@ -2096,7 -1764,7 +2108,11 @@@
      this.dataClientProvider = dataClientProvider;
    }
  
 +  public void setRouter(ClusterPlanRouter router) {
 +    this.router = router;
 +  }
++
+   public void handleHandshake(Node sender) {
+     NodeStatusManager.getINSTANCE().activate(sender);
+   }
  }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 121892b,fc83ba4..d05ddd9
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@@ -85,6 -84,6 +85,7 @@@ import org.apache.iotdb.cluster.utils.C
  import org.apache.iotdb.cluster.utils.IOUtils;
  import org.apache.iotdb.cluster.utils.PlanSerializer;
  import org.apache.iotdb.cluster.utils.StatusUtils;
++import org.apache.iotdb.cluster.utils.nodetool.function.Status;
  import org.apache.iotdb.db.exception.BatchProcessException;
  import org.apache.iotdb.db.exception.IoTDBException;
  import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@@ -932,6 -876,6 +907,7 @@@ public abstract class RaftMember 
  
      try {
        if (appendLogInGroup(log)) {
++        TSStatus res = StatusUtils.OK;
          return StatusUtils.OK;
        }
      } catch (LogExecutionException e) {
@@@ -1221,9 -1165,9 +1197,9 @@@
      return forwardPlanAsync(plan, receiver, header, client);
    }
  
-   TSStatus forwardPlanAsync(PhysicalPlan plan, Node receiver, Node header, AsyncClient client) {
+   public TSStatus forwardPlanAsync(PhysicalPlan plan, Node receiver, Node header, AsyncClient client) {
      try {
 -      TSStatus tsStatus = SyncClientAdaptor.executeNonQuery(client, plan, header, receiver);
 +      TSStatus tsStatus = SyncClientAdaptor.executeNonQuery(client, plan, header, receiver, getRaftGroupId());
        if (tsStatus == null) {
          tsStatus = StatusUtils.TIME_OUT;
          logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
@@@ -1249,10 -1193,9 +1225,10 @@@
      return forwardPlanSync(plan, receiver, header, client);
    }
  
-   TSStatus forwardPlanSync(PhysicalPlan plan, Node receiver, Node header, Client client) {
+   public TSStatus forwardPlanSync(PhysicalPlan plan, Node receiver, Node header, Client client) {
      try {
        ExecutNonQueryReq req = new ExecutNonQueryReq();
 +      req.setRaftId(getRaftGroupId());
        req.setPlanBytes(PlanSerializer.getInstance().serialize(plan));
        if (header != null) {
          req.setHeader(header);
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 3b2df98,114aa5a..a555e17
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@@ -197,10 -195,14 +197,16 @@@ public class MetaAsyncService extends B
     * @param resultHandler
     */
    @Override
 -  public void exile(AsyncMethodCallback<Void> resultHandler) {
 -    metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
 +  public void exile(ByteBuffer removeNodeLogBuffer, AsyncMethodCallback<Void> resultHandler) {
 +    RemoveNodeLog removeNodeLog = new RemoveNodeLog();
 +    removeNodeLog.deserialize(removeNodeLogBuffer);
 +    metaGroupMember.applyRemoveNode(removeNodeLog);
      resultHandler.onComplete(null);
    }
+ 
+   @Override
+   public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
+     metaGroupMember.handleHandshake(sender);
+     resultHandler.onComplete(null);
+   }
  }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 48c0e58,ec1519a..d88f4f5
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@@ -190,9 -188,12 +190,14 @@@ public class MetaSyncService extends Ba
     * must tell it directly.
     */
    @Override
 -  public void exile() {
 -    metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
 +  public void exile(ByteBuffer removeNodeLogBuffer) {
 +    RemoveNodeLog removeNodeLog = new RemoveNodeLog();
 +    removeNodeLog.deserialize(removeNodeLogBuffer);
 +    metaGroupMember.applyRemoveNode(removeNodeLog);
    }
+ 
+   @Override
+   public void handshake(Node sender) {
+     metaGroupMember.handleHandshake(sender);
+   }
  }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index a0e5f56,28922ad..890b402
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@@ -28,9 -27,8 +28,9 @@@ import org.apache.iotdb.cluster.partiti
  import org.apache.iotdb.cluster.partition.PartitionTable;
  import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
  import org.apache.iotdb.cluster.rpc.thrift.Node;
 +import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
  import org.apache.iotdb.cluster.server.MetaClusterServer;
- import org.apache.iotdb.cluster.server.Timer;
+ import org.apache.iotdb.cluster.server.monitor.Timer;
  import org.apache.iotdb.cluster.server.member.MetaGroupMember;
  import org.apache.iotdb.db.conf.IoTDBConstant;
  import org.apache.iotdb.db.exception.StartupException;
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index fbffb7f,6cc5418..65bb031
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@@ -988,70 -971,74 +993,74 @@@ public class DataGroupMemberTest extend
      request.setTimeFilterBytes(SerializeUtils.serializeFilter(timeFilter));
      QueryContext queryContext =
          new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true, 1024, -1));
-     request.setQueryId(queryContext.getQueryId());
-     request.setRequestor(TestUtils.getNode(0));
-     request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
-     request.setDeviceMeasurements(Collections.singleton(TestUtils.getTestMeasurement(0)));
-     request.setAscending(true);
- 
-     DataGroupMember dataGroupMember;
-     AtomicReference<Long> resultRef;
-     GenericHandler<Long> handler;
-     Long executorId;
-     AtomicReference<List<ByteBuffer>> aggrResultRef;
-     GenericHandler<List<ByteBuffer>> aggrResultHandler;
-     List<ByteBuffer> byteBuffers;
-     List<AggregateResult> aggregateResults;
-     Object[] answers;
-     // get an executor from a node holding this timeseries
-     request.setHeader(TestUtils.getNode(10));
-     dataGroupMember = getDataGroupMember(TestUtils.getNode(10));
      try {
-       resultRef = new AtomicReference<>();
-       handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
-       new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
-       executorId = resultRef.get();
-       assertEquals(1L, (long) executorId);
- 
-       // fetch result
-       aggrResultRef = new AtomicReference<>();
-       aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
-       new DataAsyncService(dataGroupMember)
-           .getGroupByResult(TestUtils.getNode(10), raftId, executorId, 0, 20, aggrResultHandler);
- 
-       byteBuffers = aggrResultRef.get();
-       assertNotNull(byteBuffers);
-       aggregateResults = new ArrayList<>();
-       for (ByteBuffer byteBuffer : byteBuffers) {
-         aggregateResults.add(AggregateResult.deserializeFrom(byteBuffer));
+       request.setQueryId(queryContext.getQueryId());
+       request.setRequestor(TestUtils.getNode(0));
+       request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
+       request.setDeviceMeasurements(Collections.singleton(TestUtils.getTestMeasurement(0)));
+       request.setAscending(true);
+ 
+       DataGroupMember dataGroupMember;
+       AtomicReference<Long> resultRef;
+       GenericHandler<Long> handler;
+       Long executorId;
+       AtomicReference<List<ByteBuffer>> aggrResultRef;
+       GenericHandler<List<ByteBuffer>> aggrResultHandler;
+       List<ByteBuffer> byteBuffers;
+       List<AggregateResult> aggregateResults;
+       Object[] answers;
+       // get an executor from a node holding this timeseries
+       request.setHeader(TestUtils.getNode(10));
+       dataGroupMember = getDataGroupMember(TestUtils.getNode(10));
+       try {
+         resultRef = new AtomicReference<>();
+         handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
+         new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
+         executorId = resultRef.get();
+         assertEquals(1L, (long) executorId);
+ 
+         // fetch result
+         aggrResultRef = new AtomicReference<>();
+         aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
+         new DataAsyncService(dataGroupMember)
 -            .getGroupByResult(TestUtils.getNode(10), executorId, 0, 20, aggrResultHandler);
++            .getGroupByResult(TestUtils.getNode(10), raftId, executorId, 0, 20, aggrResultHandler);
+ 
+         byteBuffers = aggrResultRef.get();
+         assertNotNull(byteBuffers);
+         aggregateResults = new ArrayList<>();
+         for (ByteBuffer byteBuffer : byteBuffers) {
+           aggregateResults.add(AggregateResult.deserializeFrom(byteBuffer));
+         }
+         answers = new Object[]{15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0};
+         checkAggregates(answers, aggregateResults);
+       } finally {
+         dataGroupMember.closeLogManager();
        }
-       answers = new Object[]{15.0, 12.0, 180.0, 5.0, 19.0, 19.0, 5.0, 19.0, 5.0};
-       checkAggregates(answers, aggregateResults);
-     } finally {
-       dataGroupMember.closeLogManager();
-     }
  
-     // get an executor from a node not holding this timeseries
-     request.setHeader(TestUtils.getNode(30));
-     dataGroupMember = getDataGroupMember(TestUtils.getNode(30));
-     try {
-       resultRef = new AtomicReference<>();
-       handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
-       request.timeFilterBytes.position(0);
-       new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
-       executorId = resultRef.get();
-       assertEquals(-1L, (long) executorId);
- 
-       // fetch result
-       aggrResultRef = new AtomicReference<>();
-       aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
-       new DataAsyncService(dataGroupMember)
-           .getGroupByResult(TestUtils.getNode(30), raftId, executorId, 0, 20, aggrResultHandler);
- 
-       byteBuffers = aggrResultRef.get();
-       assertNull(byteBuffers);
+       // get an executor from a node not holding this timeseries
+       request.setHeader(TestUtils.getNode(30));
+       dataGroupMember = getDataGroupMember(TestUtils.getNode(30));
+       try {
+         resultRef = new AtomicReference<>();
+         handler = new GenericHandler<>(TestUtils.getNode(0), resultRef);
+         request.timeFilterBytes.position(0);
+         new DataAsyncService(dataGroupMember).getGroupByExecutor(request, handler);
+         executorId = resultRef.get();
+         assertEquals(-1L, (long) executorId);
+ 
+         // fetch result
+         aggrResultRef = new AtomicReference<>();
+         aggrResultHandler = new GenericHandler<>(TestUtils.getNode(0), aggrResultRef);
+         new DataAsyncService(dataGroupMember)
 -            .getGroupByResult(TestUtils.getNode(30), executorId, 0, 20, aggrResultHandler);
++            .getGroupByResult(TestUtils.getNode(30), raftId, executorId, 0, 20, aggrResultHandler);
+ 
+         byteBuffers = aggrResultRef.get();
+         assertNull(byteBuffers);
+       } finally {
+         dataGroupMember.closeLogManager();
+       }
      } finally {
-       dataGroupMember.closeLogManager();
+       QueryResourceManager.getInstance().endQuery(queryContext.getQueryId());
      }
    }
  
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 8c1beef,e2a5767..d5a6515
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@@ -100,8 -100,11 +105,10 @@@ public class MemberTest 
      prevUseAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
      preLogBufferSize = ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize();
      ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
 -    ClusterDescriptor.getInstance().getConfig().setRaftLogBufferSize(4096);
      testThreadPool = Executors.newFixedThreadPool(4);
      prevLeaderWait = RaftMember.getWaitLeaderTimeMs();
+     prevEnableWAL = IoTDBDescriptor.getInstance().getConfig().isEnableWal();
+     IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
      RaftMember.setWaitLeaderTimeMs(10);
  
      allNodes = new PartitionGroup();
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 94f5067,fe0adfc..12fe37e
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@@ -73,10 -67,8 +74,9 @@@ import org.apache.iotdb.cluster.metadat
  import org.apache.iotdb.cluster.partition.PartitionGroup;
  import org.apache.iotdb.cluster.partition.PartitionTable;
  import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 +import org.apache.iotdb.cluster.query.ClusterPlanRouter;
  import org.apache.iotdb.cluster.query.LocalQueryExecutor;
  import org.apache.iotdb.cluster.query.RemoteQueryContext;
- import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
  import org.apache.iotdb.cluster.query.reader.ClusterReaderFactory;
  import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
  import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@@ -173,6 -163,6 +174,13 @@@ public class MetaGroupMemberTest extend
      RaftServer.setReadOperationTimeoutMS(1000);
  
      super.setUp();
++    partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0)) {
++      @Override
++      public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) {
++        return new RaftNode(TestUtils.getNode(0), 0);
++      }
++    };
++    testMetaMember.setPartitionTable(partitionTable);
      dummyResponse.set(Response.RESPONSE_AGREE);
      testMetaMember.setAllNodes(allNodes);
  
@@@ -186,14 -176,8 +194,14 @@@
  
      buildDataGroups(dataClusterServer);
      testMetaMember.getThisNode().setNodeIdentifier(0);
 +    testMetaMember.setRouter(new ClusterPlanRouter(testMetaMember.getPartitionTable()){
 +      @Override
 +      protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan) {
 +        return Collections.singletonMap(plan, partitionTable.getHeaderGroup(testMetaMember.getThisNode()));
 +      }
 +    });
      mockDataClusterServer = false;
-     QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember);
+     NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
      exiledNode = null;
      System.out.println("Init term of metaGroupMember: " + testMetaMember.getTerm().get());
    }
@@@ -307,9 -286,8 +315,9 @@@
      return resp;
    }
  
 +  @Override
    protected MetaGroupMember getMetaGroupMember(Node node) throws QueryProcessException {
-     MetaGroupMember metaGroupMember = new MetaGroupMember(new Factory(), node) {
+     MetaGroupMember metaGroupMember = new MetaGroupMember(new Factory(), node, new Coordinator()) {
  
        @Override
        public DataClusterServer getDataClusterServer() {
@@@ -893,7 -914,7 +949,8 @@@
    public void testProcessValidHeartbeatReq() throws QueryProcessException {
      System.out.println("Start testProcessValidHeartbeatReq()");
      MetaGroupMember testMetaMember = getMetaGroupMember(TestUtils.getNode(10));
 +    partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
+     testMetaMember.setCoordinator(new Coordinator());
      try {
        HeartBeatRequest request = new HeartBeatRequest();
        request.setRequireIdentifier(true);
diff --cc server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index b03f984,30d5d52..c9d130e
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@@ -500,10 -510,12 +510,10 @@@ public class StorageEngine implements I
     * @throws StorageGroupNotSetException
     */
    public void closeStorageGroupProcessor(PartialPath storageGroupPath, long partitionId,
 -      boolean isSeq,
 -      boolean isSync)
 -      throws StorageGroupNotSetException {
 +      boolean isSeq, boolean isSync) throws StorageGroupNotSetException {
      StorageGroupProcessor processor = processorMap.get(storageGroupPath);
      if (processor == null) {
-       throw new StorageGroupNotSetException(storageGroupPath.getFullPath());
+       throw new StorageGroupNotSetException(storageGroupPath.getFullPath(), true);
      }
  
      logger.info("async closing sg processor is called for closing {}, seq = {}, partitionId = {}",
diff --cc server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
index aa31a4e,e733813..8928110
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
@@@ -114,115 -115,65 +115,66 @@@ public class MLogWriter implements Auto
      }
    }
  
-   public synchronized void createTimeseries(CreateTimeSeriesPlan createTimeSeriesPlan) throws IOException {
-     try {
-       putLog(createTimeSeriesPlan);
-     } catch (BufferOverflowException e) {
-       throw new IOException(
-         LOG_TOO_LARGE_INFO, e);
-     }
+   public void createTimeseries(CreateTimeSeriesPlan createTimeSeriesPlan) throws IOException {
+     putLog(createTimeSeriesPlan);
    }
  
-   public synchronized void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException {
-     try {
-       putLog(deleteTimeSeriesPlan);
-     } catch (BufferOverflowException e) {
-       throw new IOException(
-         LOG_TOO_LARGE_INFO, e);
-     }
+   public void deleteTimeseries(DeleteTimeSeriesPlan deleteTimeSeriesPlan) throws IOException {
+     putLog(deleteTimeSeriesPlan);
    }
  
-   public synchronized void setStorageGroup(PartialPath storageGroup) throws IOException {
-     try {
-       SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup);
-       putLog(plan);
-     } catch (BufferOverflowException e) {
-       throw new IOException(
-         LOG_TOO_LARGE_INFO, e);
-     }
+   public void setStorageGroup(PartialPath storageGroup) throws IOException {
+     SetStorageGroupPlan plan = new SetStorageGroupPlan(storageGroup);
+     putLog(plan);
    }
  
-   public synchronized void deleteStorageGroup(PartialPath storageGroup) throws IOException {
-     try {
-       DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(Collections.singletonList(storageGroup));
-       putLog(plan);
-     } catch (BufferOverflowException e) {
-       throw new IOException(
-         LOG_TOO_LARGE_INFO, e);
-     }
+   public void deleteStorageGroup(PartialPath storageGroup) throws IOException {
+     DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(Collections.singletonList(storageGroup));
+     putLog(plan);
    }
  
-   public synchronized void setTTL(PartialPath storageGroup, long ttl) throws IOException {
-     try {
-       SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl);
-       putLog(plan);
-     } catch (BufferOverflowException e) {
-       throw new IOException(
-         LOG_TOO_LARGE_INFO, e);
-     }
+   public void setTTL(PartialPath storageGroup, long ttl) throws IOException {
+     SetTTLPlan plan = new SetTTLPlan(storageGroup, ttl);
+     putLog(plan);
    }
  
-   public synchronized void changeOffset(PartialPath path, long offset) throws IOException {
-     try {
-       ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset);
-       putLog(plan);
-     } catch (BufferOverflowException e) {
-       throw new IOException(
-         LOG_TOO_LARGE_INFO, e);
-     }
+   public void changeOffset(PartialPath path, long offset) throws IOException {
+     ChangeTagOffsetPlan plan = new ChangeTagOffsetPlan(path, offset);
+     putLog(plan);
    }
  
-   public synchronized void changeAlias(PartialPath path, String alias) throws IOException {
-     try {
-       ChangeAliasPlan plan = new ChangeAliasPlan(path, alias);
-       putLog(plan);
-     } catch (BufferOverflowException e) {
-       throw new IOException(
-         LOG_TOO_LARGE_INFO, e);
-     }
+   public void changeAlias(PartialPath path, String alias) throws IOException {
+     ChangeAliasPlan plan = new ChangeAliasPlan(path, alias);
+     putLog(plan);
    }
  
-   public synchronized void serializeMNode(MNode node) throws IOException {
-     try {
-       int childSize = 0;
-       if (node.getChildren() != null) {
-         childSize = node.getChildren().size();
-       }
-       MNodePlan plan = new MNodePlan(node.getName(), childSize);
-       putLog(plan);
-     } catch (BufferOverflowException e) {
-       throw new IOException(
-         LOG_TOO_LARGE_INFO, e);
+   public void serializeMNode(MNode node) throws IOException {
+     int childSize = 0;
+     if (node.getChildren() != null) {
+       childSize = node.getChildren().size();
      }
+     MNodePlan plan = new MNodePlan(node.getName(), childSize);
+     putLog(plan);
    }
  
-   public synchronized void serializeMeasurementMNode(MeasurementMNode node) throws IOException {
-     try {
-       int childSize = 0;
-       if (node.getChildren() != null) {
-         childSize = node.getChildren().size();
-       }
-       MeasurementMNodePlan plan = new MeasurementMNodePlan(node.getName(), node.getAlias(),
-         node.getOffset(), childSize, node.getSchema());
-       putLog(plan);
-     } catch (BufferOverflowException e) {
-       throw new IOException(
-         LOG_TOO_LARGE_INFO, e);
++
+   public void serializeMeasurementMNode(MeasurementMNode node) throws IOException {
+     int childSize = 0;
+     if (node.getChildren() != null) {
+       childSize = node.getChildren().size();
      }
+     MeasurementMNodePlan plan = new MeasurementMNodePlan(node.getName(), node.getAlias(),
+       node.getOffset(), childSize, node.getSchema());
+     putLog(plan);
    }
  
-   public synchronized void serializeStorageGroupMNode(StorageGroupMNode node) throws IOException {
-     try {
-       int childSize = 0;
-       if (node.getChildren() != null) {
-         childSize = node.getChildren().size();
-       }
-       StorageGroupMNodePlan plan = new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize);
-       putLog(plan);
-     } catch (BufferOverflowException e) {
-       throw new IOException(
-         LOG_TOO_LARGE_INFO, e);
+   public void serializeStorageGroupMNode(StorageGroupMNode node) throws IOException {
+     int childSize = 0;
+     if (node.getChildren() != null) {
+       childSize = node.getChildren().size();
      }
+     StorageGroupMNodePlan plan = new StorageGroupMNodePlan(node.getName(), node.getDataTTL(), childSize);
+     putLog(plan);
    }
  
    @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
diff --cc server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 0d64b30,e9bf655..dc553be
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@@ -366,7 -361,8 +366,8 @@@ public abstract class PhysicalPlan 
      REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER,
      DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, CREATE_MULTI_TIMESERIES,
      ALTER_TIMESERIES, FLUSH, CREATE_INDEX, DROP_INDEX,
-     CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, CLUSTER_LOG
 -    CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE,
++    CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, CLUSTER_LOG,
+     BATCH_INSERT_ONE_DEVICE
    }
  
    public long getIndex() {